"""Keep tower heights consistent between a TA file and a set of S files.

The TA file is comma separated; S files are whitespace (or comma) separated.
Every rewrite of a file is preceded by a timestamped backup copy placed next
to the file being rewritten.
"""
import datetime
import os.path
import re
import shutil

# Column layout used throughout this module (0-based field indexes).
_S_HEIGHT_COLUMN = 7  # tower height field in an S-file row
_TA_NAME_COLUMN = 8  # "<name>_<height>" field in a TA-file row, e.g. WNSZ_51
_TA_HEIGHT_COLUMN = 9  # tower height field in a TA-file row

# Two heights are considered different when they differ by more than this.
_HEIGHT_TOLERANCE = 1e-5

# First-field values that mark header rows in S files (not tower entries).
_S_HEADER_LABELS = ("首端转角号", "塔号")


def file_backup_time():
    """Return the current local time formatted as ``YYYYmmddHHMMSS``."""
    return datetime.datetime.now().strftime("%Y%m%d%H%M%S")


def get_directory(file_path):
    """Return the directory part of *file_path* (``""`` when there is none)."""
    return os.path.dirname(file_path)


def get_file_name_with_extention(file_path):
    """Split the final path component of *file_path* into ``(name, ext)``.

    The extension is returned without its leading dot and is taken after the
    LAST dot, so ``"dir/my.file.txt"`` yields ``("my.file", "txt")``. A name
    without any dot yields ``(name, "")``.
    """
    # Accept both separators regardless of platform; only the last component
    # is the file name. (Removing the directory text with str.replace would
    # also delete it from inside the file name, e.g. "a/a.txt".)
    base_name = re.split(r"[\\/]", file_path)[-1]
    name, dot, ext = base_name.rpartition(".")
    if not dot:
        return base_name, ""
    return name, ext


class SFileObject:
    """In-memory copy of one S file, one list of string fields per row."""

    def __init__(self, file_path):
        """Load *file_path*; raises ``OSError`` if it cannot be opened."""
        self._content = []
        self.file_path = file_path
        self._read(file_path)

    def _read(self, file_path):
        """Replace the in-memory rows with the parsed content of *file_path*.

        Blank lines are skipped. Each run of whitespace is turned into a
        comma before splitting, so both whitespace and commas separate fields.
        """
        rows = []
        with open(file_path) as s_file:
            for line in s_file:
                stripped = line.strip()
                if not stripped:
                    continue
                rows.append(re.sub(r"(\s+)", ",", stripped).split(","))
        # Assign once at the end so a repeated call never duplicates rows.
        self._content = rows

    def _save_backup(self, back_file_path):
        """Copy the file this object was loaded from to *back_file_path*."""
        shutil.copy(self.file_path, back_file_path)

    def write(self, target_file_path):
        """Back up the source file, then write all rows to *target_file_path*.

        The backup is named ``<name>-<timestamp>.<ext>`` and placed in the
        directory of the source file. Rows are written with fields joined by
        a single space.
        """
        file_name, ext = get_file_name_with_extention(self.file_path)
        backup_name = "{file_name}-{time}".format(
            file_name=file_name, time=file_backup_time()
        )
        if ext:
            backup_name = "{name}.{ext}".format(name=backup_name, ext=ext)
        # os.path.join keeps a bare relative file name relative; formatting
        # "<dir>/<name>" with an empty dir would point at the filesystem root.
        backup_path = os.path.join(get_directory(self.file_path), backup_name)
        # The timestamp has one-second resolution. If a backup with this name
        # already exists it was taken earlier within the same second and holds
        # the older state, so it must not be overwritten with newer content.
        if not os.path.exists(backup_path):
            self._save_backup(backup_path)

        with open(target_file_path, "wt") as out_file:
            for row in self._content:
                out_file.write(
                    "{content}\n".format(content=" ".join(str(x) for x in row))
                )

    def has(self, tower_number):
        """Return the indexes of all rows whose first field is *tower_number*.

        A tower number may appear on several rows; the list is empty when it
        does not appear at all.
        """
        return [
            index
            for index, row in enumerate(self._content)
            if row[0] == tower_number
        ]

    def content(self):
        """Return the list of parsed rows (the live list, not a copy)."""
        return self._content

    def update_height(self, index, new_height):
        """Set the tower height field of row *index* in memory only.

        Raises ``IndexError`` if the row has no height column.
        """
        self._content[index][_S_HEIGHT_COLUMN] = new_height


