Administrator · Published 2025-04-17

A Python-Based Multiprocessing Template

This is a task-processing system built on Redis and MongoDB. It provides a complete set of mechanisms for multi-process consumption, deduplication, retry on failure, failure persistence, and task recovery, and is suited to asynchronous task-processing scenarios that require high reliability and fault tolerance.

Feature overview:

  1. Task queue processing (Worker):

    • Pulls tasks from the task_queue list in Redis, with concurrent consumption across multiple processes (implemented with multiprocessing). A minimal producer sketch for feeding this queue follows the list.

    • Each task carries a unique id; the Redis set dedup_set is used for deduplication to prevent a task from being consumed twice.

    • Successfully processed tasks are written to the tasks collection in MongoDB.

    • If processing fails, the task is attempted up to MAX_RETRIES times, with exponential backoff between attempts (with RETRY_BACKOFF = 2, the waits are 2s, 4s, 8s).

    • Tasks that still fail after all retries are written to the failed_queue list in Redis and the failed_tasks collection in MongoDB.

  2. Failed-task recovery (Recovery Worker):

    • A dedicated process periodically scans the failed_queue in Redis.

    • Tasks whose retry count has not yet exceeded the limit are pushed back onto task_queue so they can be consumed again.

  3. Centralized configuration:

    • All configuration parameters live in the Config class, including Redis and MongoDB connection settings, the number of worker processes, and the retry parameters, making them easy to adjust and extend in one place.

  4. Logging:

    • All key events (task success, failure, retry, recovery, and so on) are logged to worker.log, which makes tracing and troubleshooting easier.
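
Before the full listing, here is a minimal producer sketch showing how a task could be pushed onto task_queue. It is not part of the original template: the enqueue_task helper, the payload field, and the uuid-based id are illustrative assumptions; only the id field and the queue name matter to the workers below.

import json
import time
import uuid

import redis

def enqueue_task(r: redis.Redis, payload: dict) -> str:
    # Only 'id' is required by the workers; 'payload' and 'enqueued_at' are illustrative fields.
    task = {
        'id': str(uuid.uuid4()),
        'payload': payload,
        'enqueued_at': time.time(),
    }
    # The workers LPOP from the head of task_queue, so RPUSH to the tail keeps FIFO order.
    r.rpush('task_queue', json.dumps(task))
    return task['id']

if __name__ == '__main__':
    client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    print('Enqueued task', enqueue_task(client, {'action': 'demo'}))

The full implementation follows.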

import redis
import pymongo
import json
import multiprocessing
import time
import logging
from typing import Optional

# === Configuration ===
class Config:
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_DB = 0
    REDIS_TASK_QUEUE = 'task_queue'
    REDIS_FAILED_QUEUE = 'failed_queue'
    REDIS_DEDUP_KEY = 'dedup_set'

    MONGO_URI = 'mongodb://localhost:27017'
    MONGO_DB = 'task_system'
    MONGO_COLLECTION = 'tasks'
    MONGO_FAILED_COLLECTION = 'failed_tasks'

    PROCESS_COUNT = 4
    MAX_RETRIES = 3
    RETRY_BACKOFF = 2  # base for exponential backoff: wait = RETRY_BACKOFF ** attempt seconds
    RECOVERY_INTERVAL = 30  # seconds
    RECOVERY_BATCH_LIMIT = 50

# === Logging setup ===
logging.basicConfig(
    filename='worker.log',
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# === Redis / Mongo initialization ===
def get_redis_client() -> redis.Redis:
    pool = redis.ConnectionPool(
        host=Config.REDIS_HOST,
        port=Config.REDIS_PORT,
        db=Config.REDIS_DB,
        decode_responses=True,
        max_connections=20
    )
    return redis.Redis(connection_pool=pool)

def get_mongo_clients():
    client = pymongo.MongoClient(Config.MONGO_URI, maxPoolSize=20)
    db = client[Config.MONGO_DB]
    return db[Config.MONGO_COLLECTION], db[Config.MONGO_FAILED_COLLECTION]

# === Helpers ===
def fetch_task(r: redis.Redis) -> Optional[dict]:
    task = r.lpop(Config.REDIS_TASK_QUEUE)
    return json.loads(task) if task else None

def is_duplicate(r: redis.Redis, key: str) -> bool:
    return r.sismember(Config.REDIS_DEDUP_KEY, key)

def mark_as_seen(r: redis.Redis, key: str):
    r.sadd(Config.REDIS_DEDUP_KEY, key)

def process_task(task: dict) -> dict:
    # Placeholder for the real business logic; the template only stamps a processing time.
    task['processed_at'] = time.time()
    return task

# === Worker ===
def retry_task(task, redis_client, mongo_collection, task_id, worker_id):
    for attempt in range(1, Config.MAX_RETRIES + 1):
        try:
            result = process_task(task)
            mongo_collection.insert_one(result)
            mark_as_seen(redis_client, task_id)
            logging.info(f"[Worker-{worker_id}] Success: {task_id}")
            return True
        except Exception as e:
            wait = Config.RETRY_BACKOFF ** attempt
            logging.warning(f"[Worker-{worker_id}] Retry {attempt} for {task_id} failed: {e}, wait {wait}s")
            time.sleep(wait)
    return False

def worker(worker_id: int):
    logging.info(f"[Worker-{worker_id}] Started.")
    redis_client = get_redis_client()
    mongo_collection, failed_collection = get_mongo_clients()

    while True:
        task = fetch_task(redis_client)
        if not task:
            time.sleep(1)
            continue

        task_id = str(task.get('id'))
        if is_duplicate(redis_client, task_id):
            logging.info(f"[Worker-{worker_id}] Duplicate: {task_id}")
            continue

        task.setdefault('retry_count', 0)

        if retry_task(task, redis_client, mongo_collection, task_id, worker_id):
            continue

        # Persist the failed task so the recovery worker can pick it up later
        task['retry_count'] += 1
        redis_client.rpush(Config.REDIS_FAILED_QUEUE, json.dumps(task))
        failed_collection.insert_one({
            'task_id': task_id,
            'task': task,
            'failed_at': time.time(),
            'reason': f"Retry limit exceeded ({Config.MAX_RETRIES})"
        })
        logging.error(f"[Worker-{worker_id}] Final failure: {task_id}")

# === Recovery worker ===
def recovery_worker():
    logging.info("[Recovery] Started.")
    redis_client = get_redis_client()

    while True:
        recovered = 0  # reset each cycle so the idle log below stays accurate
        for _ in range(Config.RECOVERY_BATCH_LIMIT):
            task_raw = redis_client.lpop(Config.REDIS_FAILED_QUEUE)
            if not task_raw:
                break
            try:
                task = json.loads(task_raw)
                retry_count = task.get('retry_count', 0)
                task_id = str(task.get('id'))
                if retry_count >= Config.MAX_RETRIES:
                    logging.warning(f"[Recovery] Discarded {task_id}, retry_count={retry_count}")
                    continue
                redis_client.rpush(Config.REDIS_TASK_QUEUE, json.dumps(task))
                logging.info(f"[Recovery] Requeued: {task_id}")
                recovered += 1
            except Exception as e:
                logging.error(f"[Recovery] Task decode error: {e}")
        if recovered == 0:
            logging.info("[Recovery] No tasks recovered.")
        time.sleep(Config.RECOVERY_INTERVAL)

# === Entry point ===
def main():
    logging.info("System booting...")
    processes = []

    for i in range(Config.PROCESS_COUNT):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()
        processes.append(p)

    recovery = multiprocessing.Process(target=recovery_worker)
    recovery.start()
    processes.append(recovery)

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()
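
To check the results after a run (assuming Redis and MongoDB on their default local ports, as configured in Config), a small inspection snippet like the one below can be used. The collection and field names come from the code above, but the snippet itself is not part of the original template.

import pymongo

client = pymongo.MongoClient('mongodb://localhost:27017')
db = client['task_system']

# Count processed and permanently failed tasks, then show the most recent success.
print('processed:', db['tasks'].count_documents({}))
print('failed:', db['failed_tasks'].count_documents({}))
for doc in db['tasks'].find().sort('processed_at', -1).limit(1):
    print(doc)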

