This is a task processing system built on Redis and MongoDB. It provides a complete set of mechanisms for multi-process consumption, deduplication, retry on failure, failure persistence, and task recovery, making it suitable for asynchronous task processing scenarios that require high reliability and fault tolerance.
Feature overview:
Task queue processing (Worker):
Tasks are pulled from the task_queue list in Redis and handled concurrently by multiple processes (via multiprocessing). Every task carries a unique id and is deduplicated through the Redis set dedup_set, which prevents the same task from being consumed twice. Successfully processed tasks are written to the tasks collection in MongoDB. If processing fails, the task is retried up to MAX_RETRIES times with an exponential backoff between attempts. Tasks that still fail after all retries are written to the failed_queue list in Redis and the failed_tasks collection in MongoDB.
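For illustration, a producer could enqueue work like this. This is only a sketch: the enqueue_task helper and the payload field are assumptions, while the id field, the JSON encoding, and the task_queue name match the worker code below.

import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def enqueue_task(task_id, payload):
    # The worker deduplicates on str(task['id']), so 'id' must be unique.
    task = {'id': task_id, 'payload': payload}
    # RPUSH pairs with the worker's LPOP, giving FIFO ordering.
    r.rpush('task_queue', json.dumps(task))

enqueue_task(1, {'action': 'demo'})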
Failed task recovery (Recovery Worker):
A separate process periodically scans the failed_queue in Redis and pushes any task whose retry count has not yet exceeded the limit back onto task_queue, so it can be consumed and processed again.
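For debugging, the contents of the failed queue can be inspected without consuming it. The snippet below is a sketch that only assumes the entries are the JSON-encoded task dicts written by the worker.

import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# LRANGE reads the list without popping, so it does not interfere with the recovery worker.
for raw in r.lrange('failed_queue', 0, -1):
    task = json.loads(raw)
    print(task.get('id'), task.get('retry_count', 0))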
Centralized configuration:
All configuration parameters are gathered in the Config class, including the Redis and MongoDB connection settings, the number of worker processes, and the retry parameters, so they can be changed and extended in one place.
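One possible extension, shown only as a sketch and not something the system does today, is to let environment variables override the defaults:

import os

class Config:
    REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
    REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
    REDIS_DB = int(os.getenv('REDIS_DB', '0'))
    MONGO_URI = os.getenv('MONGO_URI', 'mongodb://localhost:27017')
    PROCESS_COUNT = int(os.getenv('PROCESS_COUNT', '4'))
    MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3'))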
Logging:
All key events (task success, failure, retry, recovery, and so on) are recorded in worker.log, which makes it easy to trace and troubleshoot problems.
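During development it can also help to mirror the log to the console. This optional sketch uses only the standard logging module and is not part of the original setup:

import logging

console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s',
                                       datefmt='%Y-%m-%d %H:%M:%S'))
logging.getLogger().addHandler(console)

The full implementation follows.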
import redis
import pymongo
import json
import multiprocessing
import time
import logging
from typing import Any
# === Configuration ===
class Config:
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_DB = 0
    REDIS_TASK_QUEUE = 'task_queue'
    REDIS_FAILED_QUEUE = 'failed_queue'
    REDIS_DEDUP_KEY = 'dedup_set'
    MONGO_URI = 'mongodb://localhost:27017'
    MONGO_DB = 'task_system'
    MONGO_COLLECTION = 'tasks'
    MONGO_FAILED_COLLECTION = 'failed_tasks'
    PROCESS_COUNT = 4
    MAX_RETRIES = 3
    RETRY_BACKOFF = 2          # exponential backoff base (waits of 2s, 4s, 8s, ...)
    RECOVERY_INTERVAL = 30     # seconds between recovery scans
    RECOVERY_BATCH_LIMIT = 50  # max tasks requeued per scan
# === Logging setup ===
logging.basicConfig(
    filename='worker.log',
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# === Redis / Mongo initialization ===
def get_redis_client() -> redis.Redis:
    pool = redis.ConnectionPool(
        host=Config.REDIS_HOST,
        port=Config.REDIS_PORT,
        db=Config.REDIS_DB,
        decode_responses=True,
        max_connections=20
    )
    return redis.Redis(connection_pool=pool)

def get_mongo_clients():
    client = pymongo.MongoClient(Config.MONGO_URI, maxPoolSize=20)
    db = client[Config.MONGO_DB]
    return db[Config.MONGO_COLLECTION], db[Config.MONGO_FAILED_COLLECTION]
# === Utilities ===
def fetch_task(r: redis.Redis) -> Any:
    task = r.lpop(Config.REDIS_TASK_QUEUE)
    return json.loads(task) if task else None

def is_duplicate(r: redis.Redis, key: str) -> bool:
    return r.sismember(Config.REDIS_DEDUP_KEY, key)

def mark_as_seen(r: redis.Redis, key: str):
    r.sadd(Config.REDIS_DEDUP_KEY, key)

def process_task(task: dict) -> dict:
    # Placeholder for the real business logic; currently it only stamps the processing time.
    task['processed_at'] = time.time()
    return task
# === Worker ===
def retry_task(task, redis_client, mongo_collection, task_id, worker_id):
    for attempt in range(1, Config.MAX_RETRIES + 1):
        try:
            result = process_task(task)
            mongo_collection.insert_one(result)
            mark_as_seen(redis_client, task_id)
            logging.info(f"[Worker-{worker_id}] Success: {task_id}")
            return True
        except Exception as e:
            wait = Config.RETRY_BACKOFF ** attempt
            logging.warning(f"[Worker-{worker_id}] Retry {attempt} for {task_id} failed: {e}, wait {wait}s")
            time.sleep(wait)
    return False
def worker(worker_id: int):
    logging.info(f"[Worker-{worker_id}] Started.")
    redis_client = get_redis_client()
    mongo_collection, failed_collection = get_mongo_clients()
    while True:
        task = fetch_task(redis_client)
        if not task:
            time.sleep(1)
            continue
        task_id = str(task.get('id'))
        if is_duplicate(redis_client, task_id):
            logging.info(f"[Worker-{worker_id}] Duplicate: {task_id}")
            continue
        task.setdefault('retry_count', 0)
        if retry_task(task, redis_client, mongo_collection, task_id, worker_id):
            continue
        # Persist the failed task for later recovery
        task['retry_count'] += 1
        redis_client.rpush(Config.REDIS_FAILED_QUEUE, json.dumps(task))
        failed_collection.insert_one({
            'task_id': task_id,
            'task': task,
            'failed_at': time.time(),
            'reason': f"Retry limit exceeded ({Config.MAX_RETRIES})"
        })
        logging.error(f"[Worker-{worker_id}] Final failure: {task_id}")
# === Recovery worker ===
def recovery_worker():
    logging.info("[Recovery] Started.")
    redis_client = get_redis_client()
    while True:
        recovered = 0  # reset per scan so the "no tasks" log stays accurate
        for _ in range(Config.RECOVERY_BATCH_LIMIT):
            task_raw = redis_client.lpop(Config.REDIS_FAILED_QUEUE)
            if not task_raw:
                break
            try:
                task = json.loads(task_raw)
                retry_count = task.get('retry_count', 0)
                task_id = str(task.get('id'))
                if retry_count >= Config.MAX_RETRIES:
                    logging.warning(f"[Recovery] Discarded {task_id}, retry_count={retry_count}")
                    continue
                redis_client.rpush(Config.REDIS_TASK_QUEUE, json.dumps(task))
                logging.info(f"[Recovery] Requeued: {task_id}")
                recovered += 1
            except Exception as e:
                logging.error(f"[Recovery] Task decode error: {e}")
        if recovered == 0:
            logging.info("[Recovery] No tasks recovered.")
        time.sleep(Config.RECOVERY_INTERVAL)
# === Entry point ===
def main():
    logging.info("System booting...")
    processes = []
    for i in range(Config.PROCESS_COUNT):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()
        processes.append(p)
    recovery = multiprocessing.Process(target=recovery_worker)
    recovery.start()
    processes.append(recovery)
    for p in processes:
        p.join()

if __name__ == '__main__':
    main()