71 lines
2.7 KiB
Python
71 lines
2.7 KiB
Python
|
|
import os
|
||
|
|
import zipfile
|
||
|
|
import py7zr
|
||
|
|
import shutil
|
||
|
|
from config import Config
|
||
|
|
from logger import logger
|
||
|
|
|
||
|
|
class ZipHandler:
    """Static helpers for extracting, creating and splitting ZIP / 7z archives."""

    @staticmethod
    def extract_zip(zip_path, extract_to):
        """Extract every member of the ZIP archive ``zip_path`` into ``extract_to``."""
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extract_to)
        logger.info(f"解压ZIP: {zip_path} -> {extract_to}")

    @staticmethod
    def extract_7z(seven_z_path, extract_to):
        """Extract every member of the 7z archive ``seven_z_path`` into ``extract_to``."""
        # NOTE(review): member-path sanitisation is delegated to py7zr; confirm
        # the installed version is safe before feeding it untrusted archives.
        with py7zr.SevenZipFile(seven_z_path, mode='r') as archive:
            archive.extractall(extract_to)
        logger.info(f"解压7z: {seven_z_path} -> {extract_to}")

    @staticmethod
    def pack_to_zip(source_dir_or_file, output_zip_path):
        """Pack a single file or a directory tree into a deflate-compressed ZIP.

        A file is stored under its base name. A directory is stored with its
        own name as the top-level folder, so ``/data/proj`` yields members such
        as ``proj/a.txt``.

        Args:
            source_dir_or_file: Path of the file or directory to pack.
            output_zip_path: Path of the ZIP archive to create (overwritten).
        """
        source = os.fspath(source_dir_or_file)
        if source:
            # Drop trailing separators: dirname("proj/") would be "proj" itself
            # and the top-level folder name would vanish from member names.
            source = os.path.normpath(source)
        parent_dir = os.path.dirname(source)
        output_abs = os.path.abspath(output_zip_path)

        with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
            if os.path.isfile(source):
                archive.write(source, arcname=os.path.basename(source))
            else:
                for root, _, file_names in os.walk(source):
                    for file_name in file_names:
                        full_path = os.path.join(root, file_name)
                        # The archive being written may live inside the source
                        # tree; never add it to itself.
                        if os.path.abspath(full_path) == output_abs:
                            continue
                        arcname = os.path.relpath(full_path, start=parent_dir)
                        archive.write(full_path, arcname)
        logger.info(f"打包为ZIP: {output_zip_path}")

    @staticmethod
    def split_zip(zip_path, volume_size_mb):
        """Split an existing ZIP file into fixed-size volumes.

        The standard library cannot write multi-volume ZIPs directly, so the
        finished archive is cut byte-wise into ``<zip_path>.001``,
        ``<zip_path>.002``, ... and the original file is removed afterwards.
        To extract, concatenate the parts first
        (``cat file.zip.* > file.zip``) or use a tool that understands
        split archives.

        Args:
            zip_path: Path of the archive to split.
            volume_size_mb: Maximum size of one volume in MiB; may be
                fractional. A value <= 0 disables splitting.

        Returns:
            The list of part paths in order, or ``[zip_path]`` when no split
            was necessary (splitting disabled or the file already fits).
        """
        if volume_size_mb <= 0:
            return [zip_path]
        if os.path.getsize(zip_path) <= volume_size_mb * 1024 * 1024:
            return [zip_path]

        # read() needs an int; a fractional MiB count would yield a float.
        volume_size = int(volume_size_mb * 1024 * 1024)
        if volume_size < 1:
            # A sub-byte volume cannot hold data; leave the archive untouched
            # rather than deleting it with no parts written.
            return [zip_path]

        # Copy through a bounded buffer so memory use does not grow with the
        # requested volume size.
        buffer_size = min(1024 * 1024, volume_size)
        parts = []
        with open(zip_path, 'rb') as src:
            part_num = 1
            while True:
                chunk = src.read(buffer_size)
                if not chunk:
                    break
                part_path = f"{zip_path}.{part_num:03d}"
                remaining = volume_size
                with open(part_path, 'wb') as dst:
                    while chunk:
                        dst.write(chunk)
                        remaining -= len(chunk)
                        if remaining <= 0:
                            break
                        chunk = src.read(min(buffer_size, remaining))
                parts.append(part_path)
                part_num += 1

        # Remove the original ZIP only after every part has been written.
        os.remove(zip_path)
        logger.info(f"分卷压缩完成: {parts}")
        return parts

    @staticmethod
    def is_archive(file_path):
        """Return True when ``file_path`` ends in ``.zip`` or ``.7z`` (case-insensitive)."""
        _, ext = os.path.splitext(file_path)
        return ext.lower() in ('.zip', '.7z')
|