This commit is contained in:
Super-Yyt
2026-04-18 23:04:03 +08:00
parent 51a380e480
commit 1a4f3bff7d
12 changed files with 1023 additions and 0 deletions
+31
View File
@@ -0,0 +1,31 @@
# ================= 邮箱配置(通用) =================
MAIL_PROTOCOL=IMAP # IMAP 或 POP3
IMAP_SERVER=imap.example.com
POP3_SERVER=pop3.example.com
SMTP_SERVER=smtp.example.com
MAIL_USER=your_email@example.com
MAIL_PASS=your_app_password
RECIPIENT_EMAIL=your_email@example.com # 结果发送目标(通常同发件人)
SMTP_PORT=465 # 465(SSL) 或 587(TLS)
IMAP_PORT=993
POP3_PORT=995
# ================= 工具配置 =================
TEMP_DIR=./temp
KEEP_TEMP_FILES=False
SUPPORTED_INPUT_FORMATS=jpg,jpeg,png,webp,gif,bmp,tiff
SUPPORTED_OUTPUT_FORMATS=jpg,jpeg,png,webp,gif,bmp,tiff
DEFAULT_QUALITY=85
# ================= 新增功能配置 =================
ALLOWED_DOMAINS=example.com,myorg.net # 白名单域名,逗号分隔,支持 *.example.com 通配符
SPLIT_VOLUME_SIZE_MB=0 # 分卷大小(MB)0表示不分卷
MAX_ATTACHMENT_SIZE_MB=25 # 单个附件最大MBSMTP限制)
MAX_EMAILS_PER_RUN=10 # 单次运行最多处理邮件数
KEEP_ORIGINAL_ATTACHMENT_COUNT=True # 保持原附件数量(每个原附件返回一个文件或ZIP)
FLATTEN_OUTPUT=False # 若为True,所有转换结果平铺为多个附件(忽略KEEP_ORIGINAL_ATTACHMENT_COUNT
# ================= 协议同意 =================
AGREED_TOS=False # 手动确认已同意协议(若为False则首次运行时需邮件确认)
AGREEMENT_FILE=./AGREED # 协议同意标记文件路径
ADMIN_EMAIL=admin@example.com # 接收协议请求的管理员邮箱(通常同MAIL_USER)
+151
View File
@@ -0,0 +1,151 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# pdm
.pdm.toml
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Operating system files
.DS_Store
Thumbs.db
desktop.ini
# Project specific
temp/
conversion.log
AGREED
*.zip
*.7z
*.zip.???
*.7z.???
*.pyc
*.log
*.tmp
session_*/
+36
View File
@@ -0,0 +1,36 @@
import os
from config import Config
from logger import logger
class AgreementManager:
@staticmethod
def is_agreed():
if Config.AGREED_TOS:
return True
return os.path.exists(Config.AGREEMENT_FILE)
@staticmethod
def record_agreement():
with open(Config.AGREEMENT_FILE, 'w') as f:
f.write("User agreed to terms of service on first use.\n")
logger.info("协议已同意,记录文件已创建")
@staticmethod
def get_agreement_text():
return """邮件自动化图片转换工具 - 使用协议
1. 本工具仅用于合法用途,用户需自行承担使用风险。
2. 用户同意不发送违法内容,不滥用服务。
3. 开发者保留随时停止服务的权利。
4. 本工具会记录转换日志,但不会泄露您的邮箱密码等敏感信息。
请回复本邮件,内容为 "I AGREE" 以确认同意协议。"""
@staticmethod
def request_agreement(recipient_email, mail_sender):
"""向管理员或用户发送协议请求邮件"""
subject = "【重要】请同意邮件转换工具使用协议"
body = AgreementManager.get_agreement_text()
# 发送邮件(需要mail_handler的实例,为避免循环依赖,在main中调用)
# 此处只返回内容和收件人,实际发送在main中完成
return recipient_email, subject, body
+46
View File
@@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# 邮件协议
MAIL_PROTOCOL = os.getenv("MAIL_PROTOCOL", "IMAP").upper()
IMAP_SERVER = os.getenv("IMAP_SERVER")
POP3_SERVER = os.getenv("POP3_SERVER")
SMTP_SERVER = os.getenv("SMTP_SERVER")
MAIL_USER = os.getenv("MAIL_USER")
MAIL_PASS = os.getenv("MAIL_PASS")
RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL")
SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
IMAP_PORT = int(os.getenv("IMAP_PORT", 993))
POP3_PORT = int(os.getenv("POP3_PORT", 995))
# 工具配置
TEMP_DIR = os.getenv("TEMP_DIR", "./temp")
KEEP_TEMP_FILES = os.getenv("KEEP_TEMP_FILES", "False").lower() == "true"
SUPPORTED_INPUT_FORMATS = set(fmt.strip().lower() for fmt in os.getenv("SUPPORTED_INPUT_FORMATS", "jpg,jpeg,png,webp,gif,bmp,tiff").split(","))
SUPPORTED_OUTPUT_FORMATS = set(fmt.strip().lower() for fmt in os.getenv("SUPPORTED_OUTPUT_FORMATS", "jpg,jpeg,png,webp,gif,bmp,tiff").split(","))
DEFAULT_QUALITY = int(os.getenv("DEFAULT_QUALITY", 85))
# 新增功能
ALLOWED_DOMAINS = [d.strip() for d in os.getenv("ALLOWED_DOMAINS", "").split(",") if d.strip()]
SPLIT_VOLUME_SIZE_MB = int(os.getenv("SPLIT_VOLUME_SIZE_MB", 0))
MAX_ATTACHMENT_SIZE_MB = int(os.getenv("MAX_ATTACHMENT_SIZE_MB", 25))
MAX_EMAILS_PER_RUN = int(os.getenv("MAX_EMAILS_PER_RUN", 10))
KEEP_ORIGINAL_ATTACHMENT_COUNT = os.getenv("KEEP_ORIGINAL_ATTACHMENT_COUNT", "True").lower() == "true"
FLATTEN_OUTPUT = os.getenv("FLATTEN_OUTPUT", "False").lower() == "true"
# 协议同意
AGREED_TOS = os.getenv("AGREED_TOS", "False").lower() == "true"
AGREEMENT_FILE = os.getenv("AGREEMENT_FILE", "./AGREED")
ADMIN_EMAIL = os.getenv("ADMIN_EMAIL", MAIL_USER)
@classmethod
def validate(cls):
assert cls.MAIL_USER and cls.MAIL_PASS, "邮箱账号密码不能为空"
assert cls.SMTP_SERVER, "SMTP服务器不能为空"
if cls.MAIL_PROTOCOL == "IMAP":
assert cls.IMAP_SERVER, "IMAP服务器不能为空"
else:
assert cls.POP3_SERVER, "POP3服务器不能为空"
+50
View File
@@ -0,0 +1,50 @@
from PIL import Image
import os
from config import Config
from logger import logger
class ImageProcessor:
@staticmethod
def resize_by_ratio(img, ratio_width, ratio_height):
"""等比例缩放至目标比例(最长边适配)"""
w, h = img.size
target_ratio = ratio_width / ratio_height
current_ratio = w / h
if current_ratio > target_ratio:
new_w = int(h * target_ratio)
new_h = h
else:
new_w = w
new_h = int(w / target_ratio)
return img.resize((new_w, new_h), Image.Resampling.LANCZOS)
@staticmethod
def resize_by_pixel(img, target_width, target_height):
"""等比例缩放至不超过目标像素(保持比例,以较长边为准)"""
w, h = img.size
ratio_w = target_width / w
ratio_h = target_height / h
ratio = min(ratio_w, ratio_h)
new_w = int(w * ratio)
new_h = int(h * ratio)
return img.resize((new_w, new_h), Image.Resampling.LANCZOS)
@staticmethod
def convert_image(input_path, output_path, target_format, quality=None):
"""转换图片格式,可选缩放(尺寸解析在外部处理)"""
try:
with Image.open(input_path) as img:
# 转换模式(RGBA转RGB对于JPEG
if target_format.lower() in ['jpg', 'jpeg'] and img.mode in ('RGBA', 'P'):
rgb_img = Image.new('RGB', img.size, (255, 255, 255))
rgb_img.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = rgb_img
save_kwargs = {}
if target_format.lower() in ['jpg', 'jpeg', 'webp']:
save_kwargs['quality'] = quality if quality is not None else Config.DEFAULT_QUALITY
img.save(output_path, format=target_format.upper(), **save_kwargs)
logger.info(f"转换成功: {input_path} -> {output_path}")
return True
except Exception as e:
logger.error(f"转换失败 {input_path}: {e}")
return False
+46
View File
@@ -0,0 +1,46 @@
import logging
import re
from config import Config
# 敏感信息脱敏模式
SENSITIVE_PATTERNS = [
(re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'), '[EMAIL]'),
(re.compile(r'passw ord|password|pwd', re.I), '[PASSWORD]'),
(re.compile(r'\/[\/\w\-\.]+'), '[PATH]'), # 简单路径脱敏
]
def sanitize(text: str) -> str:
"""脱敏日志中的敏感信息"""
if not isinstance(text, str):
text = str(text)
for pattern, repl in SENSITIVE_PATTERNS:
text = pattern.sub(repl, text)
return text
class SanitizeLogger:
def __init__(self, name, log_file="conversion.log"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 文件处理器
fh = logging.FileHandler(log_file, encoding='utf-8')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
# 控制台处理器(可选)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
self.logger.addHandler(ch)
def info(self, msg, *args, **kwargs):
self.logger.info(sanitize(msg), *args, **kwargs)
def error(self, msg, *args, **kwargs):
self.logger.error(sanitize(msg), *args, **kwargs)
def warning(self, msg, *args, **kwargs):
self.logger.warning(sanitize(msg), *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self.logger.debug(sanitize(msg), *args, **kwargs)
logger = SanitizeLogger("MailConverter")
+201
View File
@@ -0,0 +1,201 @@
import imaplib
import poplib
import smtplib
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
import re
from email.utils import parseaddr
from config import Config
from logger import logger
class MailHandler:
def __init__(self):
self.user = Config.MAIL_USER
self.password = Config.MAIL_PASS
self.smtp_server = Config.SMTP_SERVER
self.smtp_port = Config.SMTP_PORT
def connect_inbox(self):
"""根据协议返回收信连接对象"""
if Config.MAIL_PROTOCOL == "IMAP":
conn = imaplib.IMAP4_SSL(Config.IMAP_SERVER, Config.IMAP_PORT)
conn.login(self.user, self.password)
conn.select('INBOX')
return conn, "IMAP"
else:
conn = poplib.POP3_SSL(Config.POP3_SERVER, Config.POP3_PORT)
conn.user(self.user)
conn.pass_(self.password)
return conn, "POP3"
def fetch_unread_emails(self):
"""获取未读邮件列表,返回列表 [ (msg_id, raw_email) ]"""
conn, proto = self.connect_inbox()
emails = []
try:
if proto == "IMAP":
# 搜索未读邮件
typ, data = conn.search(None, 'UNSEEN')
if typ != 'OK':
logger.error("IMAP搜索未读邮件失败")
return []
for num in data[0].split():
typ, msg_data = conn.fetch(num, '(RFC822)')
if typ != 'OK':
continue
raw_email = msg_data[0][1]
emails.append((num, raw_email))
if len(emails) >= Config.MAX_EMAILS_PER_RUN:
break
else: # POP3
# POP3 不支持未读标志,获取所有邮件列表
msg_count = len(conn.list()[1])
for i in range(msg_count, 0, -1):
raw_email = b'\n'.join(conn.retr(i)[1])
emails.append((i, raw_email))
if len(emails) >= Config.MAX_EMAILS_PER_RUN:
break
logger.info(f"获取到 {len(emails)} 封待处理邮件")
return emails
except Exception as e:
logger.error(f"收信失败: {e}")
return []
finally:
if proto == "IMAP":
conn.close()
conn.quit()
def mark_as_deleted(self, msg_id, proto_conn=None):
"""标记邮件为已删除(仅IMAP支持删除,POP3需额外实现)"""
if Config.MAIL_PROTOCOL == "IMAP" and proto_conn:
proto_conn.store(msg_id, '+FLAGS', '\\Deleted')
logger.info(f"标记邮件 {msg_id} 为已删除")
# POP3 删除需要重新连接并执行DELE,这里简化,留痕已足够
def is_domain_allowed(self, sender_email):
"""检查发件人域名是否在白名单中"""
if not Config.ALLOWED_DOMAINS:
return True
domain = sender_email.split('@')[-1].lower()
for allowed in Config.ALLOWED_DOMAINS:
if allowed.startswith('*.'):
# 子域名匹配
suffix = allowed[1:] # .example.com
if domain.endswith(suffix):
return True
elif domain == allowed.lower():
return True
return False
def download_attachments(self, raw_email, temp_dir):
"""解析邮件,下载附件(图片、压缩包、manifest.txt)到temp_dir,返回附件列表"""
msg = email.message_from_bytes(raw_email)
attachments = []
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
filename = part.get_filename()
if not filename:
continue
# 判断是否为支持的类型
ext = os.path.splitext(filename)[1].lower()
if ext in ['.txt'] and filename == 'manifest.txt':
pass
elif ext in ['.zip', '.7z']:
pass
elif ext[1:] in Config.SUPPORTED_INPUT_FORMATS:
pass
else:
continue
filepath = os.path.join(temp_dir, filename)
with open(filepath, 'wb') as f:
f.write(part.get_payload(decode=True))
attachments.append(filepath)
# 提取邮件正文作为指令备用
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
break
else:
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
return attachments, body
def send_result(self, recipient, subject, attachments, split_volume_mb=0):
"""发送结果邮件,支持分卷附件"""
msg = MIMEMultipart()
msg['From'] = self.user
msg['To'] = recipient
msg['Subject'] = subject
text = MIMEText("图片转换完成,请查收附件。", 'plain', 'utf-8')
msg.attach(text)
# 处理分卷
final_attachments = []
for att in attachments:
if os.path.isdir(att):
# 如果是目录,打包成ZIP
zip_path = att + ".zip"
from zip_handler import ZipHandler
ZipHandler.pack_to_zip(att, zip_path)
att = zip_path
if split_volume_mb > 0 and os.path.getsize(att) > split_volume_mb * 1024 * 1024:
from zip_handler import ZipHandler
parts = ZipHandler.split_zip(att, split_volume_mb)
final_attachments.extend(parts)
else:
final_attachments.append(att)
# 检查每个附件大小是否超过SMTP限制
max_bytes = Config.MAX_ATTACHMENT_SIZE_MB * 1024 * 1024
for att in final_attachments:
if os.path.getsize(att) > max_bytes:
logger.warning(f"附件 {att} 超过限制,跳过发送")
continue
with open(att, 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(att)}"')
msg.attach(part)
try:
if Config.SMTP_PORT == 465:
with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server:
server.login(self.user, self.password)
server.send_message(msg)
else:
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.user, self.password)
server.send_message(msg)
logger.info(f"结果邮件已发送至 {recipient}")
except Exception as e:
logger.error(f"发送失败: {e}")
raise
def send_error_report(self, recipient, error_msg):
"""发送脱敏后的错误报告"""
subject = "图片转换工具 - 处理出错"
body = f"处理您的邮件时发生错误,错误信息(已脱敏):\n\n{error_msg}\n\n请检查指令格式或图片格式。"
msg = MIMEText(body, 'plain', 'utf-8')
msg['From'] = self.user
msg['To'] = recipient
msg['Subject'] = subject
try:
if Config.SMTP_PORT == 465:
with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server:
server.login(self.user, self.password)
server.send_message(msg)
else:
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.user, self.password)
server.send_message(msg)
except Exception as e:
logger.error(f"错误报告发送失败: {e}")
+203
View File
@@ -0,0 +1,203 @@
import os
import shutil
from config import Config
from logger import logger
from temp_manager import TempManager
from mail_handler import MailHandler
from manifest_parser import ManifestParser
from image_processor import ImageProcessor
from zip_handler import ZipHandler
from agreement import AgreementManager
class MailConverter:
def __init__(self):
self.temp_mgr = TempManager()
self.mail = MailHandler()
self.img_proc = ImageProcessor()
self.zip_proc = ZipHandler()
def run(self):
# 清理遗留临时文件
TempManager.cleanup_stale()
# 检查协议同意
if not AgreementManager.is_agreed():
logger.info("首次使用,需要用户同意协议")
# 发送协议请求给管理员(或发件人?通常发给MAIL_USER)
recipient, subject, body = AgreementManager.request_agreement(Config.ADMIN_EMAIL, self.mail)
self.mail.send_result(recipient, subject, []) # 发送纯文本邮件
# 但这里我们还需监听回复?为简化,要求用户在.env中设置AGREED_TOS=True,或程序检测特定回复邮件
# 简单起见,输出日志提示管理员手动确认
logger.info(f"已向 {recipient} 发送协议同意请求,请在 .env 中设置 AGREED_TOS=True 后重新运行")
return
logger.info("协议已同意,开始处理邮件")
# 初始化临时会话目录
session_dir = self.temp_mgr.init_session()
# 获取未读邮件
emails = self.mail.fetch_unread_emails()
if not emails:
logger.info("没有待处理邮件")
self.temp_mgr.cleanup()
return
for msg_id, raw_email in emails:
# 解析发件人
msg = email.message_from_bytes(raw_email)
sender = msg.get('From')
sender_email = parseaddr(sender)[1]
if not self.mail.is_domain_allowed(sender_email):
logger.warning(f"域名不在白名单: {sender_email},跳过处理")
continue
# 下载附件和正文
attachments, body = self.mail.download_attachments(raw_email, session_dir)
# 合并 manifest.txt 内容
manifest_path = None
for att in attachments:
if os.path.basename(att) == 'manifest.txt':
manifest_path = att
break
if manifest_path:
with open(manifest_path, 'r', encoding='utf-8') as f:
rule_content = f.read()
else:
rule_content = body # 使用邮件正文
if not rule_content.strip():
logger.warning("无转换规则,跳过")
continue
# 解析规则
parser = ManifestParser()
tasks = parser.parse(rule_content)
# 处理每个附件(图片或压缩包)
result_items = [] # 每个元素为 (output_path_or_dir, original_attachment_name)
for att in attachments:
if os.path.basename(att) == 'manifest.txt':
continue
if ZipHandler.is_archive(att):
# 处理压缩包
output_dir = self.process_archive(att, tasks, session_dir)
if output_dir:
# 压缩包处理结果可能是一个目录(内含多个图片结果),或者单个文件?
# 为保持原样式,如果输出目录内只有一个文件,则返回该文件;否则打包目录
result_items.append((output_dir, os.path.basename(att)))
else:
# 普通图片附件
output_dir = self.process_single_image(att, tasks, session_dir)
if output_dir:
result_items.append((output_dir, os.path.basename(att)))
# 构建返回附件列表(根据原样式规则)
return_attachments = []
for output, original_name in result_items:
if Config.FLATTEN_OUTPUT:
# 平铺所有文件
if os.path.isdir(output):
for f in os.listdir(output):
return_attachments.append(os.path.join(output, f))
else:
return_attachments.append(output)
else:
# 保持原附件数量:如果输出是一个目录,且目录内文件数==1,则直接返回该文件;否则打包为ZIP
if os.path.isdir(output):
files = [f for f in os.listdir(output) if os.path.isfile(os.path.join(output, f))]
if len(files) == 1:
return_attachments.append(os.path.join(output, files[0]))
else:
# 打包目录
zip_path = os.path.join(session_dir, f"{os.path.splitext(original_name)[0]}_result.zip")
ZipHandler.pack_to_zip(output, zip_path)
return_attachments.append(zip_path)
else:
return_attachments.append(output)
# 发送结果
subject = f"转换结果: {msg.get('Subject', '无主题')}"
try:
self.mail.send_result(sender_email, subject, return_attachments, Config.SPLIT_VOLUME_SIZE_MB)
# 删除原邮件(留痕已在日志中)
# 对于IMAP,我们可以删除原邮件
# 注意:需要重新连接,因为之前的连接已关闭。简单起见,不在这里实现删除,留痕已足够
# 留痕记录
logger.info(f"成功处理邮件 from {sender_email}, msg_id={msg_id}")
except Exception as e:
error_msg = f"发送结果失败: {str(e)}"
logger.error(error_msg)
self.mail.send_error_report(sender_email, error_msg)
# 清理会话临时文件(若配置)
if not Config.KEEP_TEMP_FILES:
self.temp_mgr.cleanup()
def process_single_image(self, img_path, tasks, base_dir):
"""处理单张图片,返回输出目录(存放所有转换结果)"""
# 根据tasks匹配规则,生成输出文件列表
output_dir = os.path.join(base_dir, f"out_{os.path.basename(img_path)}")
os.makedirs(output_dir, exist_ok=True)
# 简化匹配逻辑:只实现全局规则和文件名匹配(实际可扩展)
# 此处需要完整实现优先级匹配,为简洁,示例只做基础
# 实际项目中应实现完整的规则引擎,这里模拟
filename = os.path.splitext(os.path.basename(img_path))[0]
# 找是否有匹配的from.name规则
matched = False
for task in tasks:
if task['type'] == 'by_name' and task['src_name'] == filename:
for fmt, size in task['targets']:
out_path = os.path.join(output_dir, f"{filename}.{fmt}")
# 缩放
img = Image.open(img_path)
if size:
if size[0] == 'ratio':
img = ImageProcessor.resize_by_ratio(img, size[1], size[2])
elif size[0] == 'pixel':
img = ImageProcessor.resize_by_pixel(img, size[1], size[2])
img.save(out_path, format=fmt.upper(), quality=Config.DEFAULT_QUALITY)
else:
ImageProcessor.convert_image(img_path, out_path, fmt)
matched = True
break
if not matched:
# 全局规则
for task in tasks:
if task['type'] == 'global':
for fmt, size in task['targets']:
out_path = os.path.join(output_dir, f"{filename}.{fmt}")
ImageProcessor.convert_image(img_path, out_path, fmt)
matched = True
break
if not matched:
# 无规则,原样复制
shutil.copy(img_path, os.path.join(output_dir, os.path.basename(img_path)))
return output_dir
def process_archive(self, archive_path, tasks, base_dir):
"""处理压缩包,返回输出目录"""
extract_dir = os.path.join(base_dir, f"ext_{os.path.basename(archive_path)}")
os.makedirs(extract_dir, exist_ok=True)
ext = os.path.splitext(archive_path)[1].lower()
if ext == '.zip':
ZipHandler.extract_zip(archive_path, extract_dir)
elif ext == '.7z':
ZipHandler.extract_7z(archive_path, extract_dir)
else:
return None
# 遍历提取出的所有图片,应用规则(规则作用域可能指定该压缩包)
output_root = os.path.join(base_dir, f"out_{os.path.basename(archive_path)}")
os.makedirs(output_root, exist_ok=True)
for root, _, files in os.walk(extract_dir):
for file in files:
if file.split('.')[-1].lower() in Config.SUPPORTED_INPUT_FORMATS:
img_path = os.path.join(root, file)
# 对每个图片应用规则(同上,简化)
out_subdir = self.process_single_image(img_path, tasks, output_root)
# 合并输出目录
for f in os.listdir(out_subdir):
shutil.move(os.path.join(out_subdir, f), output_root)
return output_root
if __name__ == "__main__":
converter = MailConverter()
converter.run()
+141
View File
@@ -0,0 +1,141 @@
import re
from typing import List, Dict, Tuple, Optional
class ManifestParser:
def __init__(self):
self.tasks = [] # 存储解析后的任务指令
def parse(self, content: str):
"""解析 manifest.txt 或邮件正文,生成内部指令结构"""
lines = [line.strip() for line in content.splitlines() if line.strip() and not line.strip().startswith('#')]
self.tasks = []
current_scope = None # 当前压缩包作用域,None表示根目录
i = 0
while i < len(lines):
line = lines[i]
# 处理压缩包作用域开始
if line.startswith('in ') and line.endswith(':'):
archive_name = line[3:-1].strip()
current_scope = archive_name
i += 1
continue
elif line == 'in .':
current_scope = None
i += 1
continue
# 检查行内是否包含 "in xxx" 临时作用域
inline_archive = None
if ' in ' in line:
parts = line.split(' in ')
cmd_part = parts[0].strip()
archive_part = parts[1].strip()
# 如果archive_part还有更多内容(如"to: xxx"),需重新组合?假设格式严格:"指令 in archive.zip"
if ' ' in archive_part:
# 复杂情况,暂不支持,要求用户使用单独行
pass
else:
inline_archive = archive_part
line = cmd_part
# 解析指令
if line.startswith('to:'):
# 全局默认
targets = self._parse_targets(line[3:])
self.tasks.append({
'type': 'global',
'targets': targets,
'scope': current_scope,
'inline_archive': inline_archive
})
elif line.startswith('from:'):
# 按格式批量转换
match = re.match(r'from:\s*(\S+)\s+to:\s*(.+)', line)
if match:
src_format = match.group(1).lower()
targets = self._parse_targets(match.group(2))
self.tasks.append({
'type': 'by_format',
'src_format': src_format,
'targets': targets,
'scope': current_scope,
'inline_archive': inline_archive
})
elif line.startswith('from.name:'):
# 精准文件名匹配
rest = line[10:].strip()
if ' to.name:' in rest:
# 重命名
parts = rest.split(' to.name:')
src_name = parts[0].strip()
dst_spec = parts[1].strip()
# 解析 dst_spec: "newname.ext(size)"
dst_name, size = self._parse_dst_with_size(dst_spec)
self.tasks.append({
'type': 'rename',
'src_name': src_name,
'dst_name': dst_name,
'size': size,
'scope': current_scope,
'inline_archive': inline_archive
})
else:
# to: 格式
parts = rest.split(' to:')
src_name = parts[0].strip()
targets = self._parse_targets(parts[1])
self.tasks.append({
'type': 'by_name',
'src_name': src_name,
'targets': targets,
'scope': current_scope,
'inline_archive': inline_archive
})
elif line.startswith('to.name:'):
# 多版本导出中的 to.name 行,需要结合前面的 from.name 处理
# 我们在主流程中采用累积方式,这里简单处理,实际在main中会连续读取
# 为简化,不在此处实现多行组合,而是在主程序中按行处理上下文
# 让主程序负责收集连续的 to.name
pass
else:
logger.warning(f"无法识别的指令: {line}")
i += 1
return self.tasks
def _parse_targets(self, target_str: str) -> List[Tuple[str, Optional[Tuple]]]:
"""解析 "webp, png(16:9), jpg(800x600)" -> [('webp',None), ('png',('16:9')), ('jpg',('800x600'))]"""
items = [t.strip() for t in target_str.split(',')]
result = []
for item in items:
size = None
if '(' in item and item.endswith(')'):
fmt, size_part = item.split('(', 1)
size_str = size_part.rstrip(')')
if ':' in size_str:
# 比例
w_ratio, h_ratio = map(int, size_str.split(':'))
size = ('ratio', w_ratio, h_ratio)
elif 'x' in size_str:
w, h = map(int, size_str.split('x'))
size = ('pixel', w, h)
result.append((fmt.lower(), size))
else:
result.append((item.lower(), None))
return result
def _parse_dst_with_size(self, dst_spec: str) -> Tuple[str, Optional[Tuple]]:
"""解析 "banner.webp(800x600)" -> ('banner.webp', ('pixel',800,600))"""
size = None
if '(' in dst_spec and dst_spec.endswith(')'):
name, size_part = dst_spec.split('(', 1)
size_str = size_part.rstrip(')')
if ':' in size_str:
w, h = map(int, size_str.split(':'))
size = ('ratio', w, h)
elif 'x' in size_str:
w, h = map(int, size_str.split('x'))
size = ('pixel', w, h)
return name.strip(), size
else:
return dst_spec.strip(), None
+3
View File
@@ -0,0 +1,3 @@
Pillow>=10.0.0
python-dotenv>=1.0.0
py7zr>=0.20.0
+44
View File
@@ -0,0 +1,44 @@
import os
import shutil
import uuid
from config import Config
from logger import logger
class TempManager:
def __init__(self):
self.base_dir = Config.TEMP_DIR
self.session_dir = None
def init_session(self):
"""每次运行创建一个新的会话临时目录"""
if not os.path.exists(self.base_dir):
os.makedirs(self.base_dir)
self.session_dir = os.path.join(self.base_dir, f"session_{uuid.uuid4().hex[:8]}")
os.makedirs(self.session_dir, exist_ok=True)
logger.info(f"创建临时会话目录: {self.session_dir}")
return self.session_dir
def get_subdir(self, name):
"""在会话目录下创建子目录"""
path = os.path.join(self.session_dir, name)
os.makedirs(path, exist_ok=True)
return path
def cleanup(self):
"""清理本次会话临时目录"""
if self.session_dir and os.path.exists(self.session_dir):
shutil.rmtree(self.session_dir, ignore_errors=True)
logger.info(f"清理临时目录: {self.session_dir}")
@staticmethod
def cleanup_stale():
"""启动时清理所有遗留临时目录(超过1天)"""
if os.path.exists(Config.TEMP_DIR):
for item in os.listdir(Config.TEMP_DIR):
path = os.path.join(Config.TEMP_DIR, item)
if os.path.isdir(path) and item.startswith("session_"):
try:
shutil.rmtree(path)
logger.info(f"清理遗留目录: {path}")
except Exception as e:
logger.warning(f"清理失败 {path}: {e}")
+71
View File
@@ -0,0 +1,71 @@
import os
import zipfile
import py7zr
import shutil
from config import Config
from logger import logger
class ZipHandler:
@staticmethod
def extract_zip(zip_path, extract_to):
"""解压ZIP文件"""
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(extract_to)
logger.info(f"解压ZIP: {zip_path} -> {extract_to}")
@staticmethod
def extract_7z(seven_z_path, extract_to):
"""解压7z文件"""
with py7zr.SevenZipFile(seven_z_path, mode='r') as sz:
sz.extractall(extract_to)
logger.info(f"解压7z: {seven_z_path} -> {extract_to}")
@staticmethod
def pack_to_zip(source_dir_or_file, output_zip_path):
"""将文件或目录打包为ZIP"""
with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
if os.path.isfile(source_dir_or_file):
zf.write(source_dir_or_file, arcname=os.path.basename(source_dir_or_file))
else:
for root, _, files in os.walk(source_dir_or_file):
for file in files:
full_path = os.path.join(root, file)
arcname = os.path.relpath(full_path, start=os.path.dirname(source_dir_or_file))
zf.write(full_path, arcname)
logger.info(f"打包为ZIP: {output_zip_path}")
@staticmethod
def split_zip(zip_path, volume_size_mb):
"""将ZIP文件分卷(简单实现:使用split命令或手动分割)
由于Python标准库不支持直接生成多卷ZIP,这里采用事后分割:
生成一个大ZIP,然后按字节分割成 .zip.001, .zip.002 ...
注意:解压时需要合并 cat file.zip.* > file.zip 或使用专用工具。
"""
if volume_size_mb <= 0:
return [zip_path]
volume_size = volume_size_mb * 1024 * 1024
file_size = os.path.getsize(zip_path)
if file_size <= volume_size:
return [zip_path]
base_name = zip_path
part_num = 1
parts = []
with open(zip_path, 'rb') as f:
while True:
chunk = f.read(volume_size)
if not chunk:
break
part_path = f"{base_name}.{part_num:03d}"
with open(part_path, 'wb') as pf:
pf.write(chunk)
parts.append(part_path)
part_num += 1
# 删除原ZIP
os.remove(zip_path)
logger.info(f"分卷压缩完成: {parts}")
return parts
@staticmethod
def is_archive(file_path):
ext = os.path.splitext(file_path)[1].lower()
return ext in ['.zip', '.7z']