111 lines
3.3 KiB
Python
111 lines
3.3 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import asdict, dataclass, field
|
||
|
|
from enum import Enum
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
from uuid import uuid4
|
||
|
|
|
||
|
|
|
||
|
|
class TaskStatus(str, Enum):
|
||
|
|
PENDING = "pending"
|
||
|
|
READY = "ready"
|
||
|
|
RUNNING = "running"
|
||
|
|
BLOCKED = "blocked"
|
||
|
|
DONE = "done"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(slots=True)
|
||
|
|
class Task:
|
||
|
|
id: str
|
||
|
|
title: str
|
||
|
|
description: str
|
||
|
|
status: TaskStatus = TaskStatus.PENDING
|
||
|
|
parent_id: Optional[str] = None
|
||
|
|
depends_on: List[str] = field(default_factory=list)
|
||
|
|
assignee: Optional[str] = None
|
||
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
||
|
|
|
||
|
|
def to_dict(self) -> Dict[str, Any]:
|
||
|
|
data = asdict(self)
|
||
|
|
data["status"] = self.status.value
|
||
|
|
return data
|
||
|
|
|
||
|
|
|
||
|
|
class TaskDispatcher:
|
||
|
|
"""Minimal task board for decomposition and scheduling."""
|
||
|
|
|
||
|
|
def __init__(self) -> None:
|
||
|
|
self._tasks: Dict[str, Task] = {}
|
||
|
|
|
||
|
|
def create_task(
|
||
|
|
self,
|
||
|
|
*,
|
||
|
|
title: str,
|
||
|
|
description: str,
|
||
|
|
parent_id: Optional[str] = None,
|
||
|
|
depends_on: Optional[List[str]] = None,
|
||
|
|
assignee: Optional[str] = None,
|
||
|
|
metadata: Optional[Dict[str, Any]] = None,
|
||
|
|
) -> Task:
|
||
|
|
task = Task(
|
||
|
|
id=f"task_{uuid4().hex[:8]}",
|
||
|
|
title=title,
|
||
|
|
description=description,
|
||
|
|
status=TaskStatus.PENDING,
|
||
|
|
parent_id=parent_id,
|
||
|
|
depends_on=list(depends_on or []),
|
||
|
|
assignee=assignee,
|
||
|
|
metadata=dict(metadata or {}),
|
||
|
|
)
|
||
|
|
self._tasks[task.id] = task
|
||
|
|
self._recompute_readiness()
|
||
|
|
return task
|
||
|
|
|
||
|
|
def list_tasks(self) -> List[Dict[str, Any]]:
|
||
|
|
self._recompute_readiness()
|
||
|
|
return [task.to_dict() for task in self._tasks.values()]
|
||
|
|
|
||
|
|
def get_task(self, task_id: str) -> Task:
|
||
|
|
try:
|
||
|
|
return self._tasks[task_id]
|
||
|
|
except KeyError as exc:
|
||
|
|
raise KeyError(f"Unknown task: {task_id}") from exc
|
||
|
|
|
||
|
|
def update_task(
|
||
|
|
self,
|
||
|
|
task_id: str,
|
||
|
|
*,
|
||
|
|
status: Optional[str] = None,
|
||
|
|
metadata: Optional[Dict[str, Any]] = None,
|
||
|
|
assignee: Optional[str] = None,
|
||
|
|
) -> Task:
|
||
|
|
task = self.get_task(task_id)
|
||
|
|
if status is not None:
|
||
|
|
task.status = TaskStatus(status)
|
||
|
|
if metadata:
|
||
|
|
task.metadata.update(metadata)
|
||
|
|
if assignee is not None:
|
||
|
|
task.assignee = assignee
|
||
|
|
self._recompute_readiness()
|
||
|
|
return task
|
||
|
|
|
||
|
|
def next_ready_task(self, assignee: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||
|
|
self._recompute_readiness()
|
||
|
|
for task in self._tasks.values():
|
||
|
|
if task.status != TaskStatus.READY:
|
||
|
|
continue
|
||
|
|
if assignee and task.assignee not in (None, assignee):
|
||
|
|
continue
|
||
|
|
return task.to_dict()
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _recompute_readiness(self) -> None:
|
||
|
|
for task in self._tasks.values():
|
||
|
|
if task.status in {TaskStatus.RUNNING, TaskStatus.BLOCKED, TaskStatus.DONE}:
|
||
|
|
continue
|
||
|
|
ready = all(
|
||
|
|
dep_id in self._tasks and self._tasks[dep_id].status == TaskStatus.DONE
|
||
|
|
for dep_id in task.depends_on
|
||
|
|
)
|
||
|
|
task.status = TaskStatus.READY if ready else TaskStatus.PENDING
|