"""Keep tower heights in "S" files in sync with a "TA" file.

The module offers small path helpers, a reader/writer for whitespace
separated S files, a watcher for comma separated TA files, and a reader for
pile coordinate files.
"""

import datetime
import os.path
import re
import shutil


def file_backup_time():
    """Return the current local time as a ``YYYYMMDDHHMMSS`` string."""
    now = datetime.datetime.now()
    # %m is the month; the previous %I (12-hour clock hour) produced
    # misleading and potentially colliding backup stamps.
    return now.strftime("%Y%m%d%H%M%S")


def get_directory(file_path):
    """Return the directory part of *file_path* ("" when there is none)."""
    return os.path.dirname(file_path)


def get_file_name_with_extention(file_path):
    """Split the final component of *file_path* into ``(name, ext)``.

    *ext* carries no leading dot and is "" when the file has no extension.
    Only the last dot separates the extension, so ``arch.tar.gz`` yields
    ``("arch.tar", "gz")``.
    """
    # basename pairs with os.path.dirname used by get_directory(); textually
    # removing the directory would also eat matching text in the file name.
    file_name = os.path.basename(file_path)
    name, ext = os.path.splitext(file_name)
    return (name, ext[1:] if ext.startswith(".") else ext)


class SFileObject:
    """In-memory copy of an S file, one list of string fields per row."""

    def __init__(self, file_path):
        """Load *file_path*; raises ``OSError`` if it cannot be opened."""
        self._content = []
        self._file_path = file_path
        self._read(file_path)

    def _read(self, file_path):
        """Append every non-blank row of *file_path* to ``self._content``."""
        with open(file_path) as f:
            for line in f:
                trimmed = line.strip()
                if not trimmed:
                    continue
                # Whitespace runs and commas both act as field separators.
                normalized = re.sub(r"(\s+)", ",", trimmed)
                self._content.append(normalized.split(","))

    def _save_backup(self, back_file_path):
        """Copy the source S file to *back_file_path*."""
        shutil.copy(self._file_path, back_file_path)

    def write(self, target_file_path):
        """Back up the source file, then write the rows to *target_file_path*.

        The backup is placed next to the source file and named
        ``<name>-<timestamp>.<ext>``. Rows are written space separated.
        """
        file_name, ext = get_file_name_with_extention(self._file_path)
        backup_name = "{file_name}-{time}".format(
            file_name=file_name, time=file_backup_time()
        )
        if ext:
            backup_name = "{base}.{ext}".format(base=backup_name, ext=ext)
        # os.path.join keeps the backup relative when the source path has no
        # directory part; "{dir}/..." would have pointed at the root.
        self._save_backup(
            os.path.join(get_directory(self._file_path), backup_name)
        )
        with open(target_file_path, "wt") as file:
            file.writelines(
                "{content}\n".format(content=" ".join(content))
                for content in self._content
            )

    def index_of(self, tower_number):
        """Return the row index whose first field is *tower_number*.

        Returns ``None`` when no row matches, so that a match in row 0 can be
        told apart from "not found".
        """
        for index, content in enumerate(self._content):
            if content[0] == tower_number:
                return index
        return None

    def has(self, tower_number):
        """Return the row index of *tower_number*, or 0 when it is missing.

        Kept for backward compatibility. A result of 0 is ambiguous (first
        row or not found); prefer :meth:`index_of`.
        """
        index = self.index_of(tower_number)
        return 0 if index is None else index

    def content(self):
        """Return the list of rows (the live list, not a copy)."""
        return self._content

    def update_height(self, index, new_height):
        """Set the tower height (field 7) of row *index* in memory."""
        self._content[index][7] = new_height