class SFileAsSingle:
    """Operate on several S files as if they were a single one."""

    def __init__(self, s_files):
        """*s_files* is an iterable of S-file paths."""
        self._s_files = s_files

    def has(self, tower_number) -> "list[tuple[SFileObject, int]]":
        """Find *tower_number* in every S file.

        Each S file is opened and parsed once per call. Returns
        ``(s_file_object, row_index)`` pairs, since a tower number may occur
        at several positions and in several files.
        """
        matches = []
        for s_path in self._s_files:
            s_file_obj = SFileObject(s_path)
            matches.extend(
                (s_file_obj, index) for index in s_file_obj.has(tower_number)
            )
        return matches

    def update_height(
        self, result: "list[tuple[SFileObject, int]]", new_height
    ):
        """Apply *new_height* to every match in *result* and save the files.

        *result* is the value returned by :meth:`has`. Each affected S file
        is written back to its own path exactly once.
        """
        changed = []
        for s_file_obj, index in result:
            s_file_obj.update_height(index, new_height)
            # Identity check: one object per file, first-seen order kept.
            if not any(s_file_obj is seen for seen in changed):
                changed.append(s_file_obj)
        for s_file_obj in changed:
            s_output_file_path = s_file_obj.file_path
            s_file_obj.write(s_output_file_path)
            print("update S file {Ss}".format(Ss=s_output_file_path))

    def content(self):
        """Yield every parsed row of every S file, file by file."""
        for s_path in self._s_files:
            for row in SFileObject(s_path).content():
                yield row


class TaFileObject:
    """A TA file together with the S files it is synchronized with."""

    def __init__(self, file_path, SFile):
        """*file_path* is the TA file; *SFile* is an iterable of S-file paths.

        Raises ``OSError`` if the TA file cannot be opened.
        """
        self._file_path = file_path
        # Result intentionally discarded: reading here only makes an
        # unreadable TA file fail at construction time.
        self._read(file_path)
        self._old_tower_height_record = None
        self._SFile = SFile

    def _read(self, file_path):
        """Parse *file_path* and return its rows as lists of string fields.

        Blank lines are skipped, ALL whitespace inside a line is removed and
        the remainder is split on commas.
        """
        contents = []
        with open(file_path) as ta_file:
            for line in ta_file:
                stripped = line.strip()
                if not stripped:
                    continue
                contents.append(re.sub(r"(\s+)", "", stripped).split(","))
        return contents

    def _make_tower_height_record(self, contents):
        """Build ``{tower_number: height_string}`` from parsed TA rows.

        Rows too short to contain a height column are ignored.
        """
        return {
            row[0]: row[_TA_HEIGHT_COLUMN]
            for row in contents
            if len(row) > _TA_HEIGHT_COLUMN
        }

    def sync_changed_tower_height_to_S(self, new_contents, old_tower_height_record):
        """Push heights that differ from *old_tower_height_record* to the S files.

        *new_contents* are parsed TA rows; *old_tower_height_record* maps
        tower number to the previously seen height. A tower that is absent
        from the old record is treated as changed.

        Returns ``True`` when at least one changed height was detected (even
        if no S file contains that tower), ``False`` otherwise.
        """
        updated = False
        s_as_single = SFileAsSingle(self._SFile)
        for row in new_contents:
            if len(row) <= _TA_HEIGHT_COLUMN:
                continue
            tower_number = row[0]
            new_tower_height = float(row[_TA_HEIGHT_COLUMN])
            # A tower added to the TA file after the baseline was taken has
            # no old height; that must not abort the whole pass.
            old_height_text = old_tower_height_record.get(tower_number)
            if (
                old_height_text is not None
                and abs(float(old_height_text) - new_tower_height)
                <= _HEIGHT_TOLERANCE
            ):
                continue
            print(
                "{tower_number} height changes to {new_tower_height}".format(
                    tower_number=tower_number, new_tower_height=new_tower_height
                )
            )
            # has() re-reads the S files, so earlier writes are picked up.
            result = s_as_single.has(tower_number)
            if result:
                s_as_single.update_height(result, new_tower_height)
            updated = True
        return updated

    def sync_all_tower_height_from_TA_to_S(self):
        """After confirmation, copy every TA tower height into the S files.

        Does nothing unless the user types ``AH2S`` (case-insensitive).
        """
        enter_string = input("Will synchronize all height from TA to S. Enter AH2S:\n")
        if enter_string.lower() != "ah2s":
            return
        s_as_single = SFileAsSingle(self._SFile)
        for row in self._read(self._file_path):
            if len(row) <= _TA_HEIGHT_COLUMN:
                continue
            tower_number = row[0]
            new_tower_height = float(row[_TA_HEIGHT_COLUMN])
            result = s_as_single.has(tower_number)
            if result:
                s_as_single.update_height(result, new_tower_height)

    def sync_tower_height_from_S(self):
        """After confirmation, copy tower heights from the S files into the TA file.

        Does nothing unless the user types ``S2T`` (case-insensitive). For
        each TA row whose height differs from the S height, both the height
        field and the ``<name>_<height>`` field are updated. The TA file is
        backed up next to itself and then rewritten.
        """
        enter_string = input(
            "Will synchronize height in S to Ta. Enter S2T to proceed.\n"
        )
        if enter_string.lower() != "s2t":
            print("Not confirmed.")
            return

        # Collect the height of every tower listed in the S files first.
        s_tower_height_dic = {}
        for s_entry in SFileAsSingle(self._SFile).content():
            if s_entry[0] in _S_HEADER_LABELS:
                continue
            if len(s_entry) <= _S_HEIGHT_COLUMN:
                continue  # row too short to carry a height
            s_tower_height_dic[s_entry[0]] = s_entry[_S_HEIGHT_COLUMN]

        ta_content = self._read(self._file_path)
        for ta_entry in ta_content:
            if len(ta_entry) <= _TA_HEIGHT_COLUMN:
                continue  # not a tower row; written back unchanged
            ta_tower_number = ta_entry[0]
            if ta_tower_number not in s_tower_height_dic:
                continue
            new_tower_height = s_tower_height_dic[ta_tower_number]
            old_tower_height = ta_entry[_TA_HEIGHT_COLUMN]
            if (
                abs(float(old_tower_height) - float(new_tower_height))
                <= _HEIGHT_TOLERANCE
            ):
                continue  # heights already agree
            print(
                "Tower {tower_number} height({old_height} -> {new_height}) in S has been synchronized to TA.\n".format(
                    tower_number=ta_tower_number,
                    old_height=old_tower_height,
                    new_height=new_tower_height,
                )
            )
            ta_entry[_TA_HEIGHT_COLUMN] = new_tower_height
            # The name field embeds the height (e.g. WNSZ_51); keep it in step.
            tower_name = ta_entry[_TA_NAME_COLUMN].split("_")[0]
            ta_entry[_TA_NAME_COLUMN] = "{tower_name}_{new_height}".format(
                tower_name=tower_name, new_height=new_tower_height
            )

        # Back up the TA file before overwriting it. os.path.join keeps a
        # bare relative file name relative instead of pointing at "/".
        backup_file_path = os.path.join(
            get_directory(self._file_path),
            "{ta_file_name}{backup_time}.TA".format(
                ta_file_name=get_file_name_with_extention(self._file_path)[0],
                backup_time=file_backup_time(),
            ),
        )
        shutil.copy(self._file_path, backup_file_path)
        with open(self._file_path, "w") as ta_file:
            for ta_c in ta_content:
                ta_file.write("{ta_c}\n".format(ta_c=",".join(ta_c)))
        print("Synchronization form S to TA is finished.")

    def start(self):
        """Run the interactive TA -> S synchronization loop (never returns).

        Each cycle re-reads the TA file, pushes changed heights to the S
        files, then waits until the user enters ``y``/``Y``.
        """
        file_path = self._file_path
        while True:  # change-detection loop
            new_contents = self._read(file_path)  # TA file is re-read every cycle
            if self._old_tower_height_record is None:
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            if self.sync_changed_tower_height_to_S(
                new_contents, self._old_tower_height_record
            ):
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            while True:  # input loop
                key = input("press Y to synchronize TA to S\n")
                if key in ("y", "Y"):
                    break
                print("key invalid.")


