"""Helpers that push tower heights edited in a TA file into S files."""

import datetime
import os.path
import re
import shutil
from typing import Dict, List, Optional, Tuple


def file_backup_time() -> str:
    """Return the current local time formatted as ``YYYYMMDDHHMMSS``."""
    return datetime.datetime.now().strftime("%Y%m%d%H%M%S")


def get_directory(file_path):
    """Return the directory part of *file_path* ('' when it has none)."""
    return os.path.dirname(file_path)


def get_file_name_with_extention(file_path):
    """Split the final path component into ``(name, ext)``.

    The extension is everything after the LAST dot and is returned without
    the dot.  A name that contains no dot yields ``(name, "")``.

    Only the final path component is examined, so a directory that happens
    to share text with the file name (``data/data.txt``) no longer eats part
    of the name, and names with several dots no longer raise ``ValueError``.
    """
    file_name = os.path.basename(file_path)
    name, dot, ext = file_name.rpartition(".")
    if not dot:
        # rpartition puts the whole string in the last slot when no dot exists.
        return (file_name, "")
    return (name, ext)


class SFileObject:
    """In-memory copy of one S file, stored as a list of token rows."""

    def __init__(self, file_path):
        """Load *file_path* immediately; raises if the file cannot be opened."""
        self._content = []
        self.file_path = file_path
        self._read(file_path)

    def _read(self, file_path):
        """Append one token list per non-blank line to ``self._content``.

        Every run of whitespace is turned into a comma and the line is then
        split on commas, so both whitespace and commas act as separators.
        """
        with open(file_path) as f:
            for line in f:
                trimmed = line.strip()
                if not trimmed:
                    continue
                normalized = re.sub(r"(\s+)", ",", trimmed)
                self._content.append(normalized.split(","))

    def _save_backup(self, back_file_path):
        """Copy the source file to *back_file_path*."""
        shutil.copy(self.file_path, back_file_path)

    def write(self, target_file_path):
        """Back up the source file, then write the rows to *target_file_path*.

        The backup is placed beside the source file and is named
        ``<name>-<YYYYMMDDHHMMSS>.<ext>``.  Rows are written with their
        tokens joined by single spaces, one row per line.
        """
        name, ext = get_file_name_with_extention(self.file_path)
        backup_name = "{file_name}-{time}".format(
            file_name=name, time=file_backup_time()
        )
        if ext:
            backup_name += "." + ext
        # os.path.join keeps a directory-less source path relative instead of
        # producing "/<name>..." at the filesystem root.
        self._save_backup(
            os.path.join(get_directory(self.file_path), backup_name)
        )

        with open(target_file_path, "wt") as file:
            for row in self._content:
                tokens = [str(x) for x in row]
                file.write("{content}\n".format(content=" ".join(tokens)))

    def find_index(self, tower_number) -> Optional[int]:
        """Return the index of the first row whose first token equals
        *tower_number*, or ``None`` when no row matches."""
        for index, row in enumerate(self._content):
            if row[0] == tower_number:
                return index
        return None

    def has(self, tower_number) -> int:
        """Return the row index of *tower_number*, or 0 when it is absent.

        NOTE: 0 is also the index of the first row, so this return value
        cannot tell "first row" from "not found".  It is kept unchanged for
        existing callers; use ``find_index`` when the distinction matters.
        """
        index = self.find_index(tower_number)
        return 0 if index is None else index

    def content(self):
        """Return the list of token rows (the live list, not a copy)."""
        return self._content

    def update_height(self, index, new_height):
        """Set the tower height (token 7) of row *index* in memory only."""
        self._content[index][7] = new_height


class SFileAsSingle:
    """Treat several S files as if they were a single one."""

    def __init__(self, s_files):
        """Store *s_files*, an iterable of S file paths."""
        self._s_files = s_files

    def has(self, tower_number) -> List[Tuple[SFileObject, int]]:
        """Find *tower_number* in every S file.

        Each file is re-read from disk on every call.  The same tower number
        may appear in several files, so a list of ``(SFileObject, row_index)``
        pairs is returned, one per file that contains it (first match only).
        A match on the very first row (index 0) is reported as well.
        """
        matches = []
        for path in self._s_files:
            s_file_obj = SFileObject(path)
            index = s_file_obj.find_index(tower_number)
            if index is not None:
                matches.append((s_file_obj, index))
        return matches

    def update_height(
        self, result: List[Tuple[SFileObject, int]], new_height
    ):
        """Apply *new_height* to every match in *result* and save the files.

        Each affected file object is written back to its own path exactly
        once, and a line is printed for every file written.
        """
        # A dict is used as an insertion-ordered set of touched file objects.
        s_changed_record = {}
        for s_file_obj, index in result:
            s_file_obj.update_height(index, new_height)
            s_changed_record[s_file_obj] = 1

        for s_file_obj in s_changed_record:
            s_output_file_path = s_file_obj.file_path
            s_file_obj.write(s_output_file_path)
            print("update S file {Ss}".format(Ss=s_output_file_path))