class TaFileObject:
    """Watches a TA file and pushes tower height changes into S files."""

    def __init__(self, file_path, SFile):
        """Remember the TA file path and the iterable of S file paths.

        Raises ``OSError`` if the TA file cannot be opened.
        """
        self._file_path = file_path
        # The result is discarded on purpose: this only makes construction
        # fail early when the TA file is unreadable.
        self._read(file_path)
        self._old_tower_height_record = None
        self._SFile = SFile

    def _read(self, file_path):
        """Return the non-blank rows of *file_path* as lists of fields.

        All whitespace is removed from a row before it is split on commas.
        """
        contents = []
        with open(file_path) as f:
            for line in f:
                trimmed = line.strip()
                if not trimmed:
                    continue
                normalized = re.sub(r"(\s+)", "", trimmed)
                contents.append(normalized.split(","))
        return contents

    def _make_tower_height_record(self, contents):
        """Map tower number (field 0) to tower height (field 9).

        Rows with fewer than ten fields are ignored.
        """
        return {row[0]: row[9] for row in contents if len(row) > 9}

    def _update_s_files(self, tower_number, new_tower_height):
        """Write *new_tower_height* into the first S file listing the tower."""
        # S files are looked up with the "G" characters removed.
        norm_tower_number = tower_number.replace("G", "")
        for Ss in self._SFile:
            d_file_obj = SFileObject(Ss)
            index = d_file_obj.index_of(norm_tower_number)
            # Compare against None: a tower stored in row 0 is a valid match.
            if index is not None:
                d_file_obj.update_height(index, str(new_tower_height))
                d_file_obj.write(Ss)
                print("update S file {Ss}".format(Ss=Ss))
                break

    def sync_tower_height(self, new_contents, old_tower_height_record):
        """Propagate height changes found in *new_contents* to the S files.

        Returns ``True`` when the caller should rebuild its height record:
        at least one height differs from *old_tower_height_record* by more
        than 1e-5, or a tower is not present in that record yet.
        """
        updated = False
        for row in new_contents:
            if len(row) <= 9:
                continue
            tower_number = row[0]
            old_height = old_tower_height_record.get(tower_number)
            if old_height is None:
                # Tower added since the baseline was taken: there is nothing
                # to compare against, so only ask for a baseline refresh.
                updated = True
                continue
            new_tower_height = float(row[9])
            if abs(float(old_height) - new_tower_height) > 1e-5:
                print(
                    "{tower_number} height changes to {new_tower_height}".format(
                        tower_number=tower_number,
                        new_tower_height=new_tower_height,
                    )
                )
                self._update_s_files(tower_number, new_tower_height)
                updated = True
        return updated

    def start(self):
        """Run the interactive sync loop; never returns.

        The TA file is re-read on every iteration. The first read becomes the
        baseline, and each pass waits for the user to press enter.
        """
        file_path = self._file_path
        while True:
            new_contents = self._read(file_path)
            if self._old_tower_height_record is None:
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            if self.sync_tower_height(new_contents, self._old_tower_height_record):
                self._old_tower_height_record = self._make_tower_height_record(
                    new_contents
                )
            input("press enter to continue\n")


class CordinationObject:
    """Pile coordinates read from a comma separated file.

    Each non-blank row is ``<J|Z><number>,<altitude>,<mileage>``.
    """

    def __init__(self, cord_file_path):
        """Parse *cord_file_path* into rows and a pile-number index."""
        content = []
        cord_dic = {}
        with open(cord_file_path) as cord_file:
            for line in cord_file:
                if not line.strip():
                    continue
                # Strip every field so the last one does not keep the
                # trailing newline of the line.
                sep = [field.strip() for field in line.split(",")]
                J_or_Z = sep[0][0]
                # Pile number without its leading zeros.
                pile = re.findall(r'[JZ]0*([0-9]+)', sep[0])[0]
                altitude = sep[1]
                mileage = sep[2]
                content.append((J_or_Z + pile, altitude, mileage))
                # Dictionary keys do not carry the J or Z prefix.
                cord_dic[pile] = len(content) - 1
        self._content = content
        self._cord_dic = cord_dic

    def get_altitude(self, pile):
        """Return the altitude string for *pile* (no J/Z prefix), or None."""
        index = self._cord_dic.get(pile)
        if index is None:
            return None
        return self._content[index][1]

    def content(self):
        """Return the list of ``(pile, altitude, mileage)`` tuples."""
        return self._content