417 lines
16 KiB
Python
417 lines
16 KiB
Python
import re
|
|
import datetime
|
|
import shutil
|
|
import os.path
|
|
import re
|
|
|
|
|
|
def file_backup_time():
|
|
now = datetime.datetime.now()
|
|
return now.strftime("%Y%m%d%H%M%S")
|
|
|
|
|
|
def get_directory(file_path):
|
|
return os.path.dirname(file_path)
|
|
|
|
|
|
def get_file_name_with_extention(file_path):
|
|
dir_part = get_directory(file_path)
|
|
file_name = file_path.replace(dir_part, "").replace("\\", "").replace(r"/", "")
|
|
(name, ext) = file_name.split(".")
|
|
return name, ext
|
|
|
|
|
|
class SFileObject:
|
|
def __init__(self, file_path):
|
|
self._content = []
|
|
self.file_path = file_path
|
|
self._read(file_path)
|
|
pass
|
|
|
|
def _read(self, file_path):
|
|
with open(file_path) as f:
|
|
for line in f:
|
|
trimeed_line = line.strip()
|
|
if trimeed_line == "":
|
|
continue
|
|
norm_trimed_line = re.sub(r"(\s+)", ",", trimeed_line)
|
|
sep = norm_trimed_line.split(",")
|
|
self._content.append(sep)
|
|
|
|
def _save_backup(self, back_file_path):
|
|
shutil.copy(self.file_path, back_file_path)
|
|
|
|
# 将数据按行写入S数据文件
|
|
def write(self, target_file_path):
|
|
(file_name, ext) = get_file_name_with_extention(self.file_path)
|
|
src_file_dir = get_directory(self.file_path)
|
|
self._save_backup(
|
|
"{dir}/{file_name}-{time}.{ext}".format(
|
|
dir=src_file_dir, file_name=file_name, time=file_backup_time(), ext=ext
|
|
)
|
|
)
|
|
with open(target_file_path, "wt") as file:
|
|
for content in self._content:
|
|
_content = [str(x) for x in content]
|
|
file.write("{content}\n".format(content=" ".join(_content)))
|
|
|
|
def has(self, tower_number): # 塔位号是否存在S文件中。可能存在多行的情况。
|
|
indexes = []
|
|
for index, content in enumerate(self._content):
|
|
if content[0] == tower_number:
|
|
indexes.append(index)
|
|
return indexes
|
|
|
|
def content(self):
|
|
return self._content
|
|
|
|
def update_height(self, index, new_height): # 更新内存中S文件中的塔高
|
|
content = self._content
|
|
content[index][7] = new_height
|
|
|
|
# 更新档距
|
|
def update_all_mileage(self, new_mileage_dic):
|
|
content = list(self._content)
|
|
first_tower_number_in_s = self._content[2][0]
|
|
tower_mileage_of_first_tower_in_ta = new_mileage_dic[first_tower_number_in_s]
|
|
for index, s_entry in enumerate(self._content):
|
|
if s_entry[0] == "首端转角号" or s_entry[0] == "塔号":
|
|
continue
|
|
tower_number = s_entry[0]
|
|
if tower_number in new_mileage_dic:
|
|
new_mileage = (
|
|
new_mileage_dic[tower_number] - tower_mileage_of_first_tower_in_ta
|
|
)
|
|
content[index][1] = new_mileage
|
|
self._content = content
|
|
|
|
|
|
# 把所有S文件当做一个来操作
|
|
class SFileAsSingle:
|
|
def __init__(self, s_files):
|
|
self._s_files = s_files
|
|
|
|
# 考虑不同位置处出现塔号的情况
|
|
# 这个函数需要打开一次S文件
|
|
def has(self, tower_number) -> [(SFileObject, int)]:
|
|
s_file = self._s_files
|
|
ret = []
|
|
for s in s_file:
|
|
d_file_obj = SFileObject(s)
|
|
indexes = d_file_obj.has(tower_number)
|
|
for index in indexes:
|
|
ret.append((d_file_obj, index))
|
|
return ret
|
|
|
|
# 写入一次S文件
|
|
def update_height(self, result: [(SFileObject, int)], new_height):
|
|
s_changed_record = {}
|
|
for s_file_obj, index in result:
|
|
s_file_obj.update_height(index, new_height)
|
|
s_changed_record[s_file_obj] = 1
|
|
# 输出变化的文件名
|
|
for s_file_obj in s_changed_record:
|
|
s_output_file_path = s_file_obj.file_path
|
|
s_file_obj.write(s_output_file_path)
|
|
print("update S file {Ss}".format(Ss=s_output_file_path))
|
|
|
|
# 写入一次S文件
|
|
def update_all_mileage(self, new_mileage_dic):
|
|
s_file = self._s_files
|
|
for s in s_file:
|
|
s_object = SFileObject(s)
|
|
s_object.update_all_mileage(new_mileage_dic)
|
|
s_output_file_path = s_object.file_path
|
|
s_object.write(s_output_file_path)
|
|
print("update mileage in S file {Ss}".format(Ss=s_output_file_path))
|
|
|
|
def content(self):
|
|
s_file = self._s_files
|
|
for s in s_file:
|
|
d_file_obj = SFileObject(s)
|
|
s_content = d_file_obj.content()
|
|
for c in s_content:
|
|
yield c
|
|
|
|
|
|
class TaFileObject:
|
|
def __init__(self, file_path, SFile):
|
|
# self._content = []
|
|
self._file_path = file_path
|
|
self._read(file_path)
|
|
self._old_tower_height_record = None
|
|
self._SFile = SFile
|
|
self._old_span_record = None # 档距
|
|
pass
|
|
|
|
def _read(self, file_path):
|
|
contents = []
|
|
with open(file_path) as f:
|
|
for line in f:
|
|
trimeed_line = line.strip()
|
|
if trimeed_line == "":
|
|
continue
|
|
norm_trimed_line = re.sub(r"(\s+)", "", trimeed_line)
|
|
sep = norm_trimed_line.split(",")
|
|
contents.append(sep)
|
|
return contents
|
|
|
|
@staticmethod
|
|
def _make_tower_height_record(contents): # 将TA文件读入内存字典中
|
|
tower_height_record = {}
|
|
for content in contents:
|
|
sep = content
|
|
if len(sep) > 9:
|
|
tower_number = sep[0]
|
|
tower_height = sep[9]
|
|
tower_height_record[tower_number] = tower_height
|
|
return tower_height_record
|
|
|
|
# def _make_span_record(self, ta_content): # 将TA 文件档距读入字典中。
|
|
# tower_span_record = {}
|
|
# basic_mileage = 0
|
|
# for ta_entry in ta_content:
|
|
# if len(ta_entry) > 2:
|
|
# tower_number = ta_entry[0]
|
|
# tower_mileage = ta_entry[2]
|
|
# tower_span_record[tower_number] = tower_mileage - basic_mileage
|
|
# basic_mileage = tower_mileage
|
|
# return tower_span_record
|
|
|
|
def sync_changed_tower_height_to_S(self, ta_new_contents, old_tower_height_record):
|
|
updated = False
|
|
for ta_entry in ta_new_contents:
|
|
sep = ta_entry
|
|
if len(sep) > 9:
|
|
tower_number = sep[0]
|
|
old_tower_height = float(old_tower_height_record[tower_number])
|
|
new_tower_height = float(sep[9])
|
|
if abs(old_tower_height - new_tower_height) > 1e-5:
|
|
print(
|
|
"{tower_number} height changes to {new_tower_height}".format(
|
|
tower_number=tower_number, new_tower_height=new_tower_height
|
|
)
|
|
)
|
|
SFile = self._SFile
|
|
s_as_single = SFileAsSingle(SFile)
|
|
result = s_as_single.has(tower_number)
|
|
if len(result) > 0:
|
|
s_as_single.update_height(result, new_tower_height)
|
|
updated = True
|
|
return updated
|
|
|
|
def sync_all_tower_mileage_to_s(self):
|
|
ta_new_contents = self._read(self._file_path)
|
|
new_mileage_dic = {}
|
|
for ta_entry in ta_new_contents:
|
|
if len(ta_entry) > 2:
|
|
tower_number = ta_entry[0]
|
|
new_tower_mileage = float(ta_entry[2])
|
|
new_mileage_dic[tower_number] = new_tower_mileage
|
|
SFile = self._SFile
|
|
s_as_single = SFileAsSingle(SFile)
|
|
s_as_single.update_all_mileage(new_mileage_dic)
|
|
|
|
# # 同步Ta文件中修改过的档距。
|
|
# @staticmethod
|
|
# def sync_changed_tower_span_to_s(ta_new_contents, old_tower_span_record):
|
|
# for ta_entry in ta_new_contents:
|
|
# tower_number = ta_entry[0]
|
|
# old_tower_span = float(old_tower_span_record[tower_number])
|
|
# new_tower_span = float(ta_entry[2])
|
|
# if abs(old_tower_span - new_tower_span) > 1e-5:
|
|
# print(
|
|
# "{tower_number} span changes to {new_tower_span}".format(
|
|
# tower_number=tower_number, new_tower_span=new_tower_span
|
|
# )
|
|
# )
|
|
|
|
def sync_all_tower_height_from_TA_to_S(self, prompt=True):
|
|
if prompt:
|
|
enter_string = input(
|
|
"Will synchronize all height from TA to S. Enter AH2S:\n"
|
|
)
|
|
if enter_string.lower() != "ah2s":
|
|
return
|
|
ta_content = self._read(self._file_path)
|
|
for entry in ta_content:
|
|
sep = entry
|
|
if len(sep) > 9:
|
|
tower_number = sep[0]
|
|
new_tower_height = float(sep[9])
|
|
s_files = self._SFile
|
|
s_as_single = SFileAsSingle(s_files)
|
|
result = s_as_single.has(tower_number)
|
|
if len(result) > 0:
|
|
s_as_single.update_height(result, new_tower_height)
|
|
|
|
# 将S文件中的塔高同步到TA文件中。
|
|
def sync_all_tower_height_from_S(self):
|
|
enter_string = input(
|
|
"Will synchronize height in S to Ta. Enter S2T to proceed.\n"
|
|
)
|
|
if enter_string.lower() != "s2t":
|
|
print("Not confirmed.")
|
|
return
|
|
# 先取得所有S的呼高
|
|
s_tower_height_dic = {}
|
|
s_as_single = SFileAsSingle(self._SFile)
|
|
for s_entry in s_as_single.content():
|
|
if s_entry[0] == "首端转角号" or s_entry[0] == "塔号":
|
|
continue
|
|
tower_number = s_entry[0]
|
|
tower_height = s_entry[7]
|
|
if tower_number in s_tower_height_dic:
|
|
continue # 只取第一次出现的。避免耐张段末端和首端不一致的问题。
|
|
s_tower_height_dic[tower_number] = tower_height
|
|
ta_content = self._read(self._file_path)
|
|
new_ta_content = list(ta_content)
|
|
for index, ta_entry in enumerate(ta_content):
|
|
ta_tower_number = ta_entry[0]
|
|
if ta_tower_number in s_tower_height_dic:
|
|
if (
|
|
abs(
|
|
float(new_ta_content[index][9])
|
|
- float(s_tower_height_dic[ta_tower_number])
|
|
)
|
|
> 1e-5 # 新旧呼高不一致
|
|
):
|
|
print(
|
|
"Tower {tower_number} height({old_height} -> {new_height}) in S has been synchronized to TA.\n".format(
|
|
tower_number=ta_tower_number,
|
|
old_height=ta_content[index][9],
|
|
new_height=s_tower_height_dic[ta_tower_number],
|
|
)
|
|
)
|
|
new_tower_height = s_tower_height_dic[ta_tower_number]
|
|
new_ta_content[index][9] = new_tower_height # 将S文件中的塔高赋值到TA文件中
|
|
# 修改TA中WNSZ_51这样的塔名
|
|
old_tower_name_height = ta_entry[8]
|
|
tower_name = old_tower_name_height.split("_")[0]
|
|
new_tower_name_height = "{tower_name}_{new_height}".format(
|
|
tower_name=tower_name, new_height=new_tower_height
|
|
)
|
|
new_ta_content[index][8] = new_tower_name_height
|
|
# 备份TA文件
|
|
ta_file_dir = get_directory(self._file_path)
|
|
ta_file_name = get_file_name_with_extention(self._file_path)[0]
|
|
backup_time = file_backup_time()
|
|
backup_file_path = "{ta_file_dir}/{ta_file_name}{backup_time}.TA".format(
|
|
ta_file_dir=ta_file_dir, ta_file_name=ta_file_name, backup_time=backup_time
|
|
)
|
|
shutil.copy(self._file_path, backup_file_path)
|
|
with open(self._file_path, "w") as ta_file:
|
|
for ta_c in new_ta_content:
|
|
ta_file.write("{ta_c}\n".format(ta_c=",".join(ta_c)))
|
|
print("Synchronization form S to TA is finished.")
|
|
pass
|
|
|
|
def start(self):
|
|
file_path = self._file_path
|
|
while True: # 检测文件是否有变化的循环
|
|
new_contents = self._read(file_path) # 每一次循环都会重新读一遍TA文件
|
|
if self._old_tower_height_record is None:
|
|
self._old_tower_height_record = self._make_tower_height_record(
|
|
new_contents
|
|
)
|
|
if self.sync_changed_tower_height_to_S(
|
|
new_contents, self._old_tower_height_record
|
|
):
|
|
self._old_tower_height_record = self._make_tower_height_record(
|
|
new_contents
|
|
)
|
|
while True: # 接受输入的循环
|
|
key = input("press Y to synchronize TA to S\n")
|
|
if key == "y" or key == "Y":
|
|
break
|
|
print("key invalid.")
|
|
|
|
|
|
class CordinationObject:
|
|
def __init__(self, cord_file_path):
|
|
content = []
|
|
cord_dic = {}
|
|
with open(cord_file_path) as cord_file:
|
|
for line in cord_file:
|
|
if line.strip() == "":
|
|
continue
|
|
sep = line.split(",")
|
|
J_or_Z = sep[0][0]
|
|
pile = re.findall(r"[JZ]0*([0-9]+)", sep[0])[0] # 桩
|
|
JZ_pile = J_or_Z + pile
|
|
# pile = sep[0][1:len(sep[0])]
|
|
altitude = sep[1]
|
|
mileage = sep[2]
|
|
content.append((JZ_pile, altitude, mileage))
|
|
index = len(content) - 1
|
|
cord_dic[pile] = index # 字典是不带J或者Z的
|
|
self._content = content
|
|
self._cord_dic = cord_dic
|
|
|
|
def get_altitude(self, pile):
|
|
cord_dic = self._cord_dic
|
|
if pile in cord_dic:
|
|
index = cord_dic[pile]
|
|
return self._content[index][1]
|
|
return None
|
|
|
|
def content(self):
|
|
return self._content
|
|
|
|
|
|
class DFileObject:
|
|
def __init__(self, D_file_path):
|
|
_content = []
|
|
with open(D_file_path) as D_file:
|
|
for _line in D_file:
|
|
line = re.sub(r"\s+", ",", _line)
|
|
_content.append(line.split(","))
|
|
self._content = _content
|
|
|
|
def content(self):
|
|
return self._content
|
|
|
|
|
|
class DFileAsSingle:
|
|
def __init__(self, d_files):
|
|
self._d_file_object = []
|
|
for d in d_files:
|
|
self._d_file_object.append(DFileObject(d))
|
|
|
|
def content(self):
|
|
for d_object in self._d_file_object:
|
|
for c in d_object.content():
|
|
yield c
|
|
|
|
def float_content(self):
|
|
for c in self.content():
|
|
yield [float(num) for num in c if num != ""]
|
|
|
|
# 通过里程获取高程
|
|
def get_altitude(self, mileage):
|
|
lower_mileage_altitude = None
|
|
upper_mileage_altitude = None
|
|
d_mileage = 0
|
|
base_mileage = 0
|
|
for d_content in self.float_content():
|
|
if d_content[1] == 0:
|
|
base_mileage = d_mileage
|
|
d_mileage = base_mileage + d_content[1]
|
|
if d_mileage < mileage:
|
|
lower_mileage_altitude = (d_mileage, d_content[2])
|
|
if d_mileage >= mileage:
|
|
upper_mileage_altitude = (d_mileage, d_content[2])
|
|
break
|
|
if d_mileage == mileage:
|
|
return upper_mileage_altitude[1]
|
|
# 如果不是正好有高程,就线性插值。
|
|
if d_mileage > mileage:
|
|
return (
|
|
(mileage - lower_mileage_altitude[0])
|
|
/ (upper_mileage_altitude[0] - lower_mileage_altitude[0])
|
|
* (upper_mileage_altitude[1] - lower_mileage_altitude[1])
|
|
+ lower_mileage_altitude[1]
|
|
)
|
|
# 没有高程就返回None
|
|
return None
|