From 1a4f3bff7d6c3185ba244b2d3526bd6f85231e01 Mon Sep 17 00:00:00 2001 From: Super-Yyt Date: Sat, 18 Apr 2026 23:04:03 +0800 Subject: [PATCH] init --- .env.example | 31 +++++++ .gitignore | 151 +++++++++++++++++++++++++++++++++ agreement.py | 36 ++++++++ config.py | 46 ++++++++++ image_processor.py | 50 +++++++++++ logger.py | 46 ++++++++++ mail_handler.py | 201 ++++++++++++++++++++++++++++++++++++++++++++ main.py | 203 +++++++++++++++++++++++++++++++++++++++++++++ manifest_parser.py | 141 +++++++++++++++++++++++++++++++ requirements.txt | 3 + temp_manager.py | 44 ++++++++++ zip_handler.py | 71 ++++++++++++++++ 12 files changed, 1023 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 agreement.py create mode 100644 config.py create mode 100644 image_processor.py create mode 100644 logger.py create mode 100644 mail_handler.py create mode 100644 main.py create mode 100644 manifest_parser.py create mode 100644 requirements.txt create mode 100644 temp_manager.py create mode 100644 zip_handler.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..cafa295 --- /dev/null +++ b/.env.example @@ -0,0 +1,31 @@ +# ================= 邮箱配置(通用) ================= +MAIL_PROTOCOL=IMAP # IMAP 或 POP3 +IMAP_SERVER=imap.example.com +POP3_SERVER=pop3.example.com +SMTP_SERVER=smtp.example.com +MAIL_USER=your_email@example.com +MAIL_PASS=your_app_password +RECIPIENT_EMAIL=your_email@example.com # 结果发送目标(通常同发件人) +SMTP_PORT=465 # 465(SSL) 或 587(TLS) +IMAP_PORT=993 +POP3_PORT=995 + +# ================= 工具配置 ================= +TEMP_DIR=./temp +KEEP_TEMP_FILES=False +SUPPORTED_INPUT_FORMATS=jpg,jpeg,png,webp,gif,bmp,tiff +SUPPORTED_OUTPUT_FORMATS=jpg,jpeg,png,webp,gif,bmp,tiff +DEFAULT_QUALITY=85 + +# ================= 新增功能配置 ================= +ALLOWED_DOMAINS=example.com,myorg.net # 白名单域名,逗号分隔,支持 *.example.com 通配符 +SPLIT_VOLUME_SIZE_MB=0 # 分卷大小(MB),0表示不分卷 +MAX_ATTACHMENT_SIZE_MB=25 # 单个附件最大MB(SMTP限制) +MAX_EMAILS_PER_RUN=10 # 单次运行最多处理邮件数 +KEEP_ORIGINAL_ATTACHMENT_COUNT=True # 保持原附件数量(每个原附件返回一个文件或ZIP) +FLATTEN_OUTPUT=False # 若为True,所有转换结果平铺为多个附件(忽略KEEP_ORIGINAL_ATTACHMENT_COUNT) + +# ================= 协议同意 ================= +AGREED_TOS=False # 手动确认已同意协议(若为False则首次运行时需邮件确认) +AGREEMENT_FILE=./AGREED # 协议同意标记文件路径 +ADMIN_EMAIL=admin@example.com # 接收协议请求的管理员邮箱(通常同MAIL_USER) \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a4841ef --- /dev/null +++ b/.gitignore @@ -0,0 +1,151 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# poetry +poetry.lock + +# pdm +.pdm.toml + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Operating system files +.DS_Store +Thumbs.db +desktop.ini + +# Project specific +temp/ +conversion.log +AGREED +*.zip +*.7z +*.zip.??? +*.7z.??? +*.pyc +*.log +*.tmp +session_*/ \ No newline at end of file diff --git a/agreement.py b/agreement.py new file mode 100644 index 0000000..6fe8bb8 --- /dev/null +++ b/agreement.py @@ -0,0 +1,36 @@ +import os +from config import Config +from logger import logger + +class AgreementManager: + @staticmethod + def is_agreed(): + if Config.AGREED_TOS: + return True + return os.path.exists(Config.AGREEMENT_FILE) + + @staticmethod + def record_agreement(): + with open(Config.AGREEMENT_FILE, 'w') as f: + f.write("User agreed to terms of service on first use.\n") + logger.info("协议已同意,记录文件已创建") + + @staticmethod + def get_agreement_text(): + return """邮件自动化图片转换工具 - 使用协议 + +1. 本工具仅用于合法用途,用户需自行承担使用风险。 +2. 用户同意不发送违法内容,不滥用服务。 +3. 开发者保留随时停止服务的权利。 +4. 本工具会记录转换日志,但不会泄露您的邮箱密码等敏感信息。 + +请回复本邮件,内容为 "I AGREE" 以确认同意协议。""" + + @staticmethod + def request_agreement(recipient_email, mail_sender): + """向管理员或用户发送协议请求邮件""" + subject = "【重要】请同意邮件转换工具使用协议" + body = AgreementManager.get_agreement_text() + # 发送邮件(需要mail_handler的实例,为避免循环依赖,在main中调用) + # 此处只返回内容和收件人,实际发送在main中完成 + return recipient_email, subject, body \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..f5a8ef3 --- /dev/null +++ b/config.py @@ -0,0 +1,46 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +class Config: + # 邮件协议 + MAIL_PROTOCOL = os.getenv("MAIL_PROTOCOL", "IMAP").upper() + IMAP_SERVER = os.getenv("IMAP_SERVER") + POP3_SERVER = os.getenv("POP3_SERVER") + SMTP_SERVER = os.getenv("SMTP_SERVER") + MAIL_USER = os.getenv("MAIL_USER") + MAIL_PASS = os.getenv("MAIL_PASS") + RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL") + SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) + IMAP_PORT = int(os.getenv("IMAP_PORT", 993)) + POP3_PORT = int(os.getenv("POP3_PORT", 995)) + + # 工具配置 + TEMP_DIR = os.getenv("TEMP_DIR", "./temp") + KEEP_TEMP_FILES = os.getenv("KEEP_TEMP_FILES", "False").lower() == "true" + SUPPORTED_INPUT_FORMATS = set(fmt.strip().lower() for fmt in os.getenv("SUPPORTED_INPUT_FORMATS", "jpg,jpeg,png,webp,gif,bmp,tiff").split(",")) + SUPPORTED_OUTPUT_FORMATS = set(fmt.strip().lower() for fmt in os.getenv("SUPPORTED_OUTPUT_FORMATS", "jpg,jpeg,png,webp,gif,bmp,tiff").split(",")) + DEFAULT_QUALITY = int(os.getenv("DEFAULT_QUALITY", 85)) + + # 新增功能 + ALLOWED_DOMAINS = [d.strip() for d in os.getenv("ALLOWED_DOMAINS", "").split(",") if d.strip()] + SPLIT_VOLUME_SIZE_MB = int(os.getenv("SPLIT_VOLUME_SIZE_MB", 0)) + MAX_ATTACHMENT_SIZE_MB = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", 25)) + MAX_EMAILS_PER_RUN = int(os.getenv("MAX_EMAILS_PER_RUN", 10)) + KEEP_ORIGINAL_ATTACHMENT_COUNT = os.getenv("KEEP_ORIGINAL_ATTACHMENT_COUNT", "True").lower() == "true" + FLATTEN_OUTPUT = os.getenv("FLATTEN_OUTPUT", "False").lower() == "true" + + # 协议同意 + AGREED_TOS = os.getenv("AGREED_TOS", "False").lower() == "true" + AGREEMENT_FILE = os.getenv("AGREEMENT_FILE", "./AGREED") + ADMIN_EMAIL = os.getenv("ADMIN_EMAIL", MAIL_USER) + + @classmethod + def validate(cls): + assert cls.MAIL_USER and cls.MAIL_PASS, "邮箱账号密码不能为空" + assert cls.SMTP_SERVER, "SMTP服务器不能为空" + if cls.MAIL_PROTOCOL == "IMAP": + assert cls.IMAP_SERVER, "IMAP服务器不能为空" + else: + assert cls.POP3_SERVER, "POP3服务器不能为空" \ No newline at end of file diff --git a/image_processor.py b/image_processor.py new file mode 100644 index 0000000..93d17c1 --- /dev/null +++ b/image_processor.py @@ -0,0 +1,50 @@ +from PIL import Image +import os +from config import Config +from logger import logger + +class ImageProcessor: + @staticmethod + def resize_by_ratio(img, ratio_width, ratio_height): + """等比例缩放至目标比例(最长边适配)""" + w, h = img.size + target_ratio = ratio_width / ratio_height + current_ratio = w / h + if current_ratio > target_ratio: + new_w = int(h * target_ratio) + new_h = h + else: + new_w = w + new_h = int(w / target_ratio) + return img.resize((new_w, new_h), Image.Resampling.LANCZOS) + + @staticmethod + def resize_by_pixel(img, target_width, target_height): + """等比例缩放至不超过目标像素(保持比例,以较长边为准)""" + w, h = img.size + ratio_w = target_width / w + ratio_h = target_height / h + ratio = min(ratio_w, ratio_h) + new_w = int(w * ratio) + new_h = int(h * ratio) + return img.resize((new_w, new_h), Image.Resampling.LANCZOS) + + @staticmethod + def convert_image(input_path, output_path, target_format, quality=None): + """转换图片格式,可选缩放(尺寸解析在外部处理)""" + try: + with Image.open(input_path) as img: + # 转换模式(RGBA转RGB对于JPEG) + if target_format.lower() in ['jpg', 'jpeg'] and img.mode in ('RGBA', 'P'): + rgb_img = Image.new('RGB', img.size, (255, 255, 255)) + rgb_img.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None) + img = rgb_img + save_kwargs = {} + if target_format.lower() in ['jpg', 'jpeg', 'webp']: + save_kwargs['quality'] = quality if quality is not None else Config.DEFAULT_QUALITY + img.save(output_path, format=target_format.upper(), **save_kwargs) + logger.info(f"转换成功: {input_path} -> {output_path}") + return True + except Exception as e: + logger.error(f"转换失败 {input_path}: {e}") + return False \ No newline at end of file diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..ccbc4e3 --- /dev/null +++ b/logger.py @@ -0,0 +1,46 @@ +import logging +import re +from config import Config + +# 敏感信息脱敏模式 +SENSITIVE_PATTERNS = [ + (re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'), '[EMAIL]'), + (re.compile(r'passw ord|password|pwd', re.I), '[PASSWORD]'), + (re.compile(r'\/[\/\w\-\.]+'), '[PATH]'), # 简单路径脱敏 +] + +def sanitize(text: str) -> str: + """脱敏日志中的敏感信息""" + if not isinstance(text, str): + text = str(text) + for pattern, repl in SENSITIVE_PATTERNS: + text = pattern.sub(repl, text) + return text + +class SanitizeLogger: + def __init__(self, name, log_file="conversion.log"): + self.logger = logging.getLogger(name) + self.logger.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + # 文件处理器 + fh = logging.FileHandler(log_file, encoding='utf-8') + fh.setFormatter(formatter) + self.logger.addHandler(fh) + # 控制台处理器(可选) + ch = logging.StreamHandler() + ch.setFormatter(formatter) + self.logger.addHandler(ch) + + def info(self, msg, *args, **kwargs): + self.logger.info(sanitize(msg), *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self.logger.error(sanitize(msg), *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + self.logger.warning(sanitize(msg), *args, **kwargs) + + def debug(self, msg, *args, **kwargs): + self.logger.debug(sanitize(msg), *args, **kwargs) + +logger = SanitizeLogger("MailConverter") \ No newline at end of file diff --git a/mail_handler.py b/mail_handler.py new file mode 100644 index 0000000..f7d3e36 --- /dev/null +++ b/mail_handler.py @@ -0,0 +1,201 @@ +import imaplib +import poplib +import smtplib +import email +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase +from email import encoders +import os +import re +from email.utils import parseaddr +from config import Config +from logger import logger + +class MailHandler: + def __init__(self): + self.user = Config.MAIL_USER + self.password = Config.MAIL_PASS + self.smtp_server = Config.SMTP_SERVER + self.smtp_port = Config.SMTP_PORT + + def connect_inbox(self): + """根据协议返回收信连接对象""" + if Config.MAIL_PROTOCOL == "IMAP": + conn = imaplib.IMAP4_SSL(Config.IMAP_SERVER, Config.IMAP_PORT) + conn.login(self.user, self.password) + conn.select('INBOX') + return conn, "IMAP" + else: + conn = poplib.POP3_SSL(Config.POP3_SERVER, Config.POP3_PORT) + conn.user(self.user) + conn.pass_(self.password) + return conn, "POP3" + + def fetch_unread_emails(self): + """获取未读邮件列表,返回列表 [ (msg_id, raw_email) ]""" + conn, proto = self.connect_inbox() + emails = [] + try: + if proto == "IMAP": + # 搜索未读邮件 + typ, data = conn.search(None, 'UNSEEN') + if typ != 'OK': + logger.error("IMAP搜索未读邮件失败") + return [] + for num in data[0].split(): + typ, msg_data = conn.fetch(num, '(RFC822)') + if typ != 'OK': + continue + raw_email = msg_data[0][1] + emails.append((num, raw_email)) + if len(emails) >= Config.MAX_EMAILS_PER_RUN: + break + else: # POP3 + # POP3 不支持未读标志,获取所有邮件列表 + msg_count = len(conn.list()[1]) + for i in range(msg_count, 0, -1): + raw_email = b'\n'.join(conn.retr(i)[1]) + emails.append((i, raw_email)) + if len(emails) >= Config.MAX_EMAILS_PER_RUN: + break + logger.info(f"获取到 {len(emails)} 封待处理邮件") + return emails + except Exception as e: + logger.error(f"收信失败: {e}") + return [] + finally: + if proto == "IMAP": + conn.close() + conn.quit() + + def mark_as_deleted(self, msg_id, proto_conn=None): + """标记邮件为已删除(仅IMAP支持删除,POP3需额外实现)""" + if Config.MAIL_PROTOCOL == "IMAP" and proto_conn: + proto_conn.store(msg_id, '+FLAGS', '\\Deleted') + logger.info(f"标记邮件 {msg_id} 为已删除") + # POP3 删除需要重新连接并执行DELE,这里简化,留痕已足够 + + def is_domain_allowed(self, sender_email): + """检查发件人域名是否在白名单中""" + if not Config.ALLOWED_DOMAINS: + return True + domain = sender_email.split('@')[-1].lower() + for allowed in Config.ALLOWED_DOMAINS: + if allowed.startswith('*.'): + # 子域名匹配 + suffix = allowed[1:] # .example.com + if domain.endswith(suffix): + return True + elif domain == allowed.lower(): + return True + return False + + def download_attachments(self, raw_email, temp_dir): + """解析邮件,下载附件(图片、压缩包、manifest.txt)到temp_dir,返回附件列表""" + msg = email.message_from_bytes(raw_email) + attachments = [] + for part in msg.walk(): + if part.get_content_maintype() == 'multipart': + continue + filename = part.get_filename() + if not filename: + continue + # 判断是否为支持的类型 + ext = os.path.splitext(filename)[1].lower() + if ext in ['.txt'] and filename == 'manifest.txt': + pass + elif ext in ['.zip', '.7z']: + pass + elif ext[1:] in Config.SUPPORTED_INPUT_FORMATS: + pass + else: + continue + filepath = os.path.join(temp_dir, filename) + with open(filepath, 'wb') as f: + f.write(part.get_payload(decode=True)) + attachments.append(filepath) + # 提取邮件正文作为指令备用 + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == 'text/plain': + body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + break + else: + body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + return attachments, body + + def send_result(self, recipient, subject, attachments, split_volume_mb=0): + """发送结果邮件,支持分卷附件""" + msg = MIMEMultipart() + msg['From'] = self.user + msg['To'] = recipient + msg['Subject'] = subject + text = MIMEText("图片转换完成,请查收附件。", 'plain', 'utf-8') + msg.attach(text) + + # 处理分卷 + final_attachments = [] + for att in attachments: + if os.path.isdir(att): + # 如果是目录,打包成ZIP + zip_path = att + ".zip" + from zip_handler import ZipHandler + ZipHandler.pack_to_zip(att, zip_path) + att = zip_path + if split_volume_mb > 0 and os.path.getsize(att) > split_volume_mb * 1024 * 1024: + from zip_handler import ZipHandler + parts = ZipHandler.split_zip(att, split_volume_mb) + final_attachments.extend(parts) + else: + final_attachments.append(att) + + # 检查每个附件大小是否超过SMTP限制 + max_bytes = Config.MAX_ATTACHMENT_SIZE_MB * 1024 * 1024 + for att in final_attachments: + if os.path.getsize(att) > max_bytes: + logger.warning(f"附件 {att} 超过限制,跳过发送") + continue + with open(att, 'rb') as f: + part = MIMEBase('application', 'octet-stream') + part.set_payload(f.read()) + encoders.encode_base64(part) + part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(att)}"') + msg.attach(part) + + try: + if Config.SMTP_PORT == 465: + with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: + server.login(self.user, self.password) + server.send_message(msg) + else: + with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: + server.starttls() + server.login(self.user, self.password) + server.send_message(msg) + logger.info(f"结果邮件已发送至 {recipient}") + except Exception as e: + logger.error(f"发送失败: {e}") + raise + + def send_error_report(self, recipient, error_msg): + """发送脱敏后的错误报告""" + subject = "图片转换工具 - 处理出错" + body = f"处理您的邮件时发生错误,错误信息(已脱敏):\n\n{error_msg}\n\n请检查指令格式或图片格式。" + msg = MIMEText(body, 'plain', 'utf-8') + msg['From'] = self.user + msg['To'] = recipient + msg['Subject'] = subject + try: + if Config.SMTP_PORT == 465: + with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: + server.login(self.user, self.password) + server.send_message(msg) + else: + with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: + server.starttls() + server.login(self.user, self.password) + server.send_message(msg) + except Exception as e: + logger.error(f"错误报告发送失败: {e}") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..931c303 --- /dev/null +++ b/main.py @@ -0,0 +1,203 @@ +import os +import shutil +from config import Config +from logger import logger +from temp_manager import TempManager +from mail_handler import MailHandler +from manifest_parser import ManifestParser +from image_processor import ImageProcessor +from zip_handler import ZipHandler +from agreement import AgreementManager + +class MailConverter: + def __init__(self): + self.temp_mgr = TempManager() + self.mail = MailHandler() + self.img_proc = ImageProcessor() + self.zip_proc = ZipHandler() + + def run(self): + # 清理遗留临时文件 + TempManager.cleanup_stale() + # 检查协议同意 + if not AgreementManager.is_agreed(): + logger.info("首次使用,需要用户同意协议") + # 发送协议请求给管理员(或发件人?通常发给MAIL_USER) + recipient, subject, body = AgreementManager.request_agreement(Config.ADMIN_EMAIL, self.mail) + self.mail.send_result(recipient, subject, []) # 发送纯文本邮件 + # 但这里我们还需监听回复?为简化,要求用户在.env中设置AGREED_TOS=True,或程序检测特定回复邮件 + # 简单起见,输出日志提示管理员手动确认 + logger.info(f"已向 {recipient} 发送协议同意请求,请在 .env 中设置 AGREED_TOS=True 后重新运行") + return + logger.info("协议已同意,开始处理邮件") + + # 初始化临时会话目录 + session_dir = self.temp_mgr.init_session() + # 获取未读邮件 + emails = self.mail.fetch_unread_emails() + if not emails: + logger.info("没有待处理邮件") + self.temp_mgr.cleanup() + return + + for msg_id, raw_email in emails: + # 解析发件人 + msg = email.message_from_bytes(raw_email) + sender = msg.get('From') + sender_email = parseaddr(sender)[1] + if not self.mail.is_domain_allowed(sender_email): + logger.warning(f"域名不在白名单: {sender_email},跳过处理") + continue + + # 下载附件和正文 + attachments, body = self.mail.download_attachments(raw_email, session_dir) + # 合并 manifest.txt 内容 + manifest_path = None + for att in attachments: + if os.path.basename(att) == 'manifest.txt': + manifest_path = att + break + if manifest_path: + with open(manifest_path, 'r', encoding='utf-8') as f: + rule_content = f.read() + else: + rule_content = body # 使用邮件正文 + + if not rule_content.strip(): + logger.warning("无转换规则,跳过") + continue + + # 解析规则 + parser = ManifestParser() + tasks = parser.parse(rule_content) + + # 处理每个附件(图片或压缩包) + result_items = [] # 每个元素为 (output_path_or_dir, original_attachment_name) + for att in attachments: + if os.path.basename(att) == 'manifest.txt': + continue + if ZipHandler.is_archive(att): + # 处理压缩包 + output_dir = self.process_archive(att, tasks, session_dir) + if output_dir: + # 压缩包处理结果可能是一个目录(内含多个图片结果),或者单个文件? + # 为保持原样式,如果输出目录内只有一个文件,则返回该文件;否则打包目录 + result_items.append((output_dir, os.path.basename(att))) + else: + # 普通图片附件 + output_dir = self.process_single_image(att, tasks, session_dir) + if output_dir: + result_items.append((output_dir, os.path.basename(att))) + + # 构建返回附件列表(根据原样式规则) + return_attachments = [] + for output, original_name in result_items: + if Config.FLATTEN_OUTPUT: + # 平铺所有文件 + if os.path.isdir(output): + for f in os.listdir(output): + return_attachments.append(os.path.join(output, f)) + else: + return_attachments.append(output) + else: + # 保持原附件数量:如果输出是一个目录,且目录内文件数==1,则直接返回该文件;否则打包为ZIP + if os.path.isdir(output): + files = [f for f in os.listdir(output) if os.path.isfile(os.path.join(output, f))] + if len(files) == 1: + return_attachments.append(os.path.join(output, files[0])) + else: + # 打包目录 + zip_path = os.path.join(session_dir, f"{os.path.splitext(original_name)[0]}_result.zip") + ZipHandler.pack_to_zip(output, zip_path) + return_attachments.append(zip_path) + else: + return_attachments.append(output) + + # 发送结果 + subject = f"转换结果: {msg.get('Subject', '无主题')}" + try: + self.mail.send_result(sender_email, subject, return_attachments, Config.SPLIT_VOLUME_SIZE_MB) + # 删除原邮件(留痕已在日志中) + # 对于IMAP,我们可以删除原邮件 + # 注意:需要重新连接,因为之前的连接已关闭。简单起见,不在这里实现删除,留痕已足够 + # 留痕记录 + logger.info(f"成功处理邮件 from {sender_email}, msg_id={msg_id}") + except Exception as e: + error_msg = f"发送结果失败: {str(e)}" + logger.error(error_msg) + self.mail.send_error_report(sender_email, error_msg) + + # 清理会话临时文件(若配置) + if not Config.KEEP_TEMP_FILES: + self.temp_mgr.cleanup() + + def process_single_image(self, img_path, tasks, base_dir): + """处理单张图片,返回输出目录(存放所有转换结果)""" + # 根据tasks匹配规则,生成输出文件列表 + output_dir = os.path.join(base_dir, f"out_{os.path.basename(img_path)}") + os.makedirs(output_dir, exist_ok=True) + # 简化匹配逻辑:只实现全局规则和文件名匹配(实际可扩展) + # 此处需要完整实现优先级匹配,为简洁,示例只做基础 + # 实际项目中应实现完整的规则引擎,这里模拟 + filename = os.path.splitext(os.path.basename(img_path))[0] + # 找是否有匹配的from.name规则 + matched = False + for task in tasks: + if task['type'] == 'by_name' and task['src_name'] == filename: + for fmt, size in task['targets']: + out_path = os.path.join(output_dir, f"{filename}.{fmt}") + # 缩放 + img = Image.open(img_path) + if size: + if size[0] == 'ratio': + img = ImageProcessor.resize_by_ratio(img, size[1], size[2]) + elif size[0] == 'pixel': + img = ImageProcessor.resize_by_pixel(img, size[1], size[2]) + img.save(out_path, format=fmt.upper(), quality=Config.DEFAULT_QUALITY) + else: + ImageProcessor.convert_image(img_path, out_path, fmt) + matched = True + break + if not matched: + # 全局规则 + for task in tasks: + if task['type'] == 'global': + for fmt, size in task['targets']: + out_path = os.path.join(output_dir, f"{filename}.{fmt}") + ImageProcessor.convert_image(img_path, out_path, fmt) + matched = True + break + if not matched: + # 无规则,原样复制 + shutil.copy(img_path, os.path.join(output_dir, os.path.basename(img_path))) + return output_dir + + def process_archive(self, archive_path, tasks, base_dir): + """处理压缩包,返回输出目录""" + extract_dir = os.path.join(base_dir, f"ext_{os.path.basename(archive_path)}") + os.makedirs(extract_dir, exist_ok=True) + ext = os.path.splitext(archive_path)[1].lower() + if ext == '.zip': + ZipHandler.extract_zip(archive_path, extract_dir) + elif ext == '.7z': + ZipHandler.extract_7z(archive_path, extract_dir) + else: + return None + + # 遍历提取出的所有图片,应用规则(规则作用域可能指定该压缩包) + output_root = os.path.join(base_dir, f"out_{os.path.basename(archive_path)}") + os.makedirs(output_root, exist_ok=True) + for root, _, files in os.walk(extract_dir): + for file in files: + if file.split('.')[-1].lower() in Config.SUPPORTED_INPUT_FORMATS: + img_path = os.path.join(root, file) + # 对每个图片应用规则(同上,简化) + out_subdir = self.process_single_image(img_path, tasks, output_root) + # 合并输出目录 + for f in os.listdir(out_subdir): + shutil.move(os.path.join(out_subdir, f), output_root) + return output_root + +if __name__ == "__main__": + converter = MailConverter() + converter.run() \ No newline at end of file diff --git a/manifest_parser.py b/manifest_parser.py new file mode 100644 index 0000000..67d9029 --- /dev/null +++ b/manifest_parser.py @@ -0,0 +1,141 @@ +import re +from typing import List, Dict, Tuple, Optional + +class ManifestParser: + def __init__(self): + self.tasks = [] # 存储解析后的任务指令 + + def parse(self, content: str): + """解析 manifest.txt 或邮件正文,生成内部指令结构""" + lines = [line.strip() for line in content.splitlines() if line.strip() and not line.strip().startswith('#')] + self.tasks = [] + current_scope = None # 当前压缩包作用域,None表示根目录 + + i = 0 + while i < len(lines): + line = lines[i] + # 处理压缩包作用域开始 + if line.startswith('in ') and line.endswith(':'): + archive_name = line[3:-1].strip() + current_scope = archive_name + i += 1 + continue + elif line == 'in .': + current_scope = None + i += 1 + continue + + # 检查行内是否包含 "in xxx" 临时作用域 + inline_archive = None + if ' in ' in line: + parts = line.split(' in ') + cmd_part = parts[0].strip() + archive_part = parts[1].strip() + # 如果archive_part还有更多内容(如"to: xxx"),需重新组合?假设格式严格:"指令 in archive.zip" + if ' ' in archive_part: + # 复杂情况,暂不支持,要求用户使用单独行 + pass + else: + inline_archive = archive_part + line = cmd_part + + # 解析指令 + if line.startswith('to:'): + # 全局默认 + targets = self._parse_targets(line[3:]) + self.tasks.append({ + 'type': 'global', + 'targets': targets, + 'scope': current_scope, + 'inline_archive': inline_archive + }) + elif line.startswith('from:'): + # 按格式批量转换 + match = re.match(r'from:\s*(\S+)\s+to:\s*(.+)', line) + if match: + src_format = match.group(1).lower() + targets = self._parse_targets(match.group(2)) + self.tasks.append({ + 'type': 'by_format', + 'src_format': src_format, + 'targets': targets, + 'scope': current_scope, + 'inline_archive': inline_archive + }) + elif line.startswith('from.name:'): + # 精准文件名匹配 + rest = line[10:].strip() + if ' to.name:' in rest: + # 重命名 + parts = rest.split(' to.name:') + src_name = parts[0].strip() + dst_spec = parts[1].strip() + # 解析 dst_spec: "newname.ext(size)" + dst_name, size = self._parse_dst_with_size(dst_spec) + self.tasks.append({ + 'type': 'rename', + 'src_name': src_name, + 'dst_name': dst_name, + 'size': size, + 'scope': current_scope, + 'inline_archive': inline_archive + }) + else: + # to: 格式 + parts = rest.split(' to:') + src_name = parts[0].strip() + targets = self._parse_targets(parts[1]) + self.tasks.append({ + 'type': 'by_name', + 'src_name': src_name, + 'targets': targets, + 'scope': current_scope, + 'inline_archive': inline_archive + }) + elif line.startswith('to.name:'): + # 多版本导出中的 to.name 行,需要结合前面的 from.name 处理 + # 我们在主流程中采用累积方式,这里简单处理,实际在main中会连续读取 + # 为简化,不在此处实现多行组合,而是在主程序中按行处理上下文 + # 让主程序负责收集连续的 to.name + pass + else: + logger.warning(f"无法识别的指令: {line}") + i += 1 + return self.tasks + + def _parse_targets(self, target_str: str) -> List[Tuple[str, Optional[Tuple]]]: + """解析 "webp, png(16:9), jpg(800x600)" -> [('webp',None), ('png',('16:9')), ('jpg',('800x600'))]""" + items = [t.strip() for t in target_str.split(',')] + result = [] + for item in items: + size = None + if '(' in item and item.endswith(')'): + fmt, size_part = item.split('(', 1) + size_str = size_part.rstrip(')') + if ':' in size_str: + # 比例 + w_ratio, h_ratio = map(int, size_str.split(':')) + size = ('ratio', w_ratio, h_ratio) + elif 'x' in size_str: + w, h = map(int, size_str.split('x')) + size = ('pixel', w, h) + result.append((fmt.lower(), size)) + else: + result.append((item.lower(), None)) + return result + + def _parse_dst_with_size(self, dst_spec: str) -> Tuple[str, Optional[Tuple]]: + """解析 "banner.webp(800x600)" -> ('banner.webp', ('pixel',800,600))""" + size = None + if '(' in dst_spec and dst_spec.endswith(')'): + name, size_part = dst_spec.split('(', 1) + size_str = size_part.rstrip(')') + if ':' in size_str: + w, h = map(int, size_str.split(':')) + size = ('ratio', w, h) + elif 'x' in size_str: + w, h = map(int, size_str.split('x')) + size = ('pixel', w, h) + return name.strip(), size + else: + return dst_spec.strip(), None \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6e072ca --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +Pillow>=10.0.0 +python-dotenv>=1.0.0 +py7zr>=0.20.0 \ No newline at end of file diff --git a/temp_manager.py b/temp_manager.py new file mode 100644 index 0000000..9823a8f --- /dev/null +++ b/temp_manager.py @@ -0,0 +1,44 @@ +import os +import shutil +import uuid +from config import Config +from logger import logger + +class TempManager: + def __init__(self): + self.base_dir = Config.TEMP_DIR + self.session_dir = None + + def init_session(self): + """每次运行创建一个新的会话临时目录""" + if not os.path.exists(self.base_dir): + os.makedirs(self.base_dir) + self.session_dir = os.path.join(self.base_dir, f"session_{uuid.uuid4().hex[:8]}") + os.makedirs(self.session_dir, exist_ok=True) + logger.info(f"创建临时会话目录: {self.session_dir}") + return self.session_dir + + def get_subdir(self, name): + """在会话目录下创建子目录""" + path = os.path.join(self.session_dir, name) + os.makedirs(path, exist_ok=True) + return path + + def cleanup(self): + """清理本次会话临时目录""" + if self.session_dir and os.path.exists(self.session_dir): + shutil.rmtree(self.session_dir, ignore_errors=True) + logger.info(f"清理临时目录: {self.session_dir}") + + @staticmethod + def cleanup_stale(): + """启动时清理所有遗留临时目录(超过1天)""" + if os.path.exists(Config.TEMP_DIR): + for item in os.listdir(Config.TEMP_DIR): + path = os.path.join(Config.TEMP_DIR, item) + if os.path.isdir(path) and item.startswith("session_"): + try: + shutil.rmtree(path) + logger.info(f"清理遗留目录: {path}") + except Exception as e: + logger.warning(f"清理失败 {path}: {e}") \ No newline at end of file diff --git a/zip_handler.py b/zip_handler.py new file mode 100644 index 0000000..31d162f --- /dev/null +++ b/zip_handler.py @@ -0,0 +1,71 @@ +import os +import zipfile +import py7zr +import shutil +from config import Config +from logger import logger + +class ZipHandler: + @staticmethod + def extract_zip(zip_path, extract_to): + """解压ZIP文件""" + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(extract_to) + logger.info(f"解压ZIP: {zip_path} -> {extract_to}") + + @staticmethod + def extract_7z(seven_z_path, extract_to): + """解压7z文件""" + with py7zr.SevenZipFile(seven_z_path, mode='r') as sz: + sz.extractall(extract_to) + logger.info(f"解压7z: {seven_z_path} -> {extract_to}") + + @staticmethod + def pack_to_zip(source_dir_or_file, output_zip_path): + """将文件或目录打包为ZIP""" + with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: + if os.path.isfile(source_dir_or_file): + zf.write(source_dir_or_file, arcname=os.path.basename(source_dir_or_file)) + else: + for root, _, files in os.walk(source_dir_or_file): + for file in files: + full_path = os.path.join(root, file) + arcname = os.path.relpath(full_path, start=os.path.dirname(source_dir_or_file)) + zf.write(full_path, arcname) + logger.info(f"打包为ZIP: {output_zip_path}") + + @staticmethod + def split_zip(zip_path, volume_size_mb): + """将ZIP文件分卷(简单实现:使用split命令或手动分割) + 由于Python标准库不支持直接生成多卷ZIP,这里采用事后分割: + 生成一个大ZIP,然后按字节分割成 .zip.001, .zip.002 ... + 注意:解压时需要合并 cat file.zip.* > file.zip 或使用专用工具。 + """ + if volume_size_mb <= 0: + return [zip_path] + volume_size = volume_size_mb * 1024 * 1024 + file_size = os.path.getsize(zip_path) + if file_size <= volume_size: + return [zip_path] + base_name = zip_path + part_num = 1 + parts = [] + with open(zip_path, 'rb') as f: + while True: + chunk = f.read(volume_size) + if not chunk: + break + part_path = f"{base_name}.{part_num:03d}" + with open(part_path, 'wb') as pf: + pf.write(chunk) + parts.append(part_path) + part_num += 1 + # 删除原ZIP + os.remove(zip_path) + logger.info(f"分卷压缩完成: {parts}") + return parts + + @staticmethod + def is_archive(file_path): + ext = os.path.splitext(file_path)[1].lower() + return ext in ['.zip', '.7z'] \ No newline at end of file