This is a scraper for the ping tool on 站长之家 (ping.chinaz.com).
The underlying interface is implemented over WebSocket. The approach:
1. Inspect the WebSocket parameters; a token has to be supplied.
2. Find where the token comes from; a quick search shows it is embedded in the page the site returns.
3. Extract that token from the response.
4. The data then comes back fine, but it has no monitoring-node location, so the missing field is filled in by joining on an ID (the message's guid) against the page HTML.
5. Drop the code into a multiprocessing worker template (a small task-seeding sketch follows this list).
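The workers below pull tasks as JSON from a Redis list. Seeding that list is outside the scope of this post; a minimal, illustrative sketch is shown here (the `id`/`domain` fields are what `worker` and `process_task` read; the Redis host and queue name are placeholders for whatever you put in `Config`):

# seed_tasks.py -- illustrative only, run separately from the worker script
import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True)
for i, domain in enumerate(['example.com', 'example.org']):
    # 'id' is used for de-duplication, 'domain' is the host that gets pinged
    r.rpush('ping_task_queue', json.dumps({'id': i, 'domain': domain}))  # queue name = Config.REDIS_TASK_QUEUE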
import copy
import datetime
import re
import httpx
import redis
import pymongo
import json
import multiprocessing
from multiprocessing import Queue
import time
from typing import Any
from loguru import logger
import sys
from websockets.sync.client import connect as ws_connect
from lxml import etree
# === Configuration ===
class Config:
REDIS_HOST = ''
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = ''
REDIS_TASK_QUEUE = ''
REDIS_FAILED_QUEUE = ''
REDIS_DEDUP_KEY = ''
REDIS_SUCCESS_QUEUE = ''
MONGO_URI = ''
MONGO_DB = ''
MONGO_COLLECTION = 'tasks'
MONGO_FAILED_COLLECTION = 'failed_tasks'
# PROCESS_COUNT = 2
PROCESS_COUNT = multiprocessing.cpu_count() - 1
MAX_RETRIES = 3
RETRY_BACKOFF = 2 # Exponential backoff
    RECOVERY_INTERVAL = 30  # seconds
RECOVERY_BATCH_LIMIT = 50
# === Logging (loguru) ===
logger.remove()  # remove the default console sink (optional)
logger.add(sys.stdout, level="INFO", format="<green>[{time:YYYY-MM-DD HH:mm:ss}]</green> <level>[{level}]</level> <cyan>{message}</cyan>")
logger.add("worker.log", rotation="10 MB", retention="7 days", compression="zip", level="INFO",
format="[{time:YYYY-MM-DD HH:mm:ss}] [{level}] {message}")
# === Redis / Mongo initialization ===
def get_redis_client() -> redis.Redis:
pool = redis.ConnectionPool(
host=Config.REDIS_HOST,
port=Config.REDIS_PORT,
db=Config.REDIS_DB,
password=Config.REDIS_PASSWORD,
decode_responses=True,
max_connections=20
)
return redis.Redis(connection_pool=pool)
def get_mongo_clients():
client = pymongo.MongoClient(Config.MONGO_URI, maxPoolSize=20)
db = client[Config.MONGO_DB]
return db[Config.MONGO_COLLECTION], db[Config.MONGO_FAILED_COLLECTION]
# === Basic helpers ===
def fetch_task(r: redis.Redis) -> Any:
task = r.lpop(Config.REDIS_TASK_QUEUE)
return json.loads(task) if task else None
def is_duplicate(r: redis.Redis, key: str) -> bool:
return bool(r.sismember(Config.REDIS_DEDUP_KEY, key))
def mark_as_seen(r: redis.Redis, key: str):
r.sadd(Config.REDIS_DEDUP_KEY, key)
def post_with_single_retry(host, cookies, headers, data, logger):
    """POST the ping page for `host`; despite the name, retries up to 10 times before giving up."""
    url = f'https://ping.chinaz.com/{host}'
    for attempt in range(10):  # at most 10 attempts
        try:
            response = httpx.post(url, cookies=cookies, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as e:
            logger.error(f"[Attempt {attempt + 1}] HTTP status error: {e}")
        except httpx.RequestError as e:
            logger.error(f"[Attempt {attempt + 1}] request failed: {e}")
        if attempt < 9:
            logger.info("Retrying in 2s...")
            time.sleep(2)  # brief pause before the next attempt
    return None
def get_body(host):
cookies = {
'speedtesthost': '',
'pinghost': f'{host}',
'JSESSIONID': '',
}
headers = {
'referer': f'https://ping.chinaz.com/{host}',
'user-agent': '',
}
data = {
'host': f'{host}',
'type': '1',
'linetype': '全选,电信,移动,联通,多线,其他',
}
    response = post_with_single_retry(host, cookies, headers, data, logger)
    if response:
        return response
    # raise so the retry logic in retry_task can take over
    raise Exception("request failed")
def get_token(res):
    """Pull the WebSocket token out of the page returned by get_body."""
    match = re.search(r'let token = "(.*?)"', res.text)
    if not match:
        logger.error("token not found in response")
        return None
    return match.group(1)
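# For reference, the page returned by get_body embeds the token in an inline script,
# in a line of the form `let token = "..."` -- which is exactly what the regex above captures.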
def listen(host, tk, queue1):
    """Open the ping WebSocket for `host`, collect every message, and hand the list back via the queue."""
    uri = "wss://tooldata.chinaz.com/pingwebsocket"
    payload = {
        "keyword": f"{host}",
        "token": f"{tk}",
        "type": "1",
    }
    try:
        # open a synchronous WebSocket connection and send the query
        with ws_connect(uri) as websocket:
            websocket.send(json.dumps(payload))
            list_mess = []
            while True:
                try:
                    # wait at most 60 seconds for the next message
                    message = websocket.recv(timeout=60)
                    list_mess.append(message)
                    # logger.info(f"message received: {message}")
                except TimeoutError:
                    logger.info(f"{host} no message within 60s, closing connection")
                    break
                except Exception as e:
                    logger.error(f"error receiving message: {e}")
                    break
        queue1.put(list_mess)
    except Exception as e:
        logger.error(f"WebSocket error {host}: {e}")
        raise  # let retry_task handle it instead of letting the caller block on the queue
def process_task(task: dict, queue1: Queue) -> dict:
    task['processed_at'] = time.time()
    host = task.get('domain')
    logger.info(host)
    bd = get_body(host)
    tk = get_token(bd)
    if not tk:
        # no token means the page layout changed or the request was blocked; raise so the task is retried
        raise Exception(f"token not found for {host}")
    hhtml = etree.HTML(bd.text)
    listen(host, tk, queue1)
    ll = queue1.get(timeout=30)
    for i, l in enumerate(ll):
        l = json.loads(l)
        # each message carries a guid matching a row id in the page HTML; use it to pull
        # the monitoring node's location, which the message itself does not contain
        guid = l.get('guid')
        data = hhtml.xpath(f'//*[@id="{guid}"]//*[contains(@class, "pl-[12px]")]/text()')
        if data:
            l['from'] = data
        ll[i] = l
    shanghai_tz = datetime.timezone(datetime.timedelta(hours=8))
    now_shanghai = datetime.datetime.now(shanghai_tz)
    task = {
        'domain': host,
        'created_at': now_shanghai,
        'ping_results': ll,
        'total_locations': len(ll),
        'success_count': sum(1 for r in ll if r.get('from'))
    }
    return task
# === Worker ===
def retry_task(task, redis_client, mongo_collection, dedup_set, task_id, worker_id, queue1):
for attempt in range(1, Config.MAX_RETRIES + 1):
try:
            result = process_task(task, queue1)
            # insert a copy: insert_one mutates the dict it is given by adding an '_id'
            result_deep = copy.deepcopy(result)
            mongo_collection.insert_one(result_deep)
            result.pop('_id', None)
            mark_as_seen(redis_client, task_id)
logger.info(f"[Worker-{worker_id}] Success: {task_id}")
return True
except Exception as e:
wait = Config.RETRY_BACKOFF ** attempt
logger.warning(f"[Worker-{worker_id}] Retry {attempt} for {task_id} failed: {e}, wait {wait}s")
time.sleep(wait)
return False
def worker(worker_id: int, queue1: Queue):
logger.info(f"[Worker-{worker_id}] Started.")
redis_client = get_redis_client()
mongo_collection, failed_collection = get_mongo_clients()
while True:
task = fetch_task(redis_client)
if not task:
time.sleep(1)
continue
task_id = str(task.get('id'))
if is_duplicate(redis_client, task_id):
logger.info(f"[Worker-{worker_id}] Duplicate: {task_id}")
continue
task.setdefault('retry_count', 0)
        if retry_task(task, redis_client, mongo_collection, Config.REDIS_DEDUP_KEY, task_id, worker_id, queue1):
            # push the raw task to the Redis success queue
            # todo
            redis_client.rpush(Config.REDIS_SUCCESS_QUEUE, json.dumps(task))
continue
        # persist the failed task for the recovery worker
task['retry_count'] += 1
redis_client.rpush(Config.REDIS_FAILED_QUEUE, json.dumps(task))
# failed_collection.insert_one({
# 'task_id': task_id,
# 'task': task,
# 'failed_at': time.time(),
# 'reason': f"Retry limit exceeded ({Config.MAX_RETRIES})"
# })
logger.error(f"[Worker-{worker_id}] Final failure: {task_id}")
# === Recovery worker ===
def recovery_worker():
logger.info("[Recovery] Started.")
redis_client = get_redis_client()
    while True:
        recovered = 0  # reset every sweep so the "no tasks recovered" log below stays accurate
        for _ in range(Config.RECOVERY_BATCH_LIMIT):
task_raw = redis_client.lpop(Config.REDIS_FAILED_QUEUE)
if not task_raw:
break
try:
task = json.loads(task_raw)
retry_count = task.get('retry_count', 0)
task_id = str(task.get('id'))
if retry_count >= Config.MAX_RETRIES:
logger.warning(f"[Recovery] Discarded {task_id}, retry_count={retry_count}")
# redis_client.rpush(Config.REDIS_END_FAILED_KEY,json.dumps(task))
continue
redis_client.rpush(Config.REDIS_TASK_QUEUE, json.dumps(task))
logger.info(f"[Recovery] Requeued: {task_id}")
recovered += 1
except Exception as e:
logger.error(f"[Recovery] Task decode error: {e}")
if recovered == 0:
logger.info("[Recovery] No tasks recovered.")
time.sleep(Config.RECOVERY_INTERVAL)
# === Entry point ===
def main():
logger.info("System booting...")
    processes = []
    for i in range(Config.PROCESS_COUNT):
        # one result queue per worker, so a worker never consumes another worker's WebSocket results
        queue1 = Queue()
        p = multiprocessing.Process(target=worker, args=(i, queue1))
        p.start()
        processes.append(p)
recovery = multiprocessing.Process(target=recovery_worker)
recovery.start()
processes.append(recovery)
for p in processes:
p.join()
if __name__ == '__main__':
main()
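To sanity-check the output, read the latest document back from Mongo and pop the success queue. A quick sketch follows; the database name, Redis host and queue name are placeholders (the real values live in `Config`), while the collection name 'tasks' and the printed fields match what this script actually writes:

# check_results.py -- illustrative only
import json
import pymongo
import redis

mongo = pymongo.MongoClient('mongodb://127.0.0.1:27017')  # Config.MONGO_URI
doc = mongo['ping_db']['tasks'].find_one(sort=[('created_at', -1)])  # db name = Config.MONGO_DB
if doc:
    print(doc['domain'], doc['total_locations'], doc['success_count'])

r = redis.Redis(host='127.0.0.1', decode_responses=True)  # Config.REDIS_*
raw = r.lpop('ping_success_queue')  # queue name = Config.REDIS_SUCCESS_QUEUE
print(json.loads(raw) if raw else 'success queue empty')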