ImgGPS2KML/自动归类/core.py

119 lines
4.2 KiB
Python

import os
import glob
from typing import List
from exif import Image
from datetime import datetime
import os
from PySide6.QtCore import QThread, Signal # 导入 QThread 和 Signal 类
# import shutil
def get_jpg_files(file_path: str):
"""获取当前目录下所有.jpg文件的完整路径"""
current_dir = os.path.abspath(file_path)
return glob.glob(os.path.join(current_dir, "*.jpg"))
def get_datetime_from_image_exif(image_path: str):
with open(image_path, "rb") as image_f:
my_image = Image(image_f)
if hasattr(my_image, "datetime_original"):
return my_image.datetime_original
else:
print(my_image.list_all())
raise Exception(f"{image_path}该图片没有时间信息")
def convert_to_unix_timestamp(date_time_str):
# 将字符串解析为 datetime 对象
date_time_obj = datetime.strptime(date_time_str, "%Y:%m:%d %H:%M:%S")
# 将 datetime 对象转换为 Unix 时间戳
unix_timestamp = date_time_obj.timestamp()
return unix_timestamp
# 把时间戳相差在2分钟以内的分为一组
def group_datetime(image_with_datetime: list):
"""把时间戳相差在2分钟以内的分为一组"""
image_with_datetime.sort(key=lambda x: x[1])
grouped_images = []
current_group = []
for i in range(len(image_with_datetime)):
if i == 0:
current_group.append(image_with_datetime[i])
else:
# 计算当前图像与前一个图像的时间差
time_diff = image_with_datetime[i][1] - image_with_datetime[i - 1][1]
if time_diff <= 120: # 2分钟 = 120秒
current_group.append(image_with_datetime[i])
else:
grouped_images.append(current_group)
current_group = [image_with_datetime[i]]
# 添加最后一组
grouped_images.append(current_group)
return grouped_images
def get_file_name_before_bracket(file_path):
# 获取文件名
file_name = os.path.basename(file_path)
# 查找括号的位置
bracket_index = file_name.find("(")
# 如果找到了括号,返回括号前的部分
if bracket_index != -1:
return file_name[:bracket_index]
else:
# 如果没有找到括号,返回整个文件名
return file_name
class Worker(QThread):
progress = Signal(int) # 定义一个信号,用于传递进度
destination_directory = Signal(str)
def __init__(self, images_path: List[str], start_index: int = 0):
super().__init__()
self.images_path = images_path
self.start_index = start_index
def run(self):
if len(self.images_path) == 0:
return
# 以下是接需要设置的参数
# image_path = r"D:/现场查看/"
# 提取目录路径
new_dir_location = os.path.join(os.path.dirname(self.images_path[0]), "整理后")
self.destination_directory.emit(os.path.normpath(new_dir_location))
# start_index = 156 # 目录编号
# 以下是实际代码
# all_images = get_jpg_files(image_path)
all_images = self.images_path
image_with_datetime = []
for image_path in all_images:
image_datetime = get_datetime_from_image_exif(image_path)
image_with_datetime.append(
(image_path, convert_to_unix_timestamp(image_datetime))
) # 第一个是图片路径,第二个是时间戳
grouped_image = group_datetime(image_with_datetime)
# total_files = sum(len(group) for group in grouped_image)
processed_files = 0
for group in grouped_image:
dir_name = get_file_name_before_bracket(group[0][0])
new_dir_name = f"{new_dir_location}/{self.start_index}.{dir_name}"
os.makedirs(new_dir_name, exist_ok=True)
for image in group:
processed_files += 1
self.progress.emit(processed_files) # 发射进度信号
new_file_path_name = new_dir_name.replace("/", "//")
os.system(
f'copy "{os.path.normpath(image[0])}" "{os.path.normpath(new_file_path_name)}" /Y'
)
print(f"copy {image[0]} {new_file_path_name}")
self.start_index += 1