class TaFileObject:
    """Watches a TA file and pushes changed tower heights into S files."""

    def __init__(self, file_path, SFile):
        """Remember the TA path and the S file paths (*SFile*).

        The TA file is read once here and the result discarded; the only
        observable effect is that an unreadable path raises immediately.
        """
        self._file_path = file_path
        self._read(file_path)
        self._old_tower_height_record = None
        self._SFile = SFile

    def _read(self, file_path):
        """Return the non-blank lines of *file_path* as lists of fields.

        All whitespace is removed from a line before it is split on commas.
        """
        contents = []
        with open(file_path) as f:
            for line in f:
                trimmed = line.strip()
                if not trimmed:
                    continue
                compact = re.sub(r"(\s+)", "", trimmed)
                contents.append(compact.split(","))
        return contents

    def _make_tower_height_record(self, contents) -> Dict[str, str]:
        """Build ``{tower_number: tower_height}`` from TA rows.

        Only rows with more than nine fields are used; field 0 is the tower
        number and field 9 the tower height (kept as the raw string).
        """
        tower_height_record = {}
        for row in contents:
            if len(row) > 9:
                tower_height_record[row[0]] = row[9]
        return tower_height_record

    def sync_tower_height_to_S(self, new_contents, old_tower_height_record):
        """Write every changed tower height from the TA rows into the S files.

        A height counts as changed when it differs from the recorded one by
        more than 1e-5, or when the tower has no recorded height at all (for
        example a row added to the TA file after the record was taken);
        previously such a tower raised ``KeyError``.

        Returns True when at least one S file was updated, else False.
        """
        updated = False
        s_as_single = SFileAsSingle(self._SFile)
        for row in new_contents:
            if len(row) <= 9:
                continue
            tower_number = row[0]
            old_value = old_tower_height_record.get(tower_number)
            new_tower_height = float(row[9])
            changed = (
                old_value is None
                or abs(float(old_value) - new_tower_height) > 1e-5
            )
            if not changed:
                continue
            print(
                "{tower_number} height changes to {new_tower_height}".format(
                    tower_number=tower_number,
                    new_tower_height=new_tower_height,
                )
            )
            result = s_as_single.has(tower_number)
            if len(result) > 0:
                s_as_single.update_height(result, new_tower_height)
                updated = True
        return updated

    def sync_tower_height_from_S(self):
        """Sync tower heights from the S files into the TA file.

        Not implemented yet; currently does nothing.
        """
        pass

    def start(self):
        """Run the interactive sync loop; this method never returns.

        Each pass re-reads the TA file, syncs changed heights to the S files
        and then blocks until the user enters ``y`` or ``Y``.
        """
        file_path = self._file_path
        while True:  # change-detection loop
            # The TA file is read again on every pass.
            new_contents = self._read(file_path)
            if self._old_tower_height_record is None:
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            # The baseline is refreshed only when an S file was updated.
            if self.sync_tower_height_to_S(
                new_contents, self._old_tower_height_record
            ):
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            while True:  # input loop
                key = input("press Y to synchronize TA to S\n")
                if key == "y" or key == "Y":
                    break
                print("key invalid.")


class CordinationObject:
    """Coordinate table loaded from a comma-separated file."""

    def __init__(self, cord_file_path):
        """Parse *cord_file_path* into rows and a pile-number index.

        Each non-blank line supplies a label (field 0), an altitude (field 1)
        and a mileage (field 2).  The digits following ``J``/``Z`` in the
        label, with leading zeros dropped, form the pile number.  Rows are
        stored as ``(first_label_char + pile, altitude, mileage)``.

        Lines are stripped before splitting so the last field does not carry
        the line terminator and leading blanks do not become the prefix.
        A label without ``J``/``Z`` followed by digits raises ``IndexError``.
        """
        content = []
        cord_dic = {}
        with open(cord_file_path) as cord_file:
            for line in cord_file:
                stripped = line.strip()
                if not stripped:
                    continue
                sep = stripped.split(",")
                label = sep[0]
                pile = re.findall(r"[JZ]0*([0-9]+)", label)[0]
                content.append((label[0] + pile, sep[1], sep[2]))
                # The lookup key deliberately omits the J/Z prefix.
                cord_dic[pile] = len(content) - 1
        self._content = content
        self._cord_dic = cord_dic

    def get_altitude(self, pile):
        """Return the altitude string for pile number *pile* (given without
        the J/Z prefix), or ``None`` when the pile is unknown."""
        index = self._cord_dic.get(pile)
        if index is None:
            return None
        return self._content[index][1]

    def content(self):
        """Return the list of ``(label, altitude, mileage)`` rows."""
        return self._content