import re
from typing import Dict, List, Optional, Tuple

try:
    from logger import logger
except ImportError:
    # Fall back to the standard library so the parser stays importable
    # (and testable) when the project-level logger module is not available.
    import logging

    logger = logging.getLogger(__name__)


class ManifestParser:
    """Parse conversion manifests (manifest.txt or a mail body) into tasks.

    Supported directives (one per line; blank lines and lines starting with
    ``#`` are ignored):

    - Global default:        ``to: fmt(size), fmt...``
    - Batch by format:       ``from: fmt to: fmt(size)...``
    - Exact file name:       ``from.name: name to: fmt(size)...``
    - Single-line rename:    ``from.name: name to.name: newname.ext(size)``
    - Multi-version rename:  ``from.name: name`` followed by one or more
      ``to.name: newname.ext(size)`` lines
    - Archive scope:         ``in archive.zip:`` opens a scope, ``in .``
      closes it
    - Inline (one-off) scope: any directive followed by ``in archive.zip``

    A size is either a ratio ``W:H`` or a pixel size ``WxH``.
    """

    # Matches "from: <format> to: <targets>" for the batch-by-format directive.
    _FROM_RE = re.compile(r'from:\s*(\S+)\s+to:\s*(.+)')

    def __init__(self):
        # Tasks produced by the most recent call to parse().
        self.tasks: List[Dict] = []

    def parse(self, content: str) -> List[Dict]:
        """Parse manifest text and return the list of task dicts.

        Every task carries ``type`` (``'global'``, ``'by_format'``,
        ``'by_name'`` or ``'rename'``), the directive-specific fields, plus
        ``scope`` (archive opened by ``in archive:`` or ``None``) and
        ``inline_archive`` (archive named by a trailing ``in archive`` or
        ``None``). Invalid directives are logged as warnings and skipped.

        The result is also stored in ``self.tasks``.
        """
        lines = [
            stripped
            for stripped in (raw.strip() for raw in content.splitlines())
            if stripped and not stripped.startswith('#')
        ]

        self.tasks = []
        current_scope = None   # archive scope opened by "in archive:"
        pending_rename = None  # from.name waiting for its to.name lines

        for index, raw_line in enumerate(lines):
            # Archive scope open / close lines produce no task themselves.
            if raw_line.startswith('in ') and raw_line.endswith(':'):
                current_scope = raw_line[3:-1].strip()
                continue
            if raw_line == 'in .':
                current_scope = None
                continue

            line, inline_archive = self._split_inline_archive(raw_line)
            context = {'scope': current_scope, 'inline_archive': inline_archive}
            next_is_to_name = (
                index + 1 < len(lines)
                and lines[index + 1].startswith('to.name:')
            )

            if line.startswith('to:'):
                # Global default targets.
                self.tasks.append({
                    'type': 'global',
                    'targets': self._parse_targets(line[len('to:'):]),
                    **context,
                })
            elif line.startswith('from:'):
                # Batch conversion by source format.
                match = self._FROM_RE.match(line)
                if match is None:
                    logger.warning(f"无效的 from 指令: {line}")
                    continue
                self.tasks.append({
                    'type': 'by_format',
                    'src_format': match.group(1).lower(),
                    'targets': self._parse_targets(match.group(2)),
                    **context,
                })
            elif line.startswith('from.name:'):
                rest = line[len('from.name:'):].strip()
                if ' to.name:' in rest:
                    # Single-line rename.
                    src_name, _, dst_spec = rest.partition(' to.name:')
                    dst_name, size = self._parse_dst_with_size(dst_spec)
                    self.tasks.append({
                        'type': 'rename',
                        'src_name': src_name.strip(),
                        'dst_name': dst_name,
                        'size': size,
                        **context,
                    })
                elif next_is_to_name:
                    # Multi-version export: the following to.name lines each
                    # become one rename task sharing this source and context.
                    pending_rename = {
                        'src_name': rest.partition(' to:')[0].strip(),
                        **context,
                    }
                elif ' to:' in rest:
                    # Exact-name conversion.
                    src_name, _, target_str = rest.partition(' to:')
                    self.tasks.append({
                        'type': 'by_name',
                        'src_name': src_name.strip(),
                        'targets': self._parse_targets(target_str),
                        **context,
                    })
                else:
                    # "from.name: xxx" without any rule is meaningless.
                    logger.warning(f"无效的 from.name 指令: {line}")
            elif line.startswith('to.name:') and pending_rename is not None:
                # One output of a multi-version export.
                dst_name, size = self._parse_dst_with_size(
                    line[len('to.name:'):]
                )
                self.tasks.append({
                    'type': 'rename',
                    'src_name': pending_rename['src_name'],
                    'dst_name': dst_name,
                    'size': size,
                    'scope': pending_rename['scope'],
                    'inline_archive': pending_rename['inline_archive'],
                })
                if not next_is_to_name:
                    pending_rename = None
            else:
                logger.warning(f"无法识别的指令: {line}")

        return self.tasks

    @staticmethod
    def _split_inline_archive(line: str) -> Tuple[str, Optional[str]]:
        """Split a trailing ``in <archive>`` off a directive line.

        Returns ``(command, archive)``; ``archive`` is ``None`` (and the line
        is returned unchanged) when there is no inline scope. The split is
        done on the LAST " in " so that file names which themselves contain
        " in " do not hide the real suffix. The archive must be a single
        token without spaces.
        """
        if line.startswith('in ') or ' in ' not in line:
            return line, None
        command, _, archive = line.rpartition(' in ')
        archive = archive.strip()
        if not archive or ' ' in archive:
            return line, None
        return command.strip(), archive

    @staticmethod
    def _parse_size(size_str: str) -> Optional[Tuple]:
        """Parse ``"16:9"`` -> ``('ratio', 16, 9)``, ``"800x600"`` ->
        ``('pixel', 800, 600)``.

        Returns ``None`` and logs a warning when the text is not a valid
        size, so one malformed size cannot abort the whole manifest.
        """
        size_str = size_str.strip()
        if ':' in size_str:
            kind, pieces = 'ratio', size_str.split(':')
        elif 'x' in size_str.lower():
            kind, pieces = 'pixel', size_str.lower().split('x')
        else:
            logger.warning(f"无效的尺寸参数: {size_str}")
            return None
        try:
            # Unpacking also rejects a wrong number of components.
            first, second = (int(piece) for piece in pieces)
        except ValueError:
            logger.warning(f"无效的尺寸参数: {size_str}")
            return None
        return (kind, first, second)

    def _split_size_suffix(self, spec: str) -> Tuple[str, Optional[Tuple]]:
        """Split ``"name(size)"`` into ``(name, parsed_size)``.

        The LAST "(" starts the size so names containing parentheses keep
        them. Without a trailing ``(...)`` the size is ``None``.
        """
        spec = spec.strip()
        if '(' in spec and spec.endswith(')'):
            name, _, size_part = spec.rpartition('(')
            return name.strip(), self._parse_size(size_part.rstrip(')'))
        return spec, None

    def _parse_targets(self, target_str: str) -> List[Tuple[str, Optional[Tuple]]]:
        """Parse ``"webp, png(16:9), jpg(800x600)"`` into
        ``[('webp', None), ('png', ('ratio', 16, 9)), ('jpg', ('pixel', 800, 600))]``.

        Formats are lower-cased; empty items (e.g. from a trailing comma)
        are skipped.
        """
        result = []
        for item in target_str.split(','):
            if not item.strip():
                continue
            fmt, size = self._split_size_suffix(item)
            result.append((fmt.lower(), size))
        return result

    def _parse_dst_with_size(self, dst_spec: str) -> Tuple[str, Optional[Tuple]]:
        """Parse ``"banner.webp(800x600)"`` into
        ``('banner.webp', ('pixel', 800, 600))``; size is ``None`` if absent."""
        return self._split_size_suffix(dst_spec)