class CordinationObject:
    """Coordinate table read from a comma separated file.

    Each non-blank line supplies ``<J|Z><pile number>,<altitude>,<mileage>``
    in its first three fields.
    """

    def __init__(self, cord_file_path):
        """Parse *cord_file_path*.

        Raises ``IndexError`` for a line whose first field holds no J/Z pile
        number or that has fewer than three fields.
        """
        content = []
        cord_dic = {}
        with open(cord_file_path) as cord_file:
            for line in cord_file:
                stripped = line.strip()
                if not stripped:
                    continue
                # Split the stripped line: otherwise the last field keeps the
                # trailing newline and leading blanks break the J/Z prefix.
                sep = stripped.split(",")
                J_or_Z = sep[0][0]
                # Pile number without its leading zeros.
                pile = re.findall(r"[JZ]0*([0-9]+)", sep[0])[0]
                altitude = sep[1]
                mileage = sep[2]
                # The lookup key carries no J/Z prefix.
                cord_dic[pile] = len(content)
                content.append((J_or_Z + pile, altitude, mileage))
        self._content = content
        self._cord_dic = cord_dic

    def get_altitude(self, pile):
        """Return the altitude string for *pile* (no J/Z prefix), or ``None``."""
        index = self._cord_dic.get(pile)
        if index is None:
            return None
        return self._content[index][1]

    def content(self):
        """Return the list of ``(JZ_pile, altitude, mileage)`` tuples."""
        return self._content