# Source: mailc/manifest_parser.py
# Snapshot: 2026-04-18 23:58:27 +08:00 (183 lines, 7.9 KiB, Python)

import re
from typing import List, Dict, Tuple, Optional
from logger import logger
class ManifestParser:
    """Parse a conversion manifest into a flat list of task dictionaries.

    The manifest is plain text (for example ``manifest.txt`` or a mail body)
    with one directive per line.  Blank lines and lines starting with ``#``
    are ignored.  Supported directives:

    - global default:        ``to: fmt(size), fmt...``
    - batch by format:       ``from: fmt to: fmt(size)...``
    - exact file name:       ``from.name: name to: fmt(size)...``
    - rename (one line):     ``from.name: name to.name: newname.ext(size)``
    - rename (multi-output): ``from.name: name`` followed by one or more
      ``to.name: newname.ext(size)`` lines
    - archive scope:         ``in archive.zip:`` opens a scope, ``in .``
      closes it
    - inline scope:          ``<directive> in archive.zip``

    A size is either ``W:H`` (stored as ``('ratio', W, H)``) or ``WxH``
    (stored as ``('pixel', W, H)``).
    """

    # ``from: <fmt> to: <targets>`` -- compiled once instead of on every line.
    _FROM_DIRECTIVE = re.compile(r'from:\s*(\S+)\s+to:\s*(.+)')

    def __init__(self):
        # Tasks produced by the most recent parse() call.
        self.tasks = []

    def parse(self, content: str) -> List[Dict]:
        """Parse manifest text and return the resulting task list.

        Every task is a dict with the keys ``type``, ``scope`` (the archive
        scope active for the line, or ``None``) and ``inline_archive`` (the
        archive named by a trailing ``in <archive>``, or ``None``), plus:

        - ``global``:    ``targets``
        - ``by_format``: ``src_format`` (lower-cased), ``targets``
        - ``by_name``:   ``src_name``, ``targets``
        - ``rename``:    ``src_name``, ``dst_name``, ``size``

        Invalid or unrecognised directives are logged as warnings and
        skipped.  The returned list is also stored in ``self.tasks`` and
        replaces the result of any previous call.
        """
        lines = [
            stripped
            for stripped in (raw.strip() for raw in content.splitlines())
            if stripped and not stripped.startswith('#')
        ]
        self.tasks = []
        current_scope = None    # archive scope opened by "in <archive>:"
        pending_rename = None   # from.name context awaiting to.name lines

        for index, line in enumerate(lines):
            # Look-ahead uses the raw next line, before any inline-scope
            # stripping is applied to it.
            next_is_rename_target = (
                index + 1 < len(lines)
                and lines[index + 1].startswith('to.name:')
            )

            # Archive scope open / close.
            if line.startswith('in ') and line.endswith(':'):
                current_scope = line[3:-1].strip()
                continue
            if line == 'in .':
                current_scope = None
                continue

            # Inline scope: "<directive> in <archive>".  Split on the LAST
            # " in " so that a file name which itself contains " in " does
            # not hide the trailing archive.
            inline_archive = None
            if ' in ' in line and not line.startswith('in '):
                cmd_part, _, archive_part = line.rpartition(' in ')
                archive_part = archive_part.strip()
                # Only a single token is accepted as an archive name.
                if ' ' not in archive_part:
                    inline_archive = archive_part
                    line = cmd_part.strip()

            if line.startswith('to:'):
                # Global default targets.
                self.tasks.append({
                    'type': 'global',
                    'targets': self._parse_targets(line[3:]),
                    'scope': current_scope,
                    'inline_archive': inline_archive,
                })
            elif line.startswith('from:'):
                # Batch conversion keyed by source format.
                match = self._FROM_DIRECTIVE.match(line)
                if match is None:
                    # Previously dropped silently; report it like any other
                    # invalid directive.
                    logger.warning(f"无效的 from 指令: {line}")
                else:
                    self.tasks.append({
                        'type': 'by_format',
                        'src_format': match.group(1).lower(),
                        'targets': self._parse_targets(match.group(2)),
                        'scope': current_scope,
                        'inline_archive': inline_archive,
                    })
            elif line.startswith('from.name:'):
                rest = line[10:].strip()
                if ' to.name:' in rest:
                    # Single-line rename.
                    pieces = rest.split(' to.name:')
                    dst_name, size = self._parse_dst_with_size(pieces[1].strip())
                    self.tasks.append({
                        'type': 'rename',
                        'src_name': pieces[0].strip(),
                        'dst_name': dst_name,
                        'size': size,
                        'scope': current_scope,
                        'inline_archive': inline_archive,
                    })
                else:
                    has_targets = ' to:' in rest
                    if has_targets:
                        # Exact-name conversion.  Emitted even when to.name
                        # lines follow, so the "to:" targets are not lost.
                        pieces = rest.split(' to:')
                        src_name = pieces[0].strip()
                        self.tasks.append({
                            'type': 'by_name',
                            'src_name': src_name,
                            'targets': self._parse_targets(pieces[1]),
                            'scope': current_scope,
                            'inline_archive': inline_archive,
                        })
                    else:
                        src_name = rest

                    if next_is_rename_target:
                        # The following to.name lines belong to this source.
                        pending_rename = {
                            'src_name': src_name,
                            'scope': current_scope,
                            'inline_archive': inline_archive,
                        }
                    elif not has_targets:
                        # "from.name: xxx" with no rule at all.
                        logger.warning(f"无效的 from.name 指令: {line}")
            elif line.startswith('to.name:') and pending_rename is not None:
                # One output of a multi-output rename; scope and inline
                # archive come from the originating from.name line.
                dst_name, size = self._parse_dst_with_size(line[8:].strip())
                self.tasks.append({
                    'type': 'rename',
                    'src_name': pending_rename['src_name'],
                    'dst_name': dst_name,
                    'size': size,
                    'scope': pending_rename['scope'],
                    'inline_archive': pending_rename['inline_archive'],
                })
                if not next_is_rename_target:
                    pending_rename = None
            else:
                logger.warning(f"无法识别的指令: {line}")

        return self.tasks

    def _parse_size(self, size_str: str) -> Optional[Tuple]:
        """Parse the text between the parentheses of a size suffix.

        ``"16:9"`` -> ``('ratio', 16, 9)``; ``"800x600"`` (``x`` in either
        case) -> ``('pixel', 800, 600)``.  Text containing neither separator
        yields ``None``.  A malformed size (non-numeric parts, wrong number
        of parts) is logged as a warning and also yields ``None`` instead of
        aborting the whole manifest with ``ValueError``.
        """
        text = size_str.strip()
        if ':' in text:
            kind, parts = 'ratio', text.split(':')
        elif 'x' in text.lower():
            kind, parts = 'pixel', text.lower().split('x')
        else:
            return None
        try:
            first, second = (int(part) for part in parts)
        except ValueError:
            logger.warning(f"无效的尺寸参数: {size_str}")
            return None
        return (kind, first, second)

    def _parse_targets(self, target_str: str) -> List[Tuple[str, Optional[Tuple]]]:
        """Parse a comma-separated target list.

        ``"webp, png(16:9), jpg(800x600)"`` ->
        ``[('webp', None), ('png', ('ratio', 16, 9)), ('jpg', ('pixel', 800, 600))]``

        Format names are lower-cased and stripped; empty entries (from
        trailing or doubled commas, or an empty string) are skipped.
        """
        result = []
        for item in (chunk.strip() for chunk in target_str.split(',')):
            if not item:
                continue
            if '(' in item and item.endswith(')'):
                fmt, size_part = item.split('(', 1)
                size = self._parse_size(size_part.rstrip(')'))
                result.append((fmt.strip().lower(), size))
            else:
                result.append((item.lower(), None))
        return result

    def _parse_dst_with_size(self, dst_spec: str) -> Tuple[str, Optional[Tuple]]:
        """Split a destination spec into ``(name, size)``.

        ``"banner.webp(800x600)"`` -> ``('banner.webp', ('pixel', 800, 600))``;
        a spec without a trailing ``(...)`` yields ``(spec, None)``.
        """
        if '(' in dst_spec and dst_spec.endswith(')'):
            name, size_part = dst_spec.split('(', 1)
            return name.strip(), self._parse_size(size_part.rstrip(')'))
        return dst_spec.strip(), None