119 lines
4.2 KiB
Python
119 lines
4.2 KiB
Python
import os
|
|
import glob
|
|
from typing import List
|
|
from exif import Image
|
|
from datetime import datetime
|
|
import os
|
|
from PySide6.QtCore import QThread, Signal # 导入 QThread 和 Signal 类
|
|
|
|
|
|
# import shutil
|
|
def get_jpg_files(file_path: str) -> List[str]:
    """Return the absolute paths of all ``*.jpg`` files directly inside a directory.

    Args:
        file_path: Directory to search. A relative path is resolved against
            the current working directory. Subdirectories are not searched.

    Returns:
        The matching paths, in whatever order ``glob.glob`` yields them.
    """
    current_dir = os.path.abspath(file_path)
    # Escape the directory part so glob metacharacters in folder names
    # (e.g. "trip [2020]") are matched literally instead of as a pattern.
    return glob.glob(os.path.join(glob.escape(current_dir), "*.jpg"))
|
|
|
|
|
|
def get_datetime_from_image_exif(image_path: str):
    """Read the ``datetime_original`` EXIF value of an image file.

    Args:
        image_path: Path of the image file to inspect.

    Returns:
        The value of the image's ``datetime_original`` attribute.

    Raises:
        Exception: If the image exposes no ``datetime_original`` attribute.
            The attributes it does expose are printed first to help diagnosis.
    """
    with open(image_path, "rb") as stream:
        exif_image = Image(stream)
        # Guard clause: report what the image does contain, then fail loudly.
        if not hasattr(exif_image, "datetime_original"):
            print(exif_image.list_all())
            raise Exception(f"{image_path}该图片没有时间信息")
        return exif_image.datetime_original
|
|
|
|
|
|
def convert_to_unix_timestamp(date_time_str):
    """Convert a ``YYYY:MM:DD HH:MM:SS`` string into a Unix timestamp.

    Args:
        date_time_str: Date/time text in the ``%Y:%m:%d %H:%M:%S`` layout.

    Returns:
        The timestamp as a float number of seconds. The parsed value carries
        no timezone, so ``datetime.timestamp`` interprets it as local time.

    Raises:
        ValueError: If the text does not match the expected layout.
    """
    layout = "%Y:%m:%d %H:%M:%S"
    return datetime.strptime(date_time_str, layout).timestamp()
|
|
|
|
|
|
# Group images whose consecutive timestamps are at most 2 minutes apart
|
|
def group_datetime(image_with_datetime: list, max_gap_seconds: float = 120):
    """Split timestamped items into runs of closely spaced timestamps.

    Items are ordered by timestamp, then walked once. An item joins the
    current group when it is at most ``max_gap_seconds`` later than the item
    immediately before it; otherwise it starts a new group. A group can
    therefore span more than ``max_gap_seconds`` in total, as long as no
    single step between neighbours exceeds the gap.

    Args:
        image_with_datetime: Sequence-like items whose element ``[1]`` is a
            numeric timestamp in seconds (e.g. ``(path, timestamp)``).
            NOTE: this list is sorted in place by timestamp.
        max_gap_seconds: Largest allowed difference between neighbouring
            timestamps inside one group. Defaults to 120 (2 minutes).

    Returns:
        A list of groups, each a non-empty list of the input items in
        ascending timestamp order. Empty input yields an empty list.
    """
    image_with_datetime.sort(key=lambda item: item[1])

    grouped_images = []
    previous_timestamp = None
    for entry in image_with_datetime:
        timestamp = entry[1]
        if (
            previous_timestamp is not None
            and timestamp - previous_timestamp <= max_gap_seconds
        ):
            grouped_images[-1].append(entry)
        else:
            # First item overall, or the gap is too large: open a new group.
            grouped_images.append([entry])
        previous_timestamp = timestamp
    return grouped_images
|
|
|
|
|
|
def get_file_name_before_bracket(file_path):
    """Return the part of a path's file name that precedes the first ``(``.

    Args:
        file_path: Path whose final component is examined.

    Returns:
        The file name up to, but not including, its first ``(``. If the file
        name contains no ``(``, the whole file name (extension included) is
        returned.
    """
    base_name = os.path.basename(file_path)
    # partition() yields the full string as the head when "(" is absent.
    prefix, _separator, _remainder = base_name.partition("(")
    return prefix
|
|
|
|
|
|
class Worker(QThread):
    """Background thread that copies images into numbered folders by capture time.

    Each image's capture time is read via ``get_datetime_from_image_exif``,
    the images are grouped with ``group_datetime``, and every group is copied
    into its own folder under a "整理后" directory located next to the first
    input image. Folders are named ``<index>.<prefix>``, where the prefix
    comes from ``get_file_name_before_bracket`` applied to the group's
    earliest image and the index counts up from ``start_index``.
    """

    # Emits the running count of images handled so far.
    progress = Signal(int)
    # Emits the normalized path of the output root directory, once per run.
    destination_directory = Signal(str)

    def __init__(self, images_path: List[str], start_index: int = 0):
        """Store the work list.

        Args:
            images_path: Paths of the images to organize.
            start_index: Number used as the prefix of the first group folder.
        """
        super().__init__()
        self.images_path = images_path
        self.start_index = start_index

    def run(self):
        """Group the images by capture time and copy each group to its folder.

        Does nothing when ``images_path`` is empty. ``start_index`` is
        incremented once per group, so after the run it holds the next unused
        folder number. A file that cannot be copied is reported on stdout and
        skipped; it still counts towards ``progress``.

        Raises:
            Exception: Propagated from ``get_datetime_from_image_exif`` when
                an image has no capture time.
        """
        # Imported here so this block does not depend on a module-level import.
        import shutil

        if not self.images_path:
            return

        # The output root lives beside the first image ("整理后" = "organized").
        new_dir_location = os.path.join(
            os.path.dirname(self.images_path[0]), "整理后"
        )
        self.destination_directory.emit(os.path.normpath(new_dir_location))

        # Pair every path with its capture time: (image path, Unix timestamp).
        image_with_datetime = [
            (
                image_path,
                convert_to_unix_timestamp(get_datetime_from_image_exif(image_path)),
            )
            for image_path in self.images_path
        ]
        grouped_image = group_datetime(image_with_datetime)

        processed_files = 0
        for group in grouped_image:
            dir_name = get_file_name_before_bracket(group[0][0])
            new_dir_name = os.path.normpath(
                os.path.join(new_dir_location, f"{self.start_index}.{dir_name}")
            )
            os.makedirs(new_dir_name, exist_ok=True)

            for image in group:
                source_path = os.path.normpath(image[0])
                try:
                    # copy2 overwrites an existing file of the same name and
                    # keeps file metadata; no shell command is involved.
                    shutil.copy2(source_path, new_dir_name)
                except OSError as error:
                    # Keep going: one unreadable file should not abort the run.
                    print(f"copy failed {source_path} {new_dir_name}: {error}")
                else:
                    print(f"copy {source_path} {new_dir_name}")
                # Report progress only after the file has been handled.
                processed_files += 1
                self.progress.emit(processed_files)

            self.start_index += 1
|