#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Mail-driven image converter.

Polls an inbox, reads conversion rules from a ``manifest.txt`` attachment
(or from the mail body), converts the attached images / archives and mails
the results back to the sender.
"""
import os
import shutil
import time
import email
from email.utils import parseaddr
from typing import List, Dict, Optional, Tuple

from PIL import Image

from config import Config
from logger import logger
from temp_manager import TempManager
from mail_handler import MailHandler
from manifest_parser import ManifestParser
from image_processor import ImageProcessor
from zip_handler import ZipHandler
from agreement import AgreementManager


class MailConverter:
    """Glue object tying mail I/O, rule parsing and image conversion together."""

    def __init__(self):
        self.temp_mgr = TempManager()
        self.mail = MailHandler()
        self.img_proc = ImageProcessor()
        self.zip_proc = ZipHandler()
        self.parser = ManifestParser()

    # ==================== Rule matching core ====================

    @staticmethod
    def _in_scope(task: Dict, current_archive: Optional[str]) -> bool:
        """Return True when *task* applies in the given archive scope.

        A task's effective scope is its ``inline_archive`` value if truthy,
        otherwise its ``scope`` value.  Unscoped tasks (``None``) apply
        everywhere; scoped tasks apply only to files coming from the archive
        with exactly that name.
        """
        effective_scope = task.get('inline_archive') or task.get('scope')
        if effective_scope is None:
            return True
        return bool(current_archive) and effective_scope == current_archive

    @staticmethod
    def _expand_targets(base_name: str, tasks: List[Dict]) -> List[Dict]:
        """Turn every ``(format, size)`` target of *tasks* into an output spec."""
        return [
            {'output_name': f"{base_name}.{fmt}", 'format': fmt, 'size': size}
            for task in tasks
            for fmt, size in task.get('targets', [])
        ]

    def match_rules_for_file(
        self,
        filename: str,
        file_ext: str,
        tasks: List[Dict],
        current_archive: Optional[str] = None
    ) -> List[Dict]:
        """Resolve which outputs must be produced for one input file.

        Rules are tried in priority order and the first tier that has at
        least one matching rule wins:

        1. ``rename``    - ``src_name`` equals the file's base name
        2. ``by_name``   - ``src_name`` equals the file's base name
        3. ``by_format`` - ``src_format`` equals the lower-cased extension
        4. ``global``

        If no tier matches, the file is kept under its own name and format.

        Args:
            filename: File name including extension.
            file_ext: Extension without the leading dot.
            tasks: Parsed rule dictionaries.
            current_archive: Name of the archive the file came from, or
                ``None`` for a stand-alone attachment.

        Returns:
            A list of dicts with the keys ``output_name``, ``format`` and
            ``size``.
        """
        base_name = os.path.splitext(filename)[0]
        applicable = [t for t in tasks if self._in_scope(t, current_archive)]
        logger.debug(f"文件 {filename} 在作用域 {current_archive} 下匹配到 {len(applicable)} 条规则")

        # 1. Rename rules: the target extension decides the output format.
        rename_tasks = [
            t for t in applicable
            if t.get('type') == 'rename' and t.get('src_name') == base_name
        ]
        if rename_tasks:
            return [
                {
                    'output_name': t['dst_name'],
                    'format': os.path.splitext(t['dst_name'])[1][1:].lower(),
                    'size': t.get('size'),
                }
                for t in rename_tasks
            ]

        # 2. Exact file name, 3. batch by source format, 4. global default.
        wanted_format = file_ext.lower()
        tiers = (
            [t for t in applicable
             if t.get('type') == 'by_name' and t.get('src_name') == base_name],
            [t for t in applicable
             if t.get('type') == 'by_format' and t.get('src_format') == wanted_format],
            [t for t in applicable if t.get('type') == 'global'],
        )
        for tier in tiers:
            if tier:
                return self._expand_targets(base_name, tier)

        # No rule matched: keep the file as it is.
        return [{'output_name': filename, 'format': file_ext, 'size': None}]

    # ==================== Single image processing ====================

    @staticmethod
    def _resize_and_save(input_path: str, out_path: str, out_fmt: str, out_size) -> bool:
        """Resize *input_path* according to *out_size* and save it as *out_fmt*.

        ``out_size`` is indexed as ``(mode, a, b)`` where mode ``'ratio'``
        or ``'pixel'`` selects the ``ImageProcessor`` resize helper; any
        other mode saves without resizing.

        Returns:
            False when the output format is not in ``ImageProcessor.FORMAT_MAP``,
            True after a successful save.  Pillow errors propagate.
        """
        # Validate the format first so an unsupported target costs no decoding.
        pillow_format = ImageProcessor.FORMAT_MAP.get(out_fmt.lower())
        if not pillow_format:
            logger.error(f"不支持的输出格式: {out_fmt}")
            return False

        with Image.open(input_path) as img:
            if out_size[0] == 'ratio':
                img = ImageProcessor.resize_by_ratio(img, out_size[1], out_size[2])
            elif out_size[0] == 'pixel':
                img = ImageProcessor.resize_by_pixel(img, out_size[1], out_size[2])

            # JPEG has no alpha channel: flatten transparency onto white.
            # 'LA' and palette images are routed through RGBA so that their
            # transparency is honoured too instead of failing on save.
            if pillow_format == 'JPEG' and img.mode in ('RGBA', 'LA', 'P'):
                if img.mode != 'RGBA':
                    img = img.convert('RGBA')
                rgb_img = Image.new('RGB', img.size, (255, 255, 255))
                rgb_img.paste(img, mask=img.split()[-1])
                img = rgb_img

            save_kwargs = {}
            if pillow_format in ('JPEG', 'WEBP'):
                save_kwargs['quality'] = Config.DEFAULT_QUALITY
            img.save(out_path, format=pillow_format, **save_kwargs)

        logger.info(f"转换+缩放成功: {input_path} -> {out_path}")
        return True

    def process_single_image(
        self,
        input_path: str,
        output_dir: str,
        tasks: List[Dict],
        current_archive: Optional[str] = None
    ) -> List[str]:
        """Convert one image into every output requested by the rules.

        Failures of individual outputs are logged and skipped; they never
        abort the remaining outputs.

        Args:
            input_path: Path of the source image.
            output_dir: Existing directory that receives the outputs.
            tasks: Parsed rule dictionaries.
            current_archive: Archive name used for rule scoping, if any.

        Returns:
            Paths of the files that were written successfully.
        """
        filename = os.path.basename(input_path)
        file_ext = os.path.splitext(filename)[1][1:].lower()
        output_files = []

        output_tasks = self.match_rules_for_file(filename, file_ext, tasks, current_archive)
        logger.info(f"图片 {filename} 将生成 {len(output_tasks)} 个输出任务")

        for task in output_tasks:
            out_fmt = task['format']
            out_size = task['size']

            # Output names can come from a rename rule in the (emailed, hence
            # untrusted) manifest: strip directory parts so a name such as
            # "../x.png" cannot escape output_dir.
            out_name = os.path.basename(task['output_name'])
            if not out_name:
                logger.error(f"非法的输出文件名,已跳过: {task['output_name']}")
                continue
            out_path = os.path.join(output_dir, out_name)

            try:
                if out_size:
                    if not self._resize_and_save(input_path, out_path, out_fmt, out_size):
                        continue
                else:
                    success = ImageProcessor.convert_image(
                        input_path, out_path, out_fmt, Config.DEFAULT_QUALITY
                    )
                    if not success:
                        logger.error(f"转换失败: {input_path} -> {out_path}")
                        continue
                output_files.append(out_path)
            except Exception as e:
                logger.error(f"处理图片异常 {input_path}: {e}", exc_info=True)
                continue

        return output_files

    # ==================== Archive processing ====================

    def process_archive(
        self,
        archive_path: str,
        output_base_dir: str,
        tasks: List[Dict]
    ) -> List[str]:
        """Extract a ``.zip`` / ``.7z`` archive and convert every image in it.

        All outputs are written directly into *output_base_dir*.
        NOTE(review): images with the same base name in different archive
        sub-directories therefore map to the same output path.

        Args:
            archive_path: Path of the archive file.
            output_base_dir: Existing directory that receives the outputs.
            tasks: Parsed rule dictionaries.

        Returns:
            Paths of all generated files; an empty list when the archive
            type is unsupported or extraction fails.
        """
        archive_name = os.path.basename(archive_path)
        ext = os.path.splitext(archive_path)[1].lower()

        # Reject unknown formats before creating anything on disk.
        if ext not in ('.zip', '.7z'):
            logger.warning(f"不支持的压缩包格式: {archive_path}")
            return []

        extract_dir = os.path.join(output_base_dir, f"ext_{archive_name}")
        os.makedirs(extract_dir, exist_ok=True)
        try:
            try:
                if ext == '.zip':
                    ZipHandler.extract_zip(archive_path, extract_dir)
                else:
                    ZipHandler.extract_7z(archive_path, extract_dir)
            except Exception as e:
                logger.error(f"解压失败 {archive_path}: {e}")
                return []

            all_output_files = []
            for root, _, files in os.walk(extract_dir):
                for file in files:
                    file_ext_lower = os.path.splitext(file)[1][1:].lower()
                    if file_ext_lower not in Config.SUPPORTED_INPUT_FORMATS:
                        continue
                    all_output_files.extend(
                        self.process_single_image(
                            os.path.join(root, file),
                            output_base_dir,
                            tasks,
                            current_archive=archive_name,
                        )
                    )
            return all_output_files
        finally:
            # extract_dir lives inside output_base_dir, which may later be
            # zipped as the result; remove it on every exit path (including
            # failed extraction) so leftovers never end up in the reply.
            if not Config.KEEP_TEMP_FILES:
                shutil.rmtree(extract_dir, ignore_errors=True)

    # ==================== Mail processing main flow ====================

    @staticmethod
    def _build_result_subject(original_info: List[Dict]) -> str:
        """Build the reply subject summarising what each attachment became."""
        conversion_details = []
        for info in original_info:
            orig_name = info['original_name']
            out_files = info['output_files']
            if len(out_files) == 1:
                out_name = os.path.basename(out_files[0])
                out_ext = os.path.splitext(out_name)[1][1:].upper()
                conversion_details.append(f"{orig_name} → {out_ext}")
            else:
                conversion_details.append(f"{orig_name} → {len(out_files)}个文件")

        # Keep the subject short: show at most two items, then a total count.
        if len(conversion_details) > 2:
            detail_str = f"{', '.join(conversion_details[:2])} 等{len(conversion_details)}项"
        else:
            detail_str = ', '.join(conversion_details)

        time_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return f"MailC[{time_str}]:{detail_str}"

    def process_one_email(self, msg_id, raw_email, proto_conn, proto_type) -> bool:
        """Process a single mail end to end and delete it from the inbox.

        Args:
            msg_id: Message identifier as used by the mailbox connection
                (bytes for IMAP search results, int for POP3 in this file).
            raw_email: Raw RFC 822 message bytes.
            proto_conn: Open mailbox connection, passed to ``delete_email``.
            proto_type: Protocol name, passed to ``delete_email``.

        Returns:
            True when results were produced and mailed back, False when the
            mail was rejected, yielded nothing, or processing failed.
        """
        msg = email.message_from_bytes(raw_email)
        # A mail without a From header yields None; parseaddr needs a string.
        sender = msg.get('From') or ''
        sender_email = parseaddr(sender)[1]
        subject = msg.get('Subject', '无主题')
        logger.info(f"开始处理邮件: {subject} from {sender_email}")

        if not self.mail.is_domain_allowed(sender_email):
            logger.warning(f"域名不在白名单: {sender_email},删除原邮件")
            self.mail.delete_email(msg_id, proto_conn, proto_type)
            return False

        # IMAP ids are bytes; decode them so the directory is not "mail_b'1'_...".
        id_label = msg_id.decode('ascii', 'replace') if isinstance(msg_id, bytes) else msg_id
        mail_work_dir = os.path.join(Config.TEMP_DIR, f"mail_{id_label}_{int(time.time())}")
        os.makedirs(mail_work_dir, exist_ok=True)

        try:
            attachments, body, _ = self.mail.download_attachments(raw_email, mail_work_dir)
            logger.info(f"下载到附件: {[os.path.basename(a) for a in attachments]}")

            # A manifest.txt attachment takes precedence over the mail body.
            manifest_path = next(
                (a for a in attachments if os.path.basename(a) == 'manifest.txt'),
                None,
            )
            if manifest_path:
                with open(manifest_path, 'r', encoding='utf-8') as f:
                    rule_content = f.read()
                logger.info("使用 manifest.txt 规则")
            else:
                rule_content = body
                logger.info("使用邮件正文规则")

            logger.info(f"规则原始内容:\n{rule_content}")

            if not rule_content or not rule_content.strip():
                logger.warning("规则内容为空,删除原邮件")
                self.mail.delete_email(msg_id, proto_conn, proto_type)
                return False

            tasks = self.parser.parse(rule_content)
            logger.info(f"解析出 {len(tasks)} 条规则指令")
            for idx, t in enumerate(tasks):
                logger.debug(f"规则{idx}: {t}")

            all_output_files = []
            original_info = []

            for att in attachments:
                att_basename = os.path.basename(att)
                if att_basename == 'manifest.txt':
                    continue
                att_ext = os.path.splitext(att)[1][1:].lower()

                is_archive = ZipHandler.is_archive(att)
                if not is_archive and att_ext not in Config.SUPPORTED_INPUT_FORMATS:
                    logger.warning(f"跳过不支持的文件: {att_basename} (扩展名: {att_ext})")
                    continue

                # One output directory per original attachment.
                att_output_dir = os.path.join(mail_work_dir, f"out_{att_basename}")
                os.makedirs(att_output_dir, exist_ok=True)
                if is_archive:
                    output_files = self.process_archive(att, att_output_dir, tasks)
                else:
                    output_files = self.process_single_image(
                        att, att_output_dir, tasks, current_archive=None
                    )

                all_output_files.extend(output_files)
                original_info.append({
                    'original_name': att_basename,
                    'output_dir': att_output_dir,
                    'output_files': output_files
                })

            logger.info(f"总共生成 {len(all_output_files)} 个输出文件")

            if not all_output_files:
                logger.warning("没有生成任何转换结果,删除原邮件")
                self.mail.delete_email(msg_id, proto_conn, proto_type)
                return False

            if Config.FLATTEN_OUTPUT:
                # Every output is attached individually; no zips are needed.
                return_attachments = all_output_files
            else:
                # Mirror the original attachment layout: one output file is
                # returned as-is, several are bundled into "<name>_result.zip".
                return_attachments = []
                for info in original_info:
                    produced = info['output_files']
                    if not produced:
                        # Nothing was generated for this attachment; do not
                        # send an empty zip.
                        continue
                    if len(produced) == 1:
                        return_attachments.append(produced[0])
                    else:
                        zip_name = f"{os.path.splitext(info['original_name'])[0]}_result.zip"
                        zip_path = os.path.join(mail_work_dir, zip_name)
                        ZipHandler.pack_to_zip(info['output_dir'], zip_path)
                        return_attachments.append(zip_path)

            result_subject = self._build_result_subject(original_info)

            # Send the result back to the sender.
            self.mail.send_result(
                recipient=sender_email,
                subject=result_subject,
                attachments=return_attachments,
                split_volume_mb=Config.SPLIT_VOLUME_SIZE_MB
            )
            logger.info(f"结果已发送至 {sender_email}")

            self.mail.delete_email(msg_id, proto_conn, proto_type)
            return True

        except Exception as e:
            error_msg = f"处理邮件时异常: {str(e)}"
            logger.error(error_msg, exc_info=True)
            try:
                self.mail.send_error_report(sender_email, error_msg)
            except Exception as send_err:
                logger.error(f"发送错误报告失败: {send_err}")
            self.mail.delete_email(msg_id, proto_conn, proto_type)
            return False
        finally:
            if not Config.KEEP_TEMP_FILES:
                shutil.rmtree(mail_work_dir, ignore_errors=True)

    # ==================== Polling loop ====================

    @staticmethod
    def _list_pending_ids(conn, proto_type) -> Optional[list]:
        """Return the ids of mails to process, or None if the IMAP search failed.

        IMAP: ids of ``UNSEEN`` messages.  Any other protocol value is
        handled through ``conn.list()`` and yields ids ``1..count``.
        """
        if proto_type == "IMAP":
            typ, data = conn.search(None, 'UNSEEN')
            if typ != 'OK':
                logger.error("IMAP搜索失败")
                return None
            return data[0].split() if data and data[0] else []
        msg_count = len(conn.list()[1])
        return list(range(1, msg_count + 1))

    def _handle_message(self, conn, proto_type, msg_id) -> None:
        """Fetch one message and process it; never raises."""
        try:
            if proto_type == "IMAP":
                typ, msg_data = conn.fetch(msg_id, '(RFC822)')
                if typ != 'OK':
                    logger.warning(f"IMAP 获取邮件失败,已跳过: {msg_id}")
                    return
                raw_email = msg_data[0][1]
            else:
                raw_email = b'\n'.join(conn.retr(msg_id)[1])
            self.process_one_email(msg_id, raw_email, conn, proto_type)
        except Exception as e:
            logger.error(f"处理邮件 {msg_id} 崩溃: {e}", exc_info=True)
            # Best effort: drop the offending mail so it is not retried forever.
            try:
                self.mail.delete_email(msg_id, conn, proto_type)
            except Exception as del_err:
                logger.warning(f"删除邮件 {msg_id} 失败: {del_err}")

    @staticmethod
    def _close_connection(conn, proto_type) -> None:
        """Close the mailbox connection, logging instead of raising."""
        if conn is None:
            return
        if proto_type == "IMAP":
            # close() and logout() are attempted independently so that a
            # failing close() does not leave the session logged in.
            for step in (conn.close, conn.logout):
                try:
                    step()
                except Exception as err:
                    logger.debug(f"关闭 IMAP 连接时出错: {err}")
        else:
            try:
                conn.quit()
            except Exception as err:
                logger.debug(f"关闭邮箱连接时出错: {err}")

    def _poll_once(self) -> None:
        """Run one inbox poll: connect, process pending mails, disconnect."""
        conn = None
        proto_type = None
        try:
            conn, proto_type = self.mail.connect_inbox()
            msg_ids = self._list_pending_ids(conn, proto_type)
            if msg_ids is None:
                # Search failed; give up on this round only.
                return
            if not msg_ids:
                logger.debug("无新邮件")
                return

            logger.info(f"发现 {len(msg_ids)} 封待处理邮件")
            for msg_id in msg_ids[:Config.MAX_EMAILS_PER_RUN]:
                self._handle_message(conn, proto_type, msg_id)

            if proto_type == "IMAP":
                try:
                    conn.expunge()
                except Exception as err:
                    logger.warning(f"IMAP expunge 失败: {err}")
        except Exception as e:
            logger.error(f"主循环异常: {e}", exc_info=True)
        finally:
            self._close_connection(conn, proto_type)

    def run_forever(self):
        """Poll the inbox, once or repeatedly depending on ``Config.RUN_FOREVER``.

        If the terms of service have not been agreed to yet, an agreement
        request is mailed and the method returns without polling.
        """
        if not AgreementManager.is_agreed():
            logger.info("首次使用,需要同意协议")
            # NOTE(review): the returned body is not passed on to
            # send_result - confirm whether the agreement text should be sent.
            recipient, subject, _body = AgreementManager.request_agreement(
                Config.ADMIN_EMAIL, self.mail
            )
            self.mail.send_result(recipient, subject, [])
            logger.info(f"协议请求已发送至 {recipient},请在 .env 中设置 AGREED_TOS=True")
            return

        logger.info(f"启动监听循环,轮询间隔 {Config.POLL_INTERVAL_SECONDS} 秒,{'无限循环' if Config.RUN_FOREVER else '单次运行'}")

        while True:
            TempManager.cleanup_stale()
            # The poll is a separate call so that no early exit inside it can
            # bypass the single-run check or the sleep below.
            self._poll_once()

            if not Config.RUN_FOREVER:
                break
            time.sleep(Config.POLL_INTERVAL_SECONDS)


def main():
    """Script entry point: create the converter and start polling."""
    converter = MailConverter()
    converter.run_forever()


if __name__ == "__main__":
    main()