from concurrent.futures import ThreadPoolExecutor
from itertools import count
from threading import Lock
from typing import Any, Callable, Dict
import traceback


class KeyedBackgroundTaskRunner:
    """Background task runner that de-duplicates work by key.

    At most one task per key is registered at a time, so long-running tasks
    of the same kind do not pile up in the executor queue.
    """

    def __init__(self, max_workers: int, thread_name_prefix: str):
        """Create the thread pool and the empty per-key task registry.

        Args:
            max_workers: Forwarded to ``ThreadPoolExecutor``.
            thread_name_prefix: Forwarded to ``ThreadPoolExecutor``.
        """
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix,
        )
        # Guards both ``_tasks`` and ``_seq``.
        self._lock = Lock()
        # Source of increasing tokens that tell apart successive tasks
        # registered under the same key.
        self._seq = count(1)
        # key -> {"token": int, "future": Future} for the registered task.
        self._tasks: Dict[str, Dict[str, Any]] = {}

    def submit(self, key: str, func: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
        """Schedule ``func(*args, **kwargs)`` unless ``key`` is already busy.

        Args:
            key: Identifier used for de-duplication.
            func: Callable to run on the thread pool.
            *args: Positional arguments passed to ``func``.
            **kwargs: Keyword arguments passed to ``func``.

        Returns:
            ``True`` if the task was handed to the executor, ``False`` if a
            task registered under ``key`` has a future that is not done yet.
        """
        with self._lock:
            existing = self._tasks.get(key)
            still_active = bool(existing) and not existing["future"].done()
            if still_active:
                return False

            ticket = next(self._seq)
            # The lock stays held while scheduling: the worker's cleanup in
            # ``_run_task`` needs the same lock, so it cannot run before the
            # entry below has been recorded.
            self._tasks[key] = {
                "token": ticket,
                "future": self._executor.submit(
                    self._run_task, key, ticket, func, *args, **kwargs
                ),
            }
        return True

    def _run_task(self, key: str, token: int, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Run ``func`` on a worker thread and release the key afterwards.

        Exceptions derived from ``Exception`` are reported with ``print`` and
        ``traceback.print_exc`` and are not re-raised. The registry entry is
        removed only when it still carries this task's ``token``.
        """
        try:
            func(*args, **kwargs)
        except Exception:
            print(f"[BackgroundTaskRunner] Task failed, key={key}")
            traceback.print_exc()
        finally:
            with self._lock:
                entry = self._tasks.get(key)
                owns_slot = bool(entry) and entry["token"] == token
                if owns_slot:
                    # Only drop the entry this task registered; an entry with
                    # a different token belongs to another submission.
                    del self._tasks[key]