430 lines
17 KiB
Python
430 lines
17 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import shutil
import tempfile
import time
import email
from email.header import decode_header, make_header
from email.utils import parseaddr
from typing import List, Dict, Optional, Tuple

from PIL import Image

from config import Config
from logger import logger
from temp_manager import TempManager
from mail_handler import MailHandler
from manifest_parser import ManifestParser
from image_processor import ImageProcessor
from zip_handler import ZipHandler
from agreement import AgreementManager


class MailConverter:
    """Poll a mailbox, convert image attachments per user rules, and mail results back.

    Rules are read from a ``manifest.txt`` attachment or, if none is present, from the
    mail body. They are parsed by ``ManifestParser`` into task dicts that
    :meth:`match_rules_for_file` evaluates for every image (top-level attachments and
    images extracted from .zip/.7z archives).
    """

    def __init__(self):
        self.temp_mgr = TempManager()
        self.mail = MailHandler()
        self.img_proc = ImageProcessor()
        self.zip_proc = ZipHandler()
        self.parser = ManifestParser()

    # ==================== Rule matching ====================

    @staticmethod
    def _task_in_scope(task: Dict, current_archive: Optional[str]) -> bool:
        """Return True if *task* applies to a file located in *current_archive*.

        The effective scope is ``inline_archive`` when truthy, else ``scope``. Unscoped
        tasks (effective scope ``None``) apply everywhere. Scoped tasks apply only to files
        extracted from the archive with exactly that name.
        """
        effective_scope = task.get('inline_archive') or task.get('scope')
        if effective_scope is None:
            return True
        return bool(current_archive) and effective_scope == current_archive

    @staticmethod
    def _expand_targets(base_name: str, tasks: List[Dict]) -> List[Dict]:
        """Turn every task's ``(format, size)`` target pairs into output specs."""
        return [
            {'output_name': f"{base_name}.{fmt}", 'format': fmt, 'size': size}
            for task in tasks
            for fmt, size in (task.get('targets') or [])
        ]

    def match_rules_for_file(
        self,
        filename: str,
        file_ext: str,
        tasks: List[Dict],
        current_archive: Optional[str] = None
    ) -> List[Dict]:
        """Resolve which outputs to produce for one image file.

        Rule kinds are tried in strict priority order, and the first kind with at least
        one matching task wins: ``rename`` > ``by_name`` > ``by_format`` > ``global``.
        If nothing matches, the file is kept under its original name and format.

        Args:
            filename: Base name of the input file, including its extension.
            file_ext: Extension of *filename* without the leading dot.
            tasks: Rule dicts produced by ``ManifestParser.parse``.
            current_archive: Name of the archive the file came from, or ``None`` for a
                top-level attachment.

        Returns:
            Dicts with keys ``output_name``, ``format`` and ``size``. The list can be empty
            when the winning rule kind defines no targets.
        """
        base_name = os.path.splitext(filename)[0]
        applicable = [t for t in tasks if self._task_in_scope(t, current_archive)]
        logger.debug(f"文件 {filename} 在作用域 {current_archive} 下匹配到 {len(applicable)} 条规则")

        # 1. Rename: explicit destination name; the format is taken from its extension.
        rename_tasks = [t for t in applicable if t.get('type') == 'rename' and t.get('src_name') == base_name]
        if rename_tasks:
            return [
                {
                    'output_name': t['dst_name'],
                    'format': os.path.splitext(t['dst_name'])[1][1:].lower(),
                    'size': t.get('size'),
                }
                for t in rename_tasks
            ]

        # 2. Exact file name (without extension).
        name_tasks = [t for t in applicable if t.get('type') == 'by_name' and t.get('src_name') == base_name]
        if name_tasks:
            return self._expand_targets(base_name, name_tasks)

        # 3. By source format, compared case-insensitively on both sides.
        ext_lower = file_ext.lower()
        format_tasks = [
            t for t in applicable
            if t.get('type') == 'by_format'
            and isinstance(t.get('src_format'), str)
            and t['src_format'].lower() == ext_lower
        ]
        if format_tasks:
            return self._expand_targets(base_name, format_tasks)

        # 4. Global default.
        global_tasks = [t for t in applicable if t.get('type') == 'global']
        if global_tasks:
            return self._expand_targets(base_name, global_tasks)

        # No rule matched: keep the file as-is.
        return [{'output_name': filename, 'format': file_ext, 'size': None}]

    # ==================== Single image processing ====================

    @staticmethod
    def _unique_path(path: str) -> str:
        """Return *path*, or ``<stem>_<n><ext>`` with the smallest free *n* if it exists.

        Prevents one output from silently overwriting another. This happens when two
        targets share a format, or when same-named files come from different archive
        subfolders.
        """
        if not os.path.exists(path):
            return path
        stem, ext = os.path.splitext(path)
        counter = 1
        while os.path.exists(f"{stem}_{counter}{ext}"):
            counter += 1
        return f"{stem}_{counter}{ext}"

    @staticmethod
    def _resize_and_save(input_path: str, out_path: str, out_fmt: str, out_size) -> bool:
        """Resize *input_path* per *out_size* and save it as *out_fmt*.

        *out_size* is a tuple whose first item is ``'ratio'`` or ``'pixel'``. Its next two
        items are forwarded to ``ImageProcessor.resize_by_ratio`` or ``resize_by_pixel``.
        Any other tag leaves the size unchanged.

        Returns False (after logging) if *out_fmt* has no Pillow format mapping.
        """
        pillow_format = ImageProcessor.FORMAT_MAP.get(out_fmt.lower())
        if not pillow_format:
            logger.error(f"不支持的输出格式: {out_fmt}")
            return False
        with Image.open(input_path) as img:
            if out_size[0] == 'ratio':
                img = ImageProcessor.resize_by_ratio(img, out_size[1], out_size[2])
            elif out_size[0] == 'pixel':
                img = ImageProcessor.resize_by_pixel(img, out_size[1], out_size[2])
            if pillow_format == 'JPEG' and img.mode in ('RGBA', 'LA', 'P', 'PA'):
                # JPEG has no alpha channel, so flatten onto white. Converting to RGBA first
                # honours palette transparency and also covers LA/PA, which JPEG cannot
                # store at all.
                rgba = img.convert('RGBA')
                rgb_img = Image.new('RGB', rgba.size, (255, 255, 255))
                rgb_img.paste(rgba, mask=rgba.getchannel('A'))
                img = rgb_img
            save_kwargs = {}
            if pillow_format in ('JPEG', 'WEBP'):
                save_kwargs['quality'] = Config.DEFAULT_QUALITY
            img.save(out_path, format=pillow_format, **save_kwargs)
        return True

    def process_single_image(
        self,
        input_path: str,
        output_dir: str,
        tasks: List[Dict],
        current_archive: Optional[str] = None
    ) -> List[str]:
        """Convert one image into every output selected by the rules.

        Args:
            input_path: Path of the source image.
            output_dir: Directory that receives all outputs (written flat).
            tasks: Parsed rule dicts.
            current_archive: Archive the image came from, or ``None``.

        Returns:
            Paths of the files actually written. Failed or rejected outputs are logged and
            omitted.
        """
        filename = os.path.basename(input_path)
        file_ext = os.path.splitext(filename)[1][1:].lower()
        output_files = []

        output_tasks = self.match_rules_for_file(filename, file_ext, tasks, current_archive)
        logger.info(f"图片 {filename} 将生成 {len(output_tasks)} 个输出任务")

        for task in output_tasks:
            out_name = task['output_name']
            out_fmt = task['format']
            out_size = task['size']

            # Output names come from mail-supplied rules. Keep only the final path
            # component so "../x" or "/abs/x" cannot escape output_dir.
            safe_name = os.path.basename(out_name.replace('\\', '/'))
            if not safe_name or safe_name in ('.', '..'):
                logger.error(f"非法的输出文件名: {out_name}")
                continue
            out_path = self._unique_path(os.path.join(output_dir, safe_name))

            try:
                if out_size:
                    if not self._resize_and_save(input_path, out_path, out_fmt, out_size):
                        continue
                    logger.info(f"转换+缩放成功: {input_path} -> {out_path}")
                else:
                    success = ImageProcessor.convert_image(input_path, out_path, out_fmt, Config.DEFAULT_QUALITY)
                    if not success:
                        logger.error(f"转换失败: {input_path} -> {out_path}")
                        continue
                output_files.append(out_path)
            except Exception as e:
                logger.error(f"处理图片异常 {input_path}: {e}", exc_info=True)

        return output_files

    # ==================== Archive processing ====================

    def process_archive(
        self,
        archive_path: str,
        output_base_dir: str,
        tasks: List[Dict]
    ) -> List[str]:
        """Extract a .zip/.7z archive and convert every supported image inside it.

        Outputs are written flat into *output_base_dir*, with rules scoped by the
        archive's file name. Extraction uses a private directory next to (not inside)
        *output_base_dir*. Packing *output_base_dir* later therefore never picks up raw
        extracted files, even when ``Config.KEEP_TEMP_FILES`` keeps the extraction.

        Returns:
            Paths of all generated files. The list is empty for unsupported formats or
            extraction failures.
        """
        archive_name = os.path.basename(archive_path)
        ext = os.path.splitext(archive_path)[1].lower()
        if ext not in ('.zip', '.7z'):
            logger.warning(f"不支持的压缩包格式: {archive_path}")
            return []

        os.makedirs(output_base_dir, exist_ok=True)
        parent_dir = os.path.dirname(os.path.abspath(output_base_dir))
        extract_dir = tempfile.mkdtemp(prefix=f"ext_{archive_name}_", dir=parent_dir)
        try:
            try:
                if ext == '.zip':
                    ZipHandler.extract_zip(archive_path, extract_dir)
                else:
                    ZipHandler.extract_7z(archive_path, extract_dir)
            except Exception as e:
                logger.error(f"解压失败 {archive_path}: {e}")
                return []

            all_output_files = []
            for root, dirs, files in os.walk(extract_dir):
                # Stable traversal order keeps output names (and _unique_path suffixes)
                # deterministic between runs.
                dirs.sort()
                for file in sorted(files):
                    file_ext_lower = os.path.splitext(file)[1][1:].lower()
                    if file_ext_lower not in Config.SUPPORTED_INPUT_FORMATS:
                        continue
                    all_output_files.extend(self.process_single_image(
                        os.path.join(root, file),
                        output_base_dir,
                        tasks,
                        current_archive=archive_name
                    ))
            return all_output_files
        finally:
            # Clean up on every path, including extraction failures.
            if not Config.KEEP_TEMP_FILES:
                shutil.rmtree(extract_dir, ignore_errors=True)

    # ==================== Mail processing ====================

    @staticmethod
    def _msg_id_str(msg_id) -> str:
        """Render a message id for a directory name (IMAP ids are bytes, POP3 ids ints)."""
        if isinstance(msg_id, bytes):
            return msg_id.decode('ascii', errors='replace')
        return str(msg_id)

    @staticmethod
    def _decode_header_value(value, default: str) -> str:
        """Decode an RFC 2047 header (``=?utf-8?b?...?=``) to text; *default* if missing."""
        if value is None:
            return default
        try:
            return str(make_header(decode_header(value)))
        except Exception:
            # Best effort only: a malformed header must not abort mail processing.
            return str(value)

    @staticmethod
    def _load_rule_content(attachments: List[str], body) -> Optional[str]:
        """Return rule text from the first ``manifest.txt`` attachment, else the mail body."""
        manifest_path = next((a for a in attachments if os.path.basename(a) == 'manifest.txt'), None)
        if manifest_path:
            # utf-8-sig drops the BOM that Windows editors prepend; otherwise it would be
            # glued onto the first rule line.
            with open(manifest_path, 'r', encoding='utf-8-sig') as f:
                rule_content = f.read()
            logger.info("使用 manifest.txt 规则")
        else:
            rule_content = body
            logger.info("使用邮件正文规则")
        return rule_content

    def _convert_attachments(self, attachments: List[str], mail_work_dir: str, tasks: List[Dict]) -> List[Dict]:
        """Convert each archive/image attachment into its own ``out_<name>`` directory.

        Returns one dict per processed attachment with keys ``original_name``,
        ``output_dir`` and ``output_files``. ``manifest.txt`` and unsupported files are
        skipped.
        """
        original_info = []
        for att in attachments:
            att_basename = os.path.basename(att)
            if att_basename == 'manifest.txt':
                continue
            att_ext = os.path.splitext(att)[1][1:].lower()

            if ZipHandler.is_archive(att):
                output_dir = os.path.join(mail_work_dir, f"out_{att_basename}")
                os.makedirs(output_dir, exist_ok=True)
                output_files = self.process_archive(att, output_dir, tasks)
            elif att_ext in Config.SUPPORTED_INPUT_FORMATS:
                output_dir = os.path.join(mail_work_dir, f"out_{att_basename}")
                os.makedirs(output_dir, exist_ok=True)
                output_files = self.process_single_image(att, output_dir, tasks, current_archive=None)
            else:
                logger.warning(f"跳过不支持的文件: {att_basename} (扩展名: {att_ext})")
                continue

            original_info.append({
                'original_name': att_basename,
                'output_dir': output_dir,
                'output_files': output_files
            })
        return original_info

    def _package_results(self, original_info: List[Dict], mail_work_dir: str) -> List[str]:
        """Mirror the original attachment layout: one file as-is, several as a zip.

        Attachments that produced nothing are skipped, so no empty zip is sent.
        """
        return_attachments = []
        for info in original_info:
            out_files = info['output_files']
            if not out_files:
                logger.warning(f"附件 {info['original_name']} 没有生成任何结果,跳过打包")
                continue
            if len(out_files) == 1:
                return_attachments.append(out_files[0])
                continue
            zip_name = f"{os.path.splitext(info['original_name'])[0]}_result.zip"
            # e.g. a.zip and a.7z would otherwise both produce a_result.zip
            zip_path = self._unique_path(os.path.join(mail_work_dir, zip_name))
            ZipHandler.pack_to_zip(info['output_dir'], zip_path)
            return_attachments.append(zip_path)
        return return_attachments

    @staticmethod
    def _build_result_subject(original_info: List[Dict]) -> str:
        """Build ``MailC[<time>]:<details>``; details list at most two items plus a count."""
        conversion_details = []
        for info in original_info:
            orig_name = info['original_name']
            out_files = info['output_files']
            if len(out_files) == 1:
                out_name = os.path.basename(out_files[0])
                out_ext = os.path.splitext(out_name)[1][1:].upper()
                conversion_details.append(f"{orig_name} → {out_ext}")
            else:
                conversion_details.append(f"{orig_name} → {len(out_files)}个文件")

        if len(conversion_details) > 2:
            detail_str = f"{', '.join(conversion_details[:2])} 等{len(conversion_details)}项"
        else:
            detail_str = ', '.join(conversion_details)

        time_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return f"MailC[{time_str}]:{detail_str}"

    def process_one_email(self, msg_id, raw_email, proto_conn, proto_type) -> bool:
        """Process one raw mail end to end and delete it from the server.

        The original mail is deleted on every path: rejected sender, empty rules, no
        output, success, or an exception (after an error report is attempted).

        Returns:
            True only when a result mail was sent.
        """
        msg = email.message_from_bytes(raw_email)
        # Coerce to str: parseaddr needs text, and a missing header becomes ''.
        sender_email = parseaddr(str(msg.get('From') or ''))[1]
        subject = self._decode_header_value(msg.get('Subject'), '无主题')
        logger.info(f"开始处理邮件: {subject} from {sender_email}")

        if not self.mail.is_domain_allowed(sender_email):
            logger.warning(f"域名不在白名单: {sender_email},删除原邮件")
            self.mail.delete_email(msg_id, proto_conn, proto_type)
            return False

        mail_work_dir = os.path.join(Config.TEMP_DIR, f"mail_{self._msg_id_str(msg_id)}_{int(time.time())}")
        os.makedirs(mail_work_dir, exist_ok=True)

        try:
            attachments, body, _ = self.mail.download_attachments(raw_email, mail_work_dir)
            logger.info(f"下载到附件: {[os.path.basename(a) for a in attachments]}")

            rule_content = self._load_rule_content(attachments, body)
            logger.info(f"规则原始内容:\n{rule_content}")
            if not rule_content or not rule_content.strip():
                logger.warning("规则内容为空,删除原邮件")
                self.mail.delete_email(msg_id, proto_conn, proto_type)
                return False

            tasks = self.parser.parse(rule_content)
            logger.info(f"解析出 {len(tasks)} 条规则指令")
            for idx, t in enumerate(tasks):
                logger.debug(f"规则{idx}: {t}")

            original_info = self._convert_attachments(attachments, mail_work_dir, tasks)
            all_output_files = [path for info in original_info for path in info['output_files']]
            logger.info(f"总共生成 {len(all_output_files)} 个输出文件")
            if not all_output_files:
                logger.warning("没有生成任何转换结果,删除原邮件")
                self.mail.delete_email(msg_id, proto_conn, proto_type)
                return False

            # Only build zips when they will actually be sent.
            if Config.FLATTEN_OUTPUT:
                return_attachments = all_output_files
            else:
                return_attachments = self._package_results(original_info, mail_work_dir)

            self.mail.send_result(
                recipient=sender_email,
                subject=self._build_result_subject(original_info),
                attachments=return_attachments,
                split_volume_mb=Config.SPLIT_VOLUME_SIZE_MB
            )
            logger.info(f"结果已发送至 {sender_email}")

            self.mail.delete_email(msg_id, proto_conn, proto_type)
            return True

        except Exception as e:
            error_msg = f"处理邮件时异常: {str(e)}"
            logger.error(error_msg, exc_info=True)
            try:
                self.mail.send_error_report(sender_email, error_msg)
            except Exception as send_err:
                logger.error(f"发送错误报告失败: {send_err}")
            try:
                self.mail.delete_email(msg_id, proto_conn, proto_type)
            except Exception as del_err:
                logger.error(f"删除原邮件失败: {del_err}")
            return False
        finally:
            if not Config.KEEP_TEMP_FILES:
                shutil.rmtree(mail_work_dir, ignore_errors=True)

    # ==================== Polling loop ====================

    @staticmethod
    def _fetch_pending_ids(conn, proto_type) -> Optional[list]:
        """Return ids of messages to process, or ``None`` if the IMAP search failed.

        IMAP returns only UNSEEN messages. For POP3, every message in the maildrop is
        returned; processed mails are deleted by ``process_one_email``.
        """
        if proto_type == "IMAP":
            typ, data = conn.search(None, 'UNSEEN')
            if typ != 'OK':
                logger.error("IMAP搜索失败")
                return None
            return data[0].split()
        msg_count = len(conn.list()[1])
        return list(range(1, msg_count + 1))

    def _handle_message(self, conn, proto_type, msg_id) -> None:
        """Fetch and process one message; on a crash, log it and try to delete the mail."""
        try:
            if proto_type == "IMAP":
                typ, msg_data = conn.fetch(msg_id, '(RFC822)')
                if typ != 'OK':
                    return
                raw_email = msg_data[0][1]
            else:
                raw_email = b'\n'.join(conn.retr(msg_id)[1])
            self.process_one_email(msg_id, raw_email, conn, proto_type)
        except Exception as e:
            logger.error(f"处理邮件 {msg_id} 崩溃: {e}", exc_info=True)
            try:
                self.mail.delete_email(msg_id, conn, proto_type)
            except Exception as del_err:
                logger.debug(f"删除邮件 {msg_id} 失败: {del_err}")

    @staticmethod
    def _close_connection(conn, proto_type) -> None:
        """Close an inbox connection, best effort.

        For IMAP, close() and logout() are attempted independently. A failing close()
        (e.g. no mailbox selected) therefore cannot leave the session logged in.
        """
        if conn is None:
            return
        steps = (conn.close, conn.logout) if proto_type == "IMAP" else (conn.quit,)
        for step in steps:
            try:
                step()
            except Exception as e:
                logger.debug(f"关闭连接时出错: {e}")

    def run_forever(self):
        """Poll the inbox and process new mails.

        If the terms have not been agreed yet, it only sends the agreement request and
        returns. When ``Config.RUN_FOREVER`` is false it runs a single pass; otherwise it
        sleeps ``Config.POLL_INTERVAL_SECONDS`` between passes.
        """
        if not AgreementManager.is_agreed():
            logger.info("首次使用,需要同意协议")
            # NOTE(review): `body` from request_agreement is never sent. send_result gets
            # only recipient, subject and an empty attachment list. Confirm whether
            # request_agreement already delivers the text itself.
            recipient, subject, body = AgreementManager.request_agreement(Config.ADMIN_EMAIL, self.mail)
            self.mail.send_result(recipient, subject, [])
            logger.info(f"协议请求已发送至 {recipient},请在 .env 中设置 AGREED_TOS=True")
            return

        logger.info(f"启动监听循环,轮询间隔 {Config.POLL_INTERVAL_SECONDS} 秒,{'无限循环' if Config.RUN_FOREVER else '单次运行'}")

        while True:
            TempManager.cleanup_stale()
            conn = None
            proto_type = None
            try:
                conn, proto_type = self.mail.connect_inbox()
                # A failed search yields None. We must not `continue` here: that would skip
                # the sleep/break below and spin against the server.
                msg_ids = self._fetch_pending_ids(conn, proto_type)
                if msg_ids:
                    logger.info(f"发现 {len(msg_ids)} 封待处理邮件")
                    for msg_id in msg_ids[:Config.MAX_EMAILS_PER_RUN]:
                        self._handle_message(conn, proto_type, msg_id)
                    if proto_type == "IMAP":
                        try:
                            conn.expunge()
                        except Exception as e:
                            logger.warning(f"IMAP expunge 失败: {e}")
                elif msg_ids is not None:
                    logger.debug("无新邮件")
            except Exception as e:
                logger.error(f"主循环异常: {e}", exc_info=True)
            finally:
                self._close_connection(conn, proto_type)

            if not Config.RUN_FOREVER:
                break
            time.sleep(Config.POLL_INTERVAL_SECONDS)


def main():
    """Script entry point: build a converter and hand control to its polling loop."""
    MailConverter().run_forever()


if __name__ == "__main__":
    # Start the converter only when executed as a script, not when imported.
    main()