416 lines
17 KiB
Python
416 lines
17 KiB
Python
import logging
|
|
import os
|
|
import shutil
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
from config import config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _sanitize_filename(name: str) -> str:
|
|
if not name:
|
|
return "未命名"
|
|
invalid = '<>:"/\\|?*'
|
|
for c in invalid:
|
|
name = name.replace(c, "")
|
|
name = name.replace(" ", "_").strip("._")
|
|
if not name:
|
|
return "未命名"
|
|
return name
|
|
|
|
|
|
def _safe_filename(name: str, max_len: int = 60) -> str:
|
|
safe = _sanitize_filename(name)
|
|
if len(safe) > max_len:
|
|
safe = safe[:max_len]
|
|
return safe
|
|
|
|
|
|
class ObsidianVaultManager:
|
|
def __init__(self):
|
|
self.vault_path = config.obsidian.vault_path
|
|
self.meetings_dir = os.path.join(self.vault_path, config.obsidian.meetings_dir)
|
|
self.entities_dir = os.path.join(self.vault_path, config.obsidian.entities_dir)
|
|
self.graphs_dir = os.path.join(self.vault_path, config.obsidian.graphs_dir)
|
|
self.raw_dir = os.path.join(self.vault_path, config.obsidian.raw_dir)
|
|
self._ensure_dirs()
|
|
|
|
def _ensure_dirs(self):
|
|
for d in [self.meetings_dir, self.entities_dir, self.graphs_dir, self.raw_dir]:
|
|
os.makedirs(d, exist_ok=True)
|
|
|
|
def save_raw_text(self, text: str, title: str = "", date: str = "") -> str:
|
|
date_str = date or datetime.now().strftime("%Y-%m-%d")
|
|
safe_title = _safe_filename(title or "未命名", 40)
|
|
filename = f"{date_str}_{safe_title}.md"
|
|
filepath = os.path.join(self.raw_dir, filename)
|
|
|
|
if os.path.exists(filepath):
|
|
with open(filepath, "r", encoding="utf-8") as f:
|
|
existing = f.read()
|
|
if "status: processed" in existing:
|
|
logger.warning(f"原文文件已存在且已处理过,将被覆盖: {filepath}")
|
|
|
|
content = f"""---
|
|
title: "{title}"
|
|
date: "{date_str}"
|
|
tags: [raw]
|
|
status: unprocessed
|
|
---
|
|
|
|
# {title or "未命名"}
|
|
|
|
**日期**: {date_str}
|
|
|
|
## 原文
|
|
|
|
{text}
|
|
"""
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
logger.info(f"原文已保存: {filepath}")
|
|
return filepath
|
|
|
|
def mark_raw_processed(self, raw_filepath: str):
|
|
if not os.path.exists(raw_filepath):
|
|
return
|
|
with open(raw_filepath, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
content = content.replace("status: unprocessed", "status: processed")
|
|
with open(raw_filepath, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
|
|
def _ensure_obsidian_config(self):
|
|
obsidian_config = os.path.join(self.vault_path, ".obsidian", "app.json")
|
|
if not os.path.exists(obsidian_config):
|
|
os.makedirs(os.path.dirname(obsidian_config), exist_ok=True)
|
|
with open(obsidian_config, "w", encoding="utf-8") as f:
|
|
f.write('{\n "alwaysUpdateLinks": true,\n "newFileLocation": "current",\n "useMarkdownLinks": true\n}')
|
|
|
|
core_plugins = os.path.join(self.vault_path, ".obsidian", "core-plugins.json")
|
|
if not os.path.exists(core_plugins):
|
|
with open(core_plugins, "w", encoding="utf-8") as f:
|
|
f.write('{\n "file-explorer": true,\n "graph": true,\n "backlink": true,\n "tag-pane": true,\n "page-preview": true,\n "templates": true,\n "search": true\n}')
|
|
|
|
def _meeting_filename(self, data: dict) -> str:
|
|
date_str = data.get("date", datetime.now().strftime("%Y-%m-%d"))
|
|
title = data.get("title", "未命名会议")
|
|
safe_title = _safe_filename(title, 40)
|
|
return f"{date_str}_{safe_title}.md"
|
|
|
|
def _entity_path(self, name: str) -> str:
|
|
safe = _safe_filename(name, 60)
|
|
return os.path.join(self.entities_dir, f"{safe}.md")
|
|
|
|
def _entity_link(self, name: str) -> str:
|
|
safe = _safe_filename(name, 60)
|
|
return f"[[Entities/{safe}|{name}]]"
|
|
|
|
def _meeting_link(self, data: dict) -> str:
|
|
fname = self._meeting_filename(data).replace(".md", "")
|
|
title = data.get("title", "未命名会议")
|
|
return f"[[Meetings/{fname}|{title}]]"
|
|
|
|
def meeting_filepath(self, meeting_data: dict) -> str:
|
|
filename = self._meeting_filename(meeting_data)
|
|
return os.path.join(self.meetings_dir, filename)
|
|
|
|
def meeting_file_exists(self, meeting_data: dict) -> bool:
|
|
return os.path.exists(self.meeting_filepath(meeting_data))
|
|
|
|
def raw_filepath(self, meeting_data: dict) -> str:
|
|
date_str = meeting_data.get("date", datetime.now().strftime("%Y-%m-%d"))
|
|
title = meeting_data.get("title", "未命名")
|
|
safe_title = _safe_filename(title, 40)
|
|
filename = f"{date_str}_{safe_title}.md"
|
|
return os.path.join(self.raw_dir, filename)
|
|
|
|
def remove_meeting_note(self, meeting_data: dict):
|
|
paths = [
|
|
self.meeting_filepath(meeting_data),
|
|
self.raw_filepath(meeting_data),
|
|
]
|
|
for p in paths:
|
|
if os.path.exists(p):
|
|
os.remove(p)
|
|
logger.info(f"已删除: {p}")
|
|
|
|
def add_meeting(self, meeting_data: dict, original_text: str) -> str:
|
|
self._ensure_obsidian_config()
|
|
filename = self._meeting_filename(meeting_data)
|
|
filepath = os.path.join(self.meetings_dir, filename)
|
|
content = self._render_meeting_note(meeting_data, original_text)
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
logger.info(f"会议笔记已生成: {filepath}")
|
|
|
|
self._create_all_entity_notes(meeting_data)
|
|
self._update_graph_moc()
|
|
|
|
return filepath
|
|
|
|
def _render_meeting_note(self, data: dict, original_text: str) -> str:
|
|
lines = []
|
|
lines.append("---")
|
|
lines.append(f'title: "{data.get("title", "")}"')
|
|
lines.append(f'date: "{data.get("date", "")}"')
|
|
content_hash = data.get("_content_hash", "")
|
|
if content_hash:
|
|
lines.append(f'content_hash: "{content_hash}"')
|
|
lines.append("tags: [meeting]")
|
|
lines.append("---")
|
|
lines.append("")
|
|
|
|
lines.append(f"# {data.get('title', '')}")
|
|
lines.append("")
|
|
|
|
if data.get("date"):
|
|
lines.append(f"**日期**: {data['date']}")
|
|
if data.get("participants"):
|
|
participants_links = [self._entity_link(p) for p in data["participants"]]
|
|
lines.append(f"**参会人**: {', '.join(participants_links)}")
|
|
lines.append("")
|
|
|
|
if data.get("summary"):
|
|
lines.append("## 摘要")
|
|
lines.append(data["summary"])
|
|
lines.append("")
|
|
|
|
lines.append("## 原文")
|
|
lines.append(original_text)
|
|
lines.append("")
|
|
|
|
if data.get("entities"):
|
|
lines.append("## 涉及实体")
|
|
for e in data["entities"]:
|
|
name = e.get("name", "")
|
|
if not name:
|
|
continue
|
|
lines.append(f"- {self._entity_link(name)} ({e.get('entity_type', '')}): {e.get('description', '')}")
|
|
lines.append("")
|
|
|
|
if data.get("action_items"):
|
|
lines.append("## 行动项")
|
|
for item in data["action_items"]:
|
|
task = item.get("task", "")
|
|
assignee_link = self._entity_link(item["assignee"]) if item.get("assignee") else "待确认"
|
|
deadline = item.get("deadline", "未指定")
|
|
priority = item.get("priority", "中")
|
|
status_emoji = "✅" if item.get("status") == "已完成" else "🔄"
|
|
lines.append(f"- {status_emoji} **{task}** | 负责人: {assignee_link} | 截止: {deadline} | 优先级: {priority}")
|
|
history = item.get("_history", [])
|
|
if len(history) > 1:
|
|
for h in history:
|
|
icon = "✅" if h.get("status") == "已完成" else "🔄"
|
|
lines.append(f" - {h.get('date', '')}: {icon} {h.get('status', '')} (优先级: {h.get('priority', '')})")
|
|
lines.append("")
|
|
|
|
if data.get("metrics"):
|
|
lines.append("## 指标跟踪")
|
|
lines.append("| 指标 | 当前值 | 目标值 | 趋势 | 负责人 |")
|
|
lines.append("|------|--------|--------|------|--------|")
|
|
for m in data["metrics"]:
|
|
trend_icon = {"向好": "📈", "持平": "➡️", "恶化": "📉"}.get(m.get("trend", ""), "")
|
|
owner_link = self._entity_link(m["owner"]) if m.get("owner") else "-"
|
|
lines.append(f"| {m['metric_name']} | {m.get('value', '')} | {m.get('target', '')} | {trend_icon}{m.get('trend', '')} | {owner_link} |")
|
|
lines.append("")
|
|
|
|
if data.get("decisions"):
|
|
lines.append("## 决策记录")
|
|
for d in data["decisions"]:
|
|
proposer_link = self._entity_link(d["proposer"]) if d.get("proposer") else "-"
|
|
status_badge = "✅ 已决" if d.get("status") == "已决" else "⏳ 待定"
|
|
lines.append(f"- {status_badge} {d['content']} ({proposer_link})")
|
|
lines.append("")
|
|
|
|
if data.get("relations"):
|
|
lines.append("## 关系图谱")
|
|
for r in data["relations"]:
|
|
sub = r.get("subject", "")
|
|
obj = r.get("object", "")
|
|
pred = r.get("predicate", "")
|
|
if sub and obj:
|
|
lines.append(f"- {self._entity_link(sub)} → **{pred}** → {self._entity_link(obj)}")
|
|
lines.append("")
|
|
|
|
lines.append("---")
|
|
date_tag = data.get("date", "").replace("-", "/")
|
|
lines.append(f"#meeting #{date_tag}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _create_all_entity_notes(self, data: dict):
|
|
seen: Set[str] = set()
|
|
|
|
for entity in data.get("entities", []):
|
|
name = entity.get("name", "")
|
|
if name and name not in seen:
|
|
self._upsert_entity_note(name, entity.get("entity_type", "实体"), entity.get("description", ""), data)
|
|
seen.add(name)
|
|
|
|
for participant in data.get("participants", []):
|
|
if participant and participant not in seen:
|
|
self._upsert_entity_note(participant, "人物", f"{participant} (参会人)", data)
|
|
seen.add(participant)
|
|
|
|
for rel in data.get("relations", []):
|
|
for key in ["subject", "object"]:
|
|
name = rel.get(key, "")
|
|
if name and name not in seen:
|
|
etype = rel.get(f"{key}_type", "实体")
|
|
self._upsert_entity_note(name, etype, "", data)
|
|
seen.add(name)
|
|
|
|
for item in data.get("action_items", []):
|
|
name = item.get("assignee", "")
|
|
if name and name not in seen:
|
|
self._upsert_entity_note(name, "人物", f"{name} (行动项负责人)", data)
|
|
seen.add(name)
|
|
|
|
for m in data.get("metrics", []):
|
|
name = m.get("owner", "")
|
|
if name and name not in seen:
|
|
self._upsert_entity_note(name, "人物", f"{name} (指标负责人)", data)
|
|
seen.add(name)
|
|
|
|
for d in data.get("decisions", []):
|
|
name = d.get("proposer", "")
|
|
if name and name not in seen:
|
|
self._upsert_entity_note(name, "人物", f"{name} (决策提出人)", data)
|
|
seen.add(name)
|
|
|
|
def _upsert_entity_note(self, name: str, entity_type: str, description: str, meeting_data: dict):
|
|
filepath = self._entity_path(name)
|
|
meeting_link = self._meeting_link(meeting_data)
|
|
|
|
if os.path.exists(filepath):
|
|
with open(filepath, "r", encoding="utf-8") as f:
|
|
existing = f.read()
|
|
|
|
if meeting_link not in existing:
|
|
idx = existing.find("## 相关会议")
|
|
if idx > 0:
|
|
section_end = existing.find("\n## ", idx + 10)
|
|
if section_end < 0:
|
|
section_end = len(existing)
|
|
new_section = existing[idx:section_end].rstrip() + f"\n - {meeting_link}\n"
|
|
existing = existing[:idx] + new_section + existing[section_end:]
|
|
else:
|
|
existing = existing.rstrip() + f"\n\n## 相关会议\n- {meeting_link}\n"
|
|
|
|
self._upsert_entity_action_items(existing, meeting_data, name, meeting_link, filepath)
|
|
return
|
|
|
|
rel_lines = []
|
|
for r in meeting_data.get("relations", []):
|
|
if r.get("subject") == name and r.get("object"):
|
|
rel_lines.append(f"- → **{r['predicate']}** → {self._entity_link(r['object'])}")
|
|
elif r.get("object") == name and r.get("subject"):
|
|
rel_lines.append(f"- {self._entity_link(r['subject'])} → **{r['predicate']}** →")
|
|
|
|
action_lines = []
|
|
for item in meeting_data.get("action_items", []):
|
|
if item.get("assignee") == name:
|
|
task = item.get("task", "")
|
|
status_emoji = "✅" if item.get("status") == "已完成" else "🔄"
|
|
action_lines.append(f"- {status_emoji} {task} (状态: {item.get('status', '待办')}, 源自: {meeting_link})")
|
|
history = item.get("_history", [])
|
|
if len(history) > 1:
|
|
for h in history:
|
|
icon = "✅" if h.get("status") == "已完成" else "🔄"
|
|
action_lines.append(f" - {h.get('date', '')}: {icon} {h.get('status', '')}")
|
|
|
|
content = f"""---
|
|
type: {_sanitize_filename(entity_type)}
|
|
entity_type: "{entity_type}"
|
|
tags: [entity, {_sanitize_filename(entity_type)}]
|
|
---
|
|
|
|
# {name}
|
|
|
|
**类型**: {entity_type}
|
|
**描述**: {description}
|
|
|
|
## 相关会议
|
|
- {meeting_link}
|
|
|
|
## 关系
|
|
{chr(10).join(rel_lines) if rel_lines else "(暂无)"}
|
|
|
|
## 行动项
|
|
{chr(10).join(action_lines) if action_lines else "(暂无)"}
|
|
"""
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
|
|
def _upsert_entity_action_items(self, existing: str, meeting_data: dict, entity_name: str, meeting_link: str, filepath: str):
|
|
action_lines = []
|
|
for item in meeting_data.get("action_items", []):
|
|
if item.get("assignee") == entity_name:
|
|
task = item.get("task", "")
|
|
status_emoji = "✅" if item.get("status") == "已完成" else "🔄"
|
|
action_lines.append(f"- {status_emoji} {task} (状态: {item.get('status', '待办')}, 源自: {meeting_link})")
|
|
history = item.get("_history", [])
|
|
if len(history) > 1:
|
|
for h in history:
|
|
icon = "✅" if h.get("status") == "已完成" else "🔄"
|
|
action_lines.append(f" - {h.get('date', '')}: {icon} {h.get('status', '')}")
|
|
|
|
action_section = "\n".join(action_lines) if action_lines else "(暂无)"
|
|
|
|
idx = existing.find("## 行动项")
|
|
if idx > 0:
|
|
section_end = existing.find("\n## ", idx + 10)
|
|
if section_end < 0:
|
|
section_end = len(existing)
|
|
new_section = f"## 行动项\n{action_section}"
|
|
existing = existing[:idx] + new_section + existing[section_end:]
|
|
else:
|
|
existing = existing.rstrip() + f"\n\n## 行动项\n{action_section}\n"
|
|
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
f.write(existing)
|
|
|
|
def _update_graph_moc(self):
|
|
meetings = [f for f in os.listdir(self.meetings_dir) if f.endswith(".md")]
|
|
entities = [f for f in os.listdir(self.entities_dir) if f.endswith(".md")]
|
|
|
|
lines = []
|
|
lines.append("---")
|
|
lines.append("tags: [moc, graph]")
|
|
lines.append("---")
|
|
lines.append("")
|
|
lines.append("# 知识图谱总览")
|
|
lines.append("")
|
|
lines.append("## 统计")
|
|
lines.append(f"- **会议数量**: {len(meetings)}")
|
|
lines.append(f"- **实体数量**: {len(entities)}")
|
|
lines.append("")
|
|
lines.append("## 最近会议")
|
|
for m in sorted(meetings, reverse=True)[:10]:
|
|
name = m.replace(".md", "")
|
|
link_text = name[11:] if len(name) > 11 else name
|
|
lines.append(f"- [[Meetings/{name}|{link_text}]]")
|
|
lines.append("")
|
|
lines.append("## 实体索引")
|
|
for e in sorted(entities):
|
|
name = e.replace(".md", "")
|
|
lines.append(f"- [[Entities/{name}|{name}]]")
|
|
|
|
with open(os.path.join(self.graphs_dir, "知识图谱总览.md"), "w", encoding="utf-8") as f:
|
|
f.write("\n".join(lines))
|
|
|
|
def rebuild_vault(self, meetings_data: List[dict]):
|
|
import shutil
|
|
if os.path.exists(self.vault_path):
|
|
shutil.rmtree(self.vault_path)
|
|
self._ensure_dirs()
|
|
self._ensure_obsidian_config()
|
|
|
|
for md in meetings_data:
|
|
self.add_meeting(md, md.get("_original_text", ""))
|
|
|
|
|
|
obsidian_manager = ObsidianVaultManager() |