gdal/dem.py

416 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
import os
import shutil
import tomli
import ezdxf
from osgeo import gdal
import numpy as np
import pandas as pd
from pw import DFile, ControlFile
import dem_utils
from nwed import Nwed
class Dem:
def __init__(self, toml_path):
with open(toml_path, "rb") as tf:
toml_dict = tomli.load(tf)
self._toml_dict = toml_dict
dem_file_path = toml_dict["parameter"]["dem_file_path"]
self._tree_height = toml_dict["parameter"]["tree_height"]
self._dataset = gdal.Open(dem_file_path)
self._dem_resolution = self._dataset.GetGeoTransform()[1]
def get_dem_info(self, if_print=False):
"""Get the information of DEM data.
Parameters:
dem_data <osgeo.gdal.Dataset> -- The data of DEM.
if_print <bool> -- If print the information of DEM. Default is False (not print).
Return:
... <...> -- The information and parameters of DEM.
"""
dem_data = self._dataset
dem_row = dem_data.RasterYSize # height
dem_col = dem_data.RasterXSize # width
dem_band = dem_data.RasterCount
dem_gt = dem_data.GetGeoTransform()
dem_proj = dem_data.GetProjection()
if if_print:
print("\nThe information of DEM:")
print("The number of row (height) is: %d" % dem_row)
print("The number of column (width) is: %d" % dem_col)
print("The number of band is: %d" % dem_band)
print("The 6 GeoTransform parameters are:\n", dem_gt)
print("The GCS/PCS information is:\n", dem_proj)
return dem_row, dem_col, dem_band, dem_gt, dem_proj
def write(self):
# TODO:不应该设置缩放因数
zoom_factor = 1
excel_pfs = self._read_path_file()
segments = []
plate_doc = ezdxf.new(dxfversion="R2010")
plate_msp = plate_doc.modelspace()
toml_dict = self._toml_dict
out_dxf_file_dir = toml_dict["parameter"]["out_dxf_file_dir"]
# 写整个断面
dm_whole_doc = ezdxf.new(dxfversion="R2004")
dm_whole_accumulative_distance = 0 # 累加里程
dm_whole_msp = dm_whole_doc.modelspace()
for foo in range(len(excel_pfs) - 1):
start_point_name: str = excel_pfs.iloc[foo, 3]
end_point_name: str = excel_pfs.iloc[foo + 1, 3]
point_x_s = float(excel_pfs.iloc[foo, 1])
point_y_s = float(excel_pfs.iloc[foo, 2])
point_x_e = float(excel_pfs.iloc[foo + 1, 1])
point_y_e = float(excel_pfs.iloc[foo + 1, 2])
line_coordination = self.to_line_coordination(
point_x_s, point_y_s, point_x_e, point_y_e
)
left_elevation = self.get_elevation(line_coordination[:, 0:2])
center_elevation = self.get_elevation(line_coordination[:, 2:4])
right_elevation = self.get_elevation(line_coordination[:, 4:6])
dm_doc = ezdxf.new(dxfversion="R2004")
# 设置线形
# for name, desc, pattern in linetypes():
# if name not in dm_doc.linetypes:
# dm_doc.linetypes.add(
# name=name,
# pattern=pattern,
# description=desc,
# )
dm_msp = dm_doc.modelspace()
x_axis = [0]
cord_0 = line_coordination[0, 2:4] # 取中线的x轴作为横坐标
for cord in line_coordination[1:, 2:4]:
x_axis.append(dem_utils.distance(cord, cord_0))
# 左边线
left_line = [
(x_axis[i] / 5 / zoom_factor, left_elevation[i] * 2 / zoom_factor)
for i in range(len(left_elevation))
]
dm_whole_msp.add_polyline2d(
np.array(left_line)
+ np.hstack(
(
dm_whole_accumulative_distance * np.ones((len(x_axis), 1)) / 5,
np.zeros((len(x_axis), 1)),
)
),
dxfattribs={"color": 1},
) # 红色
dm_msp.add_polyline2d(left_line, dxfattribs={"color": 1}) # 红色
mid_line = [
(x_axis[i] / 5 / zoom_factor, center_elevation[i] * 2 / zoom_factor)
for i in range(len(center_elevation))
] # 中线
dm_whole_msp.add_polyline2d(
np.array(mid_line)
+ np.hstack(
(
dm_whole_accumulative_distance * np.ones((len(x_axis), 1)) / 5,
np.zeros((len(x_axis), 1)),
)
)
)
dm_msp.add_polyline2d(mid_line)
# 右边线
right_line = [
(x_axis[i] / 5 / zoom_factor, right_elevation[i] * 2 / zoom_factor)
for i in range(len(right_elevation))
]
dm_whole_msp.add_polyline2d(
np.array(right_line)
+ np.hstack(
(
dm_whole_accumulative_distance * np.ones((len(x_axis), 1)) / 5,
np.zeros((len(x_axis), 1)),
)
),
dxfattribs={"color": 5}, # 蓝色
)
dm_msp.add_polyline2d(
right_line,
dxfattribs={"color": 5},
) # 蓝色
# 树的线
# 考虑用最高边线的情况
tree_line = [
(
x_axis[i] / 5 / zoom_factor,
(
np.max(
(
center_elevation[i],
left_elevation[i],
right_elevation[i],
)
)
+ self._tree_height
)
* 2
/ zoom_factor,
)
for i in range(len(center_elevation))
]
if self._tree_height > 0:
dm_whole_msp.add_polyline2d(
np.array(tree_line)
+ np.hstack(
(
dm_whole_accumulative_distance
* np.ones((len(x_axis), 1))
/ 5,
np.zeros((len(x_axis), 1)),
)
),
dxfattribs={"color": 5},
)
dm_msp.add_polyline2d(tree_line, dxfattribs={"color": 5})
dm_whole_accumulative_distance += x_axis[-1]
os.makedirs(out_dxf_file_dir, exist_ok=True)
ezdxf.options.set(
"odafc-addon",
"win_exec_path",
"d:/ProgramFiles/ODAFileConverter/ODAFileConverter.exe",
)
from ezdxf.addons.odafc import export_dwg
export_dwg(
dm_doc,
f"{out_dxf_file_dir}/D{100+int(start_point_name[1:])}.dwg",
replace=True,
) # 写断面文件
# 写Z文件
z_file_path = f"{out_dxf_file_dir}/ZD{100+int(start_point_name[1:])}"
with open(z_file_path, "w") as z_file:
z_file.write("0 ")
z_file.write(f"{center_elevation[0]*2} ")
z_file.write("0 ")
z_file.write("0 ")
z_file.write(f"{center_elevation[0]*2-50}")
if foo == 0:
# copy file a to dist b
shutil.copy(z_file_path, f"{out_dxf_file_dir}/ZDA")
# 写平面文件
plate_msp.add_polyline2d(
line_coordination[:, 0:2],
dxfattribs={"color": 1},
) # 红色
plate_msp.add_polyline2d(
line_coordination[:, 2:4],
)
plate_msp.add_polyline2d(
line_coordination[:, 4:6],
dxfattribs={"color": 5},
) # 蓝色
mileage = [0]
start_point = line_coordination[0, 2:4]
for bar in line_coordination[1:, 2:4]:
mileage.append(round(dem_utils.distance(start_point, bar)))
mileage = np.array(mileage)
start_num = 4000 + int(start_point_name[1:])
segments.append(start_num)
end_num = 4000 + int(end_point_name[1:])
if foo == len(excel_pfs) - 2:
segments.append(end_num)
d_file = DFile(
start_num,
end_num,
mileage,
center_elevation,
left_elevation,
right_elevation,
self._tree_height,
)
d_file.save(out_dxf_file_dir)
self._copy_db_file()
tower_start_num = toml_dict["parameter"]["tower_number_start"]
control_file_template_path = toml_dict["parameter"][
"control_file_template_path"
]
c_file = ControlFile(
segments, tower_start_num, control_file_template_path, out_dxf_file_dir
)
c_file.save()
##################
##################
# 写nwed
section_name = f"{100+int(start_point_name[1:])}"
licheng = f"{mileage[-1]}"
remarks_start = f"{int(start_point_name[1:])}"
pole_name_start = f"{start_num}"
remarks_end = f"{int(end_point_name[1:])}"
pole_name_end = f"{end_num}"
toml_dict = self._toml_dict
side_width = toml_dict["parameter"]["side_width"] # 边线宽度
verticalExtent2 = f"{side_width}"
mid_vec_list = list(map(lambda x: [f"{x[0]*5}", f"{x[1]/2}"], mid_line))
right_vec_list = list(map(lambda x: [f"{x[0]*5}", f"{x[1]/2}"], right_line))
left_vec_list = list(map(lambda x: [f"{x[0]*5}", f"{x[1]/2}"], left_line))
nwed = Nwed(
section_name,
licheng,
remarks_start,
pole_name_start,
remarks_end,
pole_name_end,
verticalExtent2,
mid_vec_list,
right_vec_list,
left_vec_list,
)
nwed.write(
f"{out_dxf_file_dir}/{100+int(start_point_name[1:])}.nwed",
)
# 写整个断面文件
export_dwg(
dm_whole_doc,
f"{out_dxf_file_dir}/DA.dwg",
replace=True,
)
# 写平面文件
plate_doc.saveas(f"{out_dxf_file_dir}/plate.dxf")
def get_elevation(self, site_x_y):
"""Get the elevation of given locations from DEM in GCS.
Parameters:
dem_gcs <osgeo.gdal.Dataset> -- The input DEM (in GCS).
site_latlng <numpy.ndarray> -- The latitude and longitude of given locations.
Return:
site_ele <numpy.ndarray> -- The elevation and other information of given locations.
"""
gdal_data = self._dataset
# gdal_band = gdal_data.GetRasterBand(1)
# nodataval = gdal_band.GetNoDataValue()
# if np.any(gdal_array == nodataval):
# gdal_array[gdal_array == nodataval] = np.nan
gt = gdal_data.GetGeoTransform()
# print("\nThe 6 GeoTransform parameters of DEM are:\n", gt)
N_site = site_x_y.shape[0]
Xgeo = site_x_y[:, 0]
Ygeo = site_x_y[:, 1]
site_ele = np.zeros(N_site)
for i in range(N_site):
# Note:
# Xgeo = gt[0] + Xpixel * gt[1] + Yline * gt[2]
# Ygeo = gt[3] + Xpixel * gt[4] + Yline * gt[5]
#
# Xpixel - Pixel/column of DEM
# Yline - Line/row of DEM
#
# Xgeo - Longitude
# Ygeo - Latitude
#
# [0] = Longitude of left-top pixel
# [3] = Latitude of left-top pixel
#
# [1] = + Pixel width
# [5] = - Pixel height
#
# [2] = 0 for north up DEM
# [4] = 0 for north up DEM
Xpixel = (Xgeo[i] - gt[0]) / gt[1]
Yline = (Ygeo[i] - gt[3]) / gt[5]
# 寻找左上左下右上右下4个点
lu_xy = np.array([math.floor(Xpixel), math.ceil(Yline)]) # 左上
ld_xy = np.array([math.floor(Xpixel), math.floor(Yline)]) # 左下
ru_xy = np.array([math.ceil(Xpixel), math.ceil(Yline)]) # 右上
rd_xy = np.array([math.ceil(Xpixel), math.floor(Yline)]) # 右下
lu_elevation = gdal_data.ReadAsArray(
int(lu_xy[0]), int(lu_xy[1]), 1, 1
).astype(float)
ld_elevation = gdal_data.ReadAsArray(
int(ld_xy[0]), int(ld_xy[1]), 1, 1
).astype(float)
ru_elevation = gdal_data.ReadAsArray(
int(ru_xy[0]), int(ru_xy[1]), 1, 1
).astype(float)
rd_elevation = gdal_data.ReadAsArray(
int(rd_xy[0]), int(rd_xy[1]), 1, 1
).astype(float)
# 依据https://blog.csdn.net/jameschen9051/article/details/109469228中的公司
point_z = (
lu_elevation[0]
* (
(1 - math.fabs(Xpixel - lu_xy[0]))
* (1 - math.fabs(Yline - lu_xy[1]))
)
+ ld_elevation[0]
* (
(1 - math.fabs(Xpixel - ld_xy[0]))
* (1 - math.fabs(Yline - ld_xy[1]))
)
+ ru_elevation[0]
* (
(1 - math.fabs(Xpixel - ru_xy[0]))
* (1 - math.fabs(Yline - ru_xy[1]))
)
+ rd_elevation[0]
* (
(1 - math.fabs(Xpixel - rd_xy[0]))
* (1 - math.fabs(Yline - rd_xy[1]))
)
)
site_ele[i] = point_z
pass
return site_ele
def to_line_coordination(self, point_x_s, point_y_s, point_x_e, point_y_e):
path_length = (
(point_x_s - point_x_e) ** 2 + (point_y_s - point_y_e) ** 2
) ** 0.5
dem_resolution = self._dem_resolution # dem的精度
# TODO:设定为5m 1个点。
# n = round(path_length / dem_resolution)
n = round(path_length / 5)
center_point_x = np.linspace(point_x_s, point_x_e, n, endpoint=True)
center_point_y = np.linspace(point_y_s, point_y_e, n, endpoint=True)
# 计算左右边线
# 计算角度
toml_dict = self._toml_dict
side_width = toml_dict["parameter"]["side_width"] # 边线宽度
_line_angel = math.atan((point_y_e - point_y_s) / (point_x_e - point_x_s))
if point_x_e < point_x_s:
line_angel = _line_angel + math.pi
else:
line_angel = _line_angel
left_offset_x = side_width * math.cos(line_angel + math.pi / 2)
left_offset_y = side_width * math.sin(line_angel + math.pi / 2)
left_offset_point_x = center_point_x + left_offset_x
left_offset_point_y = center_point_y + left_offset_y
right_offset_point_x = center_point_x - left_offset_x # 向左偏移和向右偏移正好是相反的
right_offset_point_y = center_point_y - left_offset_y
r = np.array(
[
np.array(left_offset_point_x),
np.array(left_offset_point_y),
center_point_x,
center_point_y,
np.array(right_offset_point_x),
np.array(right_offset_point_y),
]
).T
return r
def _read_path_file(self):
toml_dict = self._toml_dict
path_file = toml_dict["parameter"]["path_file"]
excel_pds = pd.read_excel(path_file, header=None)
return excel_pds
def _copy_db_file(self):
toml_dict = self._toml_dict
db_file_dir = toml_dict["parameter"]["db_file_dir"]
out_dxf_file_dir = toml_dict["parameter"]["out_dxf_file_dir"]
shutil.copy(f"{db_file_dir}/Tower.db", f"{out_dxf_file_dir}/Tower.db")
shutil.copy(f"{db_file_dir}/ATMOS.db", f"{out_dxf_file_dir}/ATMOS.db")
shutil.copy(f"{db_file_dir}/Fit.db", f"{out_dxf_file_dir}/Fit.db")
shutil.copy(f"{db_file_dir}/RULE.db", f"{out_dxf_file_dir}/RULE.db")
shutil.copy(f"{db_file_dir}/WIRE.db", f"{out_dxf_file_dir}/WIRE.db")