import re import datetime import shutil import os.path import re def file_backup_time(): now = datetime.datetime.now() return now.strftime("%Y%m%d%H%M%S") def get_directory(file_path): return os.path.dirname(file_path) def get_file_name_with_extention(file_path): dir_part = get_directory(file_path) file_name = file_path.replace(dir_part, "").replace("\\", "").replace(r"/", "") (name, ext) = file_name.split(".") return name, ext class SFileObject: def __init__(self, file_path): self._content = [] self.file_path = file_path self._read(file_path) pass def _read(self, file_path): with open(file_path) as f: for line in f: trimeed_line = line.strip() if trimeed_line == "": continue norm_trimed_line = re.sub(r"(\s+)", ",", trimeed_line) sep = norm_trimed_line.split(",") self._content.append(sep) def _save_backup(self, back_file_path): shutil.copy(self.file_path, back_file_path) def write(self, target_file_path): (file_name, ext) = get_file_name_with_extention(self.file_path) src_file_dir = get_directory(self.file_path) self._save_backup( "{dir}/{file_name}-{time}.{ext}".format( dir=src_file_dir, file_name=file_name, time=file_backup_time(), ext=ext ) ) with open(target_file_path, "wt") as file: for content in self._content: _content = [str(x) for x in content] file.write("{content}\n".format(content=" ".join(_content))) def has(self, tower_number): # 塔位号是否存在S文件中。可能存在多行的情况。 indexes = [] for index, content in enumerate(self._content): if content[0] == tower_number: indexes.append(index) return indexes def content(self): return self._content def update_height(self, index, new_height): # 更新内存中S文件中的塔高 content = self._content content[index][7] = new_height # 更新档距 def update_all_mileage(self, new_mileage_dic): content = list(self._content) first_tower_number_in_s = self._content[2][0] tower_mileage_of_first_tower_in_ta = new_mileage_dic[first_tower_number_in_s] for index, s_entry in enumerate(self._content): if s_entry[0] == "首端转角号" or s_entry[0] == "塔号": continue tower_number = s_entry[0] if tower_number in new_mileage_dic: new_mileage = ( new_mileage_dic[tower_number] - tower_mileage_of_first_tower_in_ta ) content[index][1] = new_mileage self._content = content # 把所有S文件当做一个来操作 class SFileAsSingle: def __init__(self, s_files): self._s_files = s_files # 考虑不同位置处出现塔号的情况 # 打开一次S文件 def has(self, tower_number) -> [(SFileObject, int)]: s_file = self._s_files ret = [] for s in s_file: d_file_obj = SFileObject(s) indexes = d_file_obj.has(tower_number) for index in indexes: ret.append((d_file_obj, index)) return ret # 写入一次S文件 def update_height(self, result: [(SFileObject, int)], new_height): s_changed_record = {} for s_file_obj, index in result: s_file_obj.update_height(index, new_height) s_changed_record[s_file_obj] = 1 # 输出变化的文件名 for s_file_obj in s_changed_record: s_output_file_path = s_file_obj.file_path s_file_obj.write(s_output_file_path) print("update S file {Ss}".format(Ss=s_output_file_path)) # 写入一次S文件 def update_all_mileage(self, new_mileage_dic): s_file = self._s_files for s in s_file: s_object = SFileObject(s) s_object.update_all_mileage(new_mileage_dic) s_output_file_path = s_object.file_path s_object.write(s_output_file_path) print("update mileage in S file {Ss}".format(Ss=s_output_file_path)) def content(self): s_file = self._s_files for s in s_file: d_file_obj = SFileObject(s) s_content = d_file_obj.content() for c in s_content: yield c class TaFileObject: def __init__(self, file_path, SFile): # self._content = [] self._file_path = file_path self._read(file_path) self._old_tower_height_record = None self._SFile = SFile self._old_span_record = None # 档距 pass def _read(self, file_path): contents = [] with open(file_path) as f: for line in f: trimeed_line = line.strip() if trimeed_line == "": continue norm_trimed_line = re.sub(r"(\s+)", "", trimeed_line) sep = norm_trimed_line.split(",") contents.append(sep) return contents @staticmethod def _make_tower_height_record(contents): # 将TA文件读入内存字典中 tower_height_record = {} for content in contents: sep = content if len(sep) > 9: tower_number = sep[0] tower_height = sep[9] tower_height_record[tower_number] = tower_height return tower_height_record # def _make_span_record(self, ta_content): # 将TA 文件档距读入字典中。 # tower_span_record = {} # basic_mileage = 0 # for ta_entry in ta_content: # if len(ta_entry) > 2: # tower_number = ta_entry[0] # tower_mileage = ta_entry[2] # tower_span_record[tower_number] = tower_mileage - basic_mileage # basic_mileage = tower_mileage # return tower_span_record def sync_changed_tower_height_to_S(self, ta_new_contents, old_tower_height_record): updated = False for ta_entry in ta_new_contents: sep = ta_entry if len(sep) > 9: tower_number = sep[0] old_tower_height = float(old_tower_height_record[tower_number]) new_tower_height = float(sep[9]) if abs(old_tower_height - new_tower_height) > 1e-5: print( "{tower_number} height changes to {new_tower_height}".format( tower_number=tower_number, new_tower_height=new_tower_height ) ) SFile = self._SFile s_as_single = SFileAsSingle(SFile) result = s_as_single.has(tower_number) if len(result) > 0: s_as_single.update_height(result, new_tower_height) updated = True return updated def sync_all_tower_mileage_to_s(self): ta_new_contents = self._read(self._file_path) new_mileage_dic = {} for ta_entry in ta_new_contents: if len(ta_entry)>2: tower_number = ta_entry[0] new_tower_mileage = float(ta_entry[2]) new_mileage_dic[tower_number] = new_tower_mileage SFile = self._SFile s_as_single = SFileAsSingle(SFile) s_as_single.update_all_mileage(new_mileage_dic) # # 同步Ta文件中修改过的档距。 # @staticmethod # def sync_changed_tower_span_to_s(ta_new_contents, old_tower_span_record): # for ta_entry in ta_new_contents: # tower_number = ta_entry[0] # old_tower_span = float(old_tower_span_record[tower_number]) # new_tower_span = float(ta_entry[2]) # if abs(old_tower_span - new_tower_span) > 1e-5: # print( # "{tower_number} span changes to {new_tower_span}".format( # tower_number=tower_number, new_tower_span=new_tower_span # ) # ) def sync_all_tower_height_from_TA_to_S(self): enter_string = input("Will synchronize all height from TA to S. Enter AH2S:\n") if enter_string.lower() != "ah2s": return ta_content = self._read(self._file_path) for entry in ta_content: sep = entry if len(sep) > 9: tower_number = sep[0] new_tower_height = float(sep[9]) s_files = self._SFile s_as_single = SFileAsSingle(s_files) result = s_as_single.has(tower_number) if len(result) > 0: s_as_single.update_height(result, new_tower_height) # 将S文件中的塔高同步到TA文件中。 def sync_all_tower_height_from_S(self): enter_string = input( "Will synchronize height in S to Ta. Enter S2T to proceed.\n" ) if enter_string.lower() != "s2t": print("Not confirmed.") return # 先取得所有S的呼高 s_tower_height_dic = {} s_as_single = SFileAsSingle(self._SFile) for s_entry in s_as_single.content(): if s_entry[0] == "首端转角号" or s_entry[0] == "塔号": continue tower_number = s_entry[0] tower_height = s_entry[7] if tower_number in s_tower_height_dic: continue # 只取第一次出现的。避免耐张段末端和首端不一致的问题。 s_tower_height_dic[tower_number] = tower_height ta_content = self._read(self._file_path) new_ta_content = list(ta_content) for index, ta_entry in enumerate(ta_content): ta_tower_number = ta_entry[0] if ta_tower_number in s_tower_height_dic: if ( abs( float(new_ta_content[index][9]) - float(s_tower_height_dic[ta_tower_number]) ) > 1e-5 # 新旧呼高不一致 ): print( "Tower {tower_number} height({old_height} -> {new_height}) in S has been synchronized to TA.\n".format( tower_number=ta_tower_number, old_height=ta_content[index][9], new_height=s_tower_height_dic[ta_tower_number], ) ) new_tower_height = s_tower_height_dic[ta_tower_number] new_ta_content[index][9] = new_tower_height # 将S文件中的塔高赋值到TA文件中 # 修改TA中WNSZ_51这样的塔名 old_tower_name_height = ta_entry[8] tower_name = old_tower_name_height.split("_")[0] new_tower_name_height = "{tower_name}_{new_height}".format( tower_name=tower_name, new_height=new_tower_height ) new_ta_content[index][8] = new_tower_name_height # 备份TA文件 ta_file_dir = get_directory(self._file_path) ta_file_name = get_file_name_with_extention(self._file_path)[0] backup_time = file_backup_time() backup_file_path = "{ta_file_dir}/{ta_file_name}{backup_time}.TA".format( ta_file_dir=ta_file_dir, ta_file_name=ta_file_name, backup_time=backup_time ) shutil.copy(self._file_path, backup_file_path) with open(self._file_path, "w") as ta_file: for ta_c in new_ta_content: ta_file.write("{ta_c}\n".format(ta_c=",".join(ta_c))) print("Synchronization form S to TA is finished.") pass def start(self): file_path = self._file_path while True: # 检测文件是否有变化的循环 new_contents = self._read(file_path) # 每一次循环都会重新读一遍TA文件 if self._old_tower_height_record is None: self._old_tower_height_record = self._make_tower_height_record( new_contents ) if self.sync_changed_tower_height_to_S( new_contents, self._old_tower_height_record ): self._old_tower_height_record = self._make_tower_height_record( new_contents ) while True: # 接受输入的循环 key = input("press Y to synchronize TA to S\n") if key == "y" or key == "Y": break print("key invalid.") class CordinationObject: def __init__(self, cord_file_path): content = [] cord_dic = {} with open(cord_file_path) as cord_file: for line in cord_file: if line.strip() == "": continue sep = line.split(",") J_or_Z = sep[0][0] pile = re.findall(r"[JZ]0*([0-9]+)", sep[0])[0] # 桩 JZ_pile = J_or_Z + pile # pile = sep[0][1:len(sep[0])] altitude = sep[1] mileage = sep[2] content.append((JZ_pile, altitude, mileage)) index = len(content) - 1 cord_dic[pile] = index # 字典是不带J或者Z的 self._content = content self._cord_dic = cord_dic def get_altitude(self, pile): cord_dic = self._cord_dic if pile in cord_dic: index = cord_dic[pile] return self._content[index][1] return None def content(self): return self._content class DFileObject: def __init__(self, D_file_path): _content = [] with open(D_file_path) as D_file: for _line in D_file: line = re.sub(r"\s+", ",", _line) _content.append(line.split(",")) self._content = _content def content(self): return self._content class DFileAsSingle: def __init__(self, d_files): self._d_file_object = [] for d in d_files: self._d_file_object.append(DFileObject(d)) def content(self): for d_object in self._d_file_object: for c in d_object.content(): yield c def float_content(self): for c in self.content(): yield [float(num) for num in c if num != ""] # 通过里程获取高程 def get_altitude(self, mileage): lower_mileage_altitude = None upper_mileage_altitude = None d_mileage = 0 base_mileage = 0 for d_content in self.float_content(): if d_content[1] == 0: base_mileage = d_mileage d_mileage = base_mileage + d_content[1] if d_mileage < mileage: lower_mileage_altitude = (d_mileage, d_content[2]) if d_mileage >= mileage: upper_mileage_altitude = (d_mileage, d_content[2]) break if d_mileage == mileage: return upper_mileage_altitude[1] # 如果不是正好有高程,就线性插值。 if d_mileage > mileage: return ( (mileage - lower_mileage_altitude[0]) / (upper_mileage_altitude[0] - lower_mileage_altitude[0]) * (upper_mileage_altitude[1] - lower_mileage_altitude[1]) + lower_mileage_altitude[1] ) # 没有高程就返回None return None