189 lines
6.5 KiB
Python
189 lines
6.5 KiB
Python
import hashlib
|
||
import json
|
||
import logging
|
||
import os
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _item_id(task: str, assignee: str) -> str:
|
||
raw = f"{task}|{assignee}"
|
||
return hashlib.md5(raw.encode("utf-8")).hexdigest()[:8]
|
||
|
||
|
||
def _metric_id(metric_name: str, owner: str) -> str:
|
||
raw = f"{metric_name}|{owner}"
|
||
return hashlib.md5(raw.encode("utf-8")).hexdigest()[:8]
|
||
|
||
|
||
class MeetingStateStore:
|
||
def __init__(self, state_path: str):
|
||
self.state_path = state_path
|
||
self._state = self._load()
|
||
|
||
def _load(self) -> dict:
|
||
if os.path.exists(self.state_path):
|
||
try:
|
||
with open(self.state_path, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
logger.warning(f"加载状态文件失败,将创建新状态: {e}")
|
||
return {
|
||
"action_items": {},
|
||
"metrics": {},
|
||
"meeting_series": {},
|
||
"content_hashes": {},
|
||
}
|
||
|
||
def save(self):
|
||
os.makedirs(os.path.dirname(self.state_path), exist_ok=True)
|
||
with open(self.state_path, "w", encoding="utf-8") as f:
|
||
json.dump(self._state, f, ensure_ascii=False, indent=2)
|
||
|
||
def _ensure_series(self, meeting_title: str, meeting_date: str) -> str:
|
||
series_name = self._detect_series(meeting_title)
|
||
series = self._state["meeting_series"].get(series_name)
|
||
if not series:
|
||
series = {"latest_date": meeting_date, "processed_titles": []}
|
||
self._state["meeting_series"][series_name] = series
|
||
if meeting_date > series.get("latest_date", ""):
|
||
series["latest_date"] = meeting_date
|
||
if meeting_title not in series["processed_titles"]:
|
||
series["processed_titles"].append(meeting_title)
|
||
return series_name
|
||
|
||
def _detect_series(self, title: str) -> str:
|
||
import re
|
||
cleaned = re.sub(r"(\d{4}第\w+期)", "", title)
|
||
cleaned = re.sub(r"\(\d{4}第\w+期\)", "", cleaned)
|
||
cleaned = re.sub(r"\d{4}第\w+期", "", cleaned)
|
||
cleaned = re.sub(r"\d{4}年第\w+次", "", cleaned)
|
||
cleaned = cleaned.strip("-_ ")
|
||
return cleaned or title
|
||
|
||
def merge_action_items(
|
||
self,
|
||
new_items: List[dict],
|
||
meeting_title: str,
|
||
meeting_date: str,
|
||
meeting_filename: str,
|
||
) -> List[dict]:
|
||
series_name = self._ensure_series(meeting_title, meeting_date)
|
||
merged = []
|
||
|
||
for item in new_items:
|
||
task = item.get("task", "")
|
||
assignee = item.get("assignee", "")
|
||
iid = _item_id(task, assignee)
|
||
|
||
history_entry = {
|
||
"date": meeting_date,
|
||
"meeting": meeting_filename,
|
||
"status": item.get("status", "待办"),
|
||
"priority": item.get("priority", "中"),
|
||
"deadline": item.get("deadline", ""),
|
||
}
|
||
|
||
existing = self._state["action_items"].get(iid)
|
||
if existing:
|
||
existing["history"].append(history_entry)
|
||
existing["latest"] = history_entry
|
||
latest = existing["history"][-1]
|
||
item["_item_id"] = iid
|
||
item["_history"] = list(existing["history"])
|
||
item["status"] = latest["status"]
|
||
item["priority"] = latest["priority"]
|
||
item["deadline"] = latest["deadline"]
|
||
else:
|
||
self._state["action_items"][iid] = {
|
||
"item_id": iid,
|
||
"task": task,
|
||
"assignee": assignee,
|
||
"series": series_name,
|
||
"created_meeting": meeting_filename,
|
||
"history": [history_entry],
|
||
"latest": history_entry,
|
||
}
|
||
item["_item_id"] = iid
|
||
item["_history"] = [history_entry]
|
||
|
||
merged.append(item)
|
||
|
||
return merged
|
||
|
||
def merge_metrics(
|
||
self,
|
||
new_metrics: List[dict],
|
||
meeting_title: str,
|
||
meeting_date: str,
|
||
meeting_filename: str,
|
||
) -> List[dict]:
|
||
merged = []
|
||
|
||
for m in new_metrics:
|
||
metric_name = m.get("metric_name", "")
|
||
owner = m.get("owner", "")
|
||
mid = _metric_id(metric_name, owner)
|
||
|
||
history_entry = {
|
||
"date": meeting_date,
|
||
"meeting": meeting_filename,
|
||
"value": m.get("value", ""),
|
||
"target": m.get("target", ""),
|
||
"trend": m.get("trend", ""),
|
||
}
|
||
|
||
existing = self._state["metrics"].get(mid)
|
||
if existing:
|
||
existing["history"].append(history_entry)
|
||
existing["latest"] = history_entry
|
||
item = m
|
||
item["_metric_id"] = mid
|
||
item["_history"] = list(existing["history"])
|
||
else:
|
||
self._state["metrics"][mid] = {
|
||
"metric_id": mid,
|
||
"metric_name": metric_name,
|
||
"owner": owner,
|
||
"history": [history_entry],
|
||
"latest": history_entry,
|
||
}
|
||
m["_metric_id"] = mid
|
||
m["_history"] = [history_entry]
|
||
|
||
merged.append(m)
|
||
|
||
return merged
|
||
|
||
def get_action_item_history(self, item_id: str) -> Optional[dict]:
|
||
return self._state["action_items"].get(item_id)
|
||
|
||
def get_metric_history(self, metric_id: str) -> Optional[dict]:
|
||
return self._state["metrics"].get(metric_id)
|
||
|
||
def get_series_info(self, title: str) -> Optional[dict]:
|
||
series_name = self._detect_series(title)
|
||
return self._state["meeting_series"].get(series_name)
|
||
|
||
def has_content_hash(self, content_hash: str) -> bool:
|
||
return content_hash in self._state["content_hashes"]
|
||
|
||
def add_content_hash(self, content_hash: str, title: str, date: str, filename: str):
|
||
self._state["content_hashes"][content_hash] = {
|
||
"title": title,
|
||
"date": date,
|
||
"filename": filename,
|
||
}
|
||
|
||
def remove_content_hash(self, content_hash: str):
|
||
self._state["content_hashes"].pop(content_hash, None)
|
||
|
||
def get_stats(self) -> dict:
|
||
return {
|
||
"action_items_tracked": len(self._state["action_items"]),
|
||
"metrics_tracked": len(self._state["metrics"]),
|
||
"meeting_series": len(self._state["meeting_series"]),
|
||
"content_hashes": len(self._state["content_hashes"]),
|
||
} |