
Python实现断点续传:从文件分片到状态持久化的实战设计
你好,我是源码库的博主。今天我们来聊聊一个在文件传输中非常实用,但实现起来又有点“坑”的功能——断点续传。相信大家都遇到过下载大文件时网络突然中断,又得从头开始的抓狂经历。最近我在为一个内部文件管理系统设计上传模块时,就深度实践了Python下的断点续传。整个过程就像在搭积木,核心就是两块:文件分片和传输状态持久化。下面,我就把这次实战的设计思路、关键代码和踩过的坑,毫无保留地分享给你。
一、核心思路:把大象放进冰箱分三步
断点续传的逻辑并不复杂,可以概括为三个步骤:
- 分片:将一个大文件切割成多个固定大小(例如1MB或5MB)的小块(Chunk)。
- 记录:持久化记录每个分片的传输状态(成功、失败、进行中)。
- 续传:中断后重新发起请求时,先读取状态记录,只上传那些失败或未上传的分片。
这个设计的关键在于,服务端必须支持“分片上传”和“分片合并”的API,并且客户端(我们的Python程序)要能可靠地保存进度。
二、实战第一步:设计文件分片与上传
我们先来实现文件的分片逻辑。这里的关键是使用文件的seek和read方法,按指定大小读取字节。
import os
import hashlib
def split_file(file_path, chunk_size=1024*1024): # 默认1MB一片
"""将文件分片,返回分片信息列表"""
file_size = os.path.getsize(file_path)
chunks = []
with open(file_path, 'rb') as f:
index = 0
while True:
start = index * chunk_size
if start >= file_size:
break
# 定位到分片开始位置
f.seek(start)
# 读取分片数据
data = f.read(chunk_size)
if not data:
break
# 为每个分片生成唯一标识(这里用MD5,实际可用更简单方法)
chunk_hash = hashlib.md5(data).hexdigest()
chunk_info = {
'index': index,
'start': start,
'size': len(data),
'hash': chunk_hash,
'status': 'pending' # pending, uploading, success, failed
}
chunks.append(chunk_info)
index += 1
return chunks
# 模拟上传单个分片的函数
def upload_chunk(server_url, file_path, chunk_info):
"""上传一个分片到服务器"""
with open(file_path, 'rb') as f:
f.seek(chunk_info['start'])
data = f.read(chunk_info['size'])
# 这里应该使用requests.post等库将data发送到server_url
# 例如:requests.post(server_url, files={'chunk': data}, data={'chunk_info': chunk_info})
print(f"模拟上传分片 {chunk_info['index']}, 大小 {chunk_info['size']} 字节")
# 假设上传成功
return True
踩坑提示1:分片大小需要权衡。太小会导致请求次数过多,HTTP头开销大;太大会失去断点续传的灵活性,且单次失败代价高。我通常选择1-5MB。
三、实战核心:传输状态的持久化设计
这是断点续传的“大脑”。我们需要把分片信息、整体进度保存下来,即使程序重启也能恢复。我考察了JSON文件、SQLite数据库和Redis几种方案:
- JSON文件:简单直观,适合轻量级、单任务场景。但并发写入需加锁。
- SQLite:轻量级数据库,支持简单事务和查询,是我本次选择的方案,因为它平衡了易用性和可靠性。
- Redis:性能极高,适合高并发、分布式场景,但需要额外服务支撑。
下面我用SQLite来实现:
import sqlite3
import json
from datetime import datetime
class TransferStateManager:
def __init__(self, db_path='transfer_state.db'):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS upload_tasks (
task_id TEXT PRIMARY KEY,
file_path TEXT NOT NULL,
file_size INTEGER,
total_chunks INTEGER,
uploaded_chunks INTEGER DEFAULT 0,
chunk_info TEXT, -- 存储所有分片信息的JSON
status TEXT, -- 'init', 'uploading', 'paused', 'completed', 'error'
created_at TIMESTAMP,
updated_at TIMESTAMP
)
''')
self.conn.commit()
def create_task(self, task_id, file_path, chunks):
"""创建一个新的上传任务记录"""
file_size = os.path.getsize(file_path)
chunk_info_json = json.dumps(chunks)
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO upload_tasks
(task_id, file_path, file_size, total_chunks, chunk_info, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (task_id, file_path, file_size, len(chunks), chunk_info_json, 'init',
datetime.now(), datetime.now()))
self.conn.commit()
return task_id
def update_chunk_status(self, task_id, chunk_index, status):
"""更新单个分片的状态"""
cursor = self.conn.cursor()
cursor.execute('SELECT chunk_info FROM upload_tasks WHERE task_id=?', (task_id,))
row = cursor.fetchone()
if row:
chunks = json.loads(row[0])
for chunk in chunks:
if chunk['index'] == chunk_index:
chunk['status'] = status
break
# 更新数据库
cursor.execute('''
UPDATE upload_tasks
SET chunk_info=?, updated_at=?
WHERE task_id=?
''', (json.dumps(chunks), datetime.now(), task_id))
self.conn.commit()
def get_pending_chunks(self, task_id):
"""获取所有未完成(pending/failed)的分片"""
cursor = self.conn.cursor()
cursor.execute('SELECT chunk_info FROM upload_tasks WHERE task_id=?', (task_id,))
row = cursor.fetchone()
if row:
all_chunks = json.loads(row[0])
return [chunk for chunk in all_chunks if chunk['status'] in ('pending', 'failed')]
return []
def close(self):
self.conn.close()
踩坑提示2:状态更新一定要及时且原子。比如一个分片上传成功,必须立即更新数据库,并考虑网络异常下的状态回滚(例如,标记为`uploading`,成功后改为`success`,失败或超时改回`failed`)。
四、整合与主控流程实现
现在我们把分片和状态管理组合起来,形成一个完整的、可断点续传的上传客户端。
import uuid
class ResumeUploadClient:
def __init__(self, server_base_url, state_db_path='transfer_state.db'):
self.server_base_url = server_base_url
self.state_mgr = TransferStateManager(state_db_path)
def upload(self, file_path, chunk_size=1024*1024, task_id=None):
"""主上传方法,支持传入已有的task_id进行续传"""
if not task_id:
# 新任务:分片并创建记录
task_id = str(uuid.uuid4())
print(f"开始新上传任务,ID: {task_id}")
chunks = split_file(file_path, chunk_size)
self.state_mgr.create_task(task_id, file_path, chunks)
else:
# 续传:从数据库加载任务
print(f"恢复上传任务,ID: {task_id}")
# 这里需要实现从数据库读取chunks的逻辑,为简化,我们直接重新分片(实际应读取保存的chunk_info)
chunks = split_file(file_path, chunk_size)
# 注意:实际项目中,分片参数(如chunk_size)必须与首次一致,否则会出错!
# 获取待上传的分片
pending_chunks = self.state_mgr.get_pending_chunks(task_id)
if not pending_chunks:
print("没有找到待上传的分片,任务可能已完成。")
return
print(f"共有 {len(pending_chunks)} 个分片需要上传。")
for chunk in pending_chunks:
try:
# 标记为上传中
self.state_mgr.update_chunk_status(task_id, chunk['index'], 'uploading')
# 构建上传该分片的特定URL(根据你的服务端API设计)
upload_url = f"{self.server_base_url}/upload_chunk?task_id={task_id}&chunk_index={chunk['index']}"
# 执行上传
success = upload_chunk(upload_url, file_path, chunk)
if success:
self.state_mgr.update_chunk_status(task_id, chunk['index'], 'success')
print(f"分片 {chunk['index']} 上传成功。")
else:
self.state_mgr.update_chunk_status(task_id, chunk['index'], 'failed')
print(f"分片 {chunk['index']} 上传失败。")
except Exception as e:
print(f"上传分片 {chunk['index']} 时发生异常: {e}")
self.state_mgr.update_chunk_status(task_id, chunk['index'], 'failed')
# 这里可以选择是否继续尝试后续分片
# break # 严重错误则中断
print(f"任务 {task_id} 处理完毕。")
self.state_mgr.close()
# 使用示例
if __name__ == '__main__':
client = ResumeUploadClient(server_base_url='http://your-server.com/api')
# 首次上传
client.upload('/path/to/your/large_file.zip')
# 如果中断,再次运行程序时,可以传入之前的task_id(这个id需要你保存下来)
# client.upload('/path/to/your/large_file.zip', task_id='之前保存的task-id')
五、服务端协作与注意事项
断点续传是客户端和服务端的“双人舞”。客户端设计好了,服务端也需要配合:
- 分片接收API:接收客户端上传的分片数据、任务ID和分片索引。
- 分片临时存储:将分片以`{task_id}_{chunk_index}.tmp`等形式保存在临时目录。
- 分片合并API:当客户端通知所有分片上传完成,或服务端检测到所有分片已就绪时,按索引顺序读取所有临时分片文件,合并成最终文件。
最后的忠告:
- 幂等性:分片上传接口要设计成幂等的,即同一分片重复上传不应导致错误,后到的覆盖先到的或直接返回成功即可。
- 清理机制:服务端需要定期清理陈旧的、未完成合并的临时分片文件,避免磁盘浪费。
- 完整性校验:所有分片上传完成后,最好能对合并后的文件计算哈希(如MD5、SHA256),与客户端提供的原始文件哈希对比,确保传输无误。
好了,以上就是用Python实现断点续传功能的核心设计与实战代码。从文件分片、SQLite状态持久化到主流程整合,我们一步步搭建了一个健壮的基础框架。你可以根据实际需求,在此基础上增加重试机制、并发上传、进度回调等高级功能。希望这篇教程能帮你避开我踩过的那些坑,顺利实现自己的断点续传系统。如果在实践中遇到问题,欢迎在源码库交流讨论!

评论(0)