import logging
import os
import shutil
from datetime import datetime
from typing import Dict, List, Optional, Set

from config import config

logger = logging.getLogger(__name__)

# Characters removed from note names before they are used as file names.
_INVALID_FILENAME_CHARS = '<>:"/\\|?*'
_STRIP_INVALID = str.maketrans("", "", _INVALID_FILENAME_CHARS)

_DATE_FORMAT = "%Y-%m-%d"
_DONE_STATUS = "已完成"
_EMPTY_PLACEHOLDER = "(暂无)"
_MEETINGS_HEADING = "## 相关会议"
_ACTIONS_HEADING = "## 行动项"


def _sanitize_filename(name: str) -> str:
    """Return *name* with invalid characters removed and spaces underscored.

    Leading/trailing dots and underscores are stripped. An empty input, or a
    name that sanitises to nothing, yields the placeholder "未命名".
    """
    if not name:
        return "未命名"
    cleaned = name.translate(_STRIP_INVALID).replace(" ", "_").strip("._")
    return cleaned or "未命名"


def _safe_filename(name: str, max_len: int = 60) -> str:
    """Sanitise *name* and truncate the result to at most *max_len* characters."""
    return _sanitize_filename(name)[:max_len]


def _yaml_escape(value) -> str:
    """Escape *value* for embedding inside a double-quoted YAML scalar."""
    return str(value).replace("\\", "\\\\").replace('"', '\\"')


def _status_icon(status) -> str:
    """Return the check mark for completed items and the in-progress icon otherwise."""
    return "✅" if status == _DONE_STATUS else "🔄"


class ObsidianVaultManager:
    """Writes meeting notes, entity notes and an overview page into an Obsidian vault.

    The vault location and the sub-directory names are read from
    ``config.obsidian``; the directories are created on construction.
    """

    def __init__(self):
        self.vault_path = config.obsidian.vault_path
        self.meetings_dir = os.path.join(self.vault_path, config.obsidian.meetings_dir)
        self.entities_dir = os.path.join(self.vault_path, config.obsidian.entities_dir)
        self.graphs_dir = os.path.join(self.vault_path, config.obsidian.graphs_dir)
        self.raw_dir = os.path.join(self.vault_path, config.obsidian.raw_dir)
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the four managed vault directories if they do not exist."""
        for directory in (self.meetings_dir, self.entities_dir, self.graphs_dir, self.raw_dir):
            os.makedirs(directory, exist_ok=True)

    # ------------------------------------------------------------------
    # Naming helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_date(date: Optional[str]) -> str:
        """Return *date*, or today's date (YYYY-MM-DD) when it is empty or None."""
        return date or datetime.now().strftime(_DATE_FORMAT)

    @staticmethod
    def _raw_filename(title: Optional[str], date_str: str) -> str:
        """Build the raw-text file name; shared by save_raw_text and raw_filepath."""
        return f"{date_str}_{_safe_filename(title or '未命名', 40)}.md"

    def _meeting_filename(self, data: dict) -> str:
        """Return the meeting note file name derived from the date and title in *data*."""
        # A present-but-empty date is treated like a missing one so the name
        # agrees with what save_raw_text produces for the same meeting.
        date_str = self._resolve_date(data.get("date"))
        title = data.get("title", "未命名会议")
        return f"{date_str}_{_safe_filename(title, 40)}.md"

    def _entity_path(self, name: str) -> str:
        """Return the path of the entity note for *name*."""
        return os.path.join(self.entities_dir, f"{_safe_filename(name, 60)}.md")

    def _entity_link(self, name: str) -> str:
        """Return a wikilink to the entity note for *name*, aliased with the raw name."""
        return f"[[Entities/{_safe_filename(name, 60)}|{name}]]"

    def _meeting_link(self, data: dict) -> str:
        """Return a wikilink to the meeting note for *data*, aliased with its title."""
        # splitext removes only the trailing extension; a ".md" that occurs
        # inside the title must stay so the link matches the file on disk.
        stem = os.path.splitext(self._meeting_filename(data))[0]
        title = data.get("title", "未命名会议")
        return f"[[Meetings/{stem}|{title}]]"

    def meeting_filepath(self, meeting_data: dict) -> str:
        """Return the full path of the meeting note for *meeting_data*."""
        return os.path.join(self.meetings_dir, self._meeting_filename(meeting_data))

    def meeting_file_exists(self, meeting_data: dict) -> bool:
        """Return True when the meeting note for *meeting_data* is already on disk."""
        return os.path.exists(self.meeting_filepath(meeting_data))

    def raw_filepath(self, meeting_data: dict) -> str:
        """Return the path save_raw_text uses for this meeting's title and date."""
        filename = self._raw_filename(
            meeting_data.get("title", "未命名"),
            self._resolve_date(meeting_data.get("date")),
        )
        return os.path.join(self.raw_dir, filename)

    # ------------------------------------------------------------------
    # Raw text
    # ------------------------------------------------------------------

    def save_raw_text(self, text: str, title: str = "", date: str = "") -> str:
        """Write the unprocessed source text to the raw directory.

        An existing file with the same name is overwritten; a warning is
        logged when that file was already marked as processed.

        Returns:
            The path of the written file.
        """
        date_str = self._resolve_date(date)
        filepath = os.path.join(self.raw_dir, self._raw_filename(title, date_str))

        if os.path.exists(filepath):
            with open(filepath, "r", encoding="utf-8") as f:
                previous = f.read()
            if "status: processed" in previous:
                logger.warning("原文文件已存在且已处理过,将被覆盖: %s", filepath)

        heading = title or "未命名"
        content = "\n".join([
            "---",
            f'title: "{_yaml_escape(title)}"',
            f'date: "{_yaml_escape(date_str)}"',
            "tags: [raw]",
            "status: unprocessed",
            "---",
            "",
            f"# {heading}",
            "",
            f"**日期**: {date_str}",
            "",
            "## 原文",
            "",
            text,
            "",
        ])
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info("原文已保存: %s", filepath)
        return filepath

    def mark_raw_processed(self, raw_filepath: str):
        """Flip the front-matter status of a raw file to "processed".

        Does nothing when the file does not exist.
        """
        if not os.path.exists(raw_filepath):
            return
        with open(raw_filepath, "r", encoding="utf-8") as f:
            content = f.read()
        # Only the first occurrence (the front matter) is rewritten, so the
        # same phrase appearing in the archived text is left untouched.
        content = content.replace("status: unprocessed", "status: processed", 1)
        with open(raw_filepath, "w", encoding="utf-8") as f:
            f.write(content)

    # ------------------------------------------------------------------
    # Vault configuration
    # ------------------------------------------------------------------

    def _ensure_obsidian_config(self):
        """Create default .obsidian/app.json and core-plugins.json when missing."""
        config_dir = os.path.join(self.vault_path, ".obsidian")
        os.makedirs(config_dir, exist_ok=True)

        app_json = os.path.join(config_dir, "app.json")
        if not os.path.exists(app_json):
            with open(app_json, "w", encoding="utf-8") as f:
                f.write('{\n "alwaysUpdateLinks": true,\n "newFileLocation": "current",\n "useMarkdownLinks": true\n}')

        core_plugins = os.path.join(config_dir, "core-plugins.json")
        if not os.path.exists(core_plugins):
            with open(core_plugins, "w", encoding="utf-8") as f:
                f.write('{\n "file-explorer": true,\n "graph": true,\n "backlink": true,\n "tag-pane": true,\n "page-preview": true,\n "templates": true,\n "search": true\n}')

    # ------------------------------------------------------------------
    # Meeting notes
    # ------------------------------------------------------------------

    def remove_meeting_note(self, meeting_data: dict):
        """Delete the meeting note and its raw-text file, whichever exist."""
        for path in (self.meeting_filepath(meeting_data), self.raw_filepath(meeting_data)):
            if os.path.exists(path):
                os.remove(path)
                logger.info("已删除: %s", path)

    def add_meeting(self, meeting_data: dict, original_text: str) -> str:
        """Write the meeting note, upsert its entity notes and refresh the overview.

        Returns:
            The path of the written meeting note.
        """
        self._ensure_obsidian_config()
        filepath = self.meeting_filepath(meeting_data)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(self._render_meeting_note(meeting_data, original_text))
        logger.info("会议笔记已生成: %s", filepath)
        self._create_all_entity_notes(meeting_data)
        self._update_graph_moc()
        return filepath

    def _render_meeting_note(self, data: dict, original_text: str) -> str:
        """Render the Markdown body of a meeting note.

        Optional sections (summary, entities, action items, metrics,
        decisions, relations) are emitted only when *data* holds a non-empty
        value for them.
        """
        title = data.get("title", "")
        lines = [
            "---",
            f'title: "{_yaml_escape(title)}"',
            f'date: "{_yaml_escape(data.get("date", ""))}"',
        ]
        content_hash = data.get("_content_hash", "")
        if content_hash:
            lines.append(f'content_hash: "{content_hash}"')
        lines += ["tags: [meeting]", "---", "", f"# {title}", ""]

        if data.get("date"):
            lines.append(f"**日期**: {data['date']}")
        if data.get("participants"):
            links = ", ".join(self._entity_link(p) for p in data["participants"])
            lines.append(f"**参会人**: {links}")
        lines.append("")

        if data.get("summary"):
            lines += ["## 摘要", data["summary"], ""]

        lines += ["## 原文", original_text, ""]

        if data.get("entities"):
            lines.append("## 涉及实体")
            for entity in data["entities"]:
                name = entity.get("name", "")
                if not name:
                    continue
                lines.append(
                    f"- {self._entity_link(name)} ({entity.get('entity_type', '')}): "
                    f"{entity.get('description', '')}"
                )
            lines.append("")

        if data.get("action_items"):
            lines.append("## 行动项")
            for item in data["action_items"]:
                task = item.get("task", "")
                assignee = self._entity_link(item["assignee"]) if item.get("assignee") else "待确认"
                deadline = item.get("deadline", "未指定")
                priority = item.get("priority", "中")
                lines.append(
                    f"- {_status_icon(item.get('status'))} **{task}** | 负责人: {assignee} "
                    f"| 截止: {deadline} | 优先级: {priority}"
                )
                history = item.get("_history", [])
                if len(history) > 1:
                    # Two-space indent so Markdown nests the history under its item.
                    lines.extend(
                        f"  - {h.get('date', '')}: {_status_icon(h.get('status'))} "
                        f"{h.get('status', '')} (优先级: {h.get('priority', '')})"
                        for h in history
                    )
            lines.append("")

        if data.get("metrics"):
            trend_icons = {"向好": "📈", "持平": "➡️", "恶化": "📉"}
            lines.append("## 指标跟踪")
            lines.append("| 指标 | 当前值 | 目标值 | 趋势 | 负责人 |")
            lines.append("|------|--------|--------|------|--------|")
            for metric in data["metrics"]:
                trend = metric.get("trend", "")
                owner = self._entity_link(metric["owner"]) if metric.get("owner") else "-"
                lines.append(
                    f"| {metric.get('metric_name', '')} | {metric.get('value', '')} "
                    f"| {metric.get('target', '')} | {trend_icons.get(trend, '')}{trend} | {owner} |"
                )
            lines.append("")

        if data.get("decisions"):
            lines.append("## 决策记录")
            for decision in data["decisions"]:
                proposer = self._entity_link(decision["proposer"]) if decision.get("proposer") else "-"
                badge = "✅ 已决" if decision.get("status") == "已决" else "⏳ 待定"
                lines.append(f"- {badge} {decision.get('content', '')} ({proposer})")
            lines.append("")

        if data.get("relations"):
            lines.append("## 关系图谱")
            for rel in data["relations"]:
                subject = rel.get("subject", "")
                obj = rel.get("object", "")
                if subject and obj:
                    lines.append(
                        f"- {self._entity_link(subject)} → **{rel.get('predicate', '')}** "
                        f"→ {self._entity_link(obj)}"
                    )
            lines.append("")

        lines.append("---")
        date_tag = (data.get("date") or "").replace("-", "/")
        lines.append(f"#meeting #{date_tag}")
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Entity notes
    # ------------------------------------------------------------------

    @staticmethod
    def _entity_candidates(data: dict) -> List[tuple]:
        """List (name, entity_type, description) for every entity *data* mentions.

        Order matters: the first occurrence of a name decides the type and
        description used when its note is created.
        """
        candidates = [
            (e.get("name", ""), e.get("entity_type", "实体"), e.get("description", ""))
            for e in data.get("entities", [])
        ]
        candidates += [(p, "人物", f"{p} (参会人)") for p in data.get("participants", [])]
        for rel in data.get("relations", []):
            for key in ("subject", "object"):
                candidates.append((rel.get(key, ""), rel.get(f"{key}_type", "实体"), ""))
        for section, field, role in (
            ("action_items", "assignee", "行动项负责人"),
            ("metrics", "owner", "指标负责人"),
            ("decisions", "proposer", "决策提出人"),
        ):
            for record in data.get(section, []):
                person = record.get(field, "")
                candidates.append((person, "人物", f"{person} ({role})"))
        return candidates

    def _create_all_entity_notes(self, data: dict):
        """Create or update one entity note per distinct name mentioned in *data*."""
        seen: Set[str] = set()
        for name, entity_type, description in self._entity_candidates(data):
            if not name or name in seen:
                continue
            self._upsert_entity_note(name, entity_type, description, data)
            seen.add(name)

    @staticmethod
    def _entity_action_lines(meeting_data: dict, entity_name: str, meeting_link: str) -> List[str]:
        """Render the action items of *meeting_data* that are assigned to *entity_name*."""
        rendered: List[str] = []
        for item in meeting_data.get("action_items", []):
            if item.get("assignee") != entity_name:
                continue
            rendered.append(
                f"- {_status_icon(item.get('status'))} {item.get('task', '')} "
                f"(状态: {item.get('status', '待办')}, 源自: {meeting_link})"
            )
            history = item.get("_history", [])
            if len(history) > 1:
                # Two-space indent so Markdown nests the history under its item.
                rendered.extend(
                    f"  - {h.get('date', '')}: {_status_icon(h.get('status'))} {h.get('status', '')}"
                    for h in history
                )
        return rendered

    @staticmethod
    def _append_meeting_link(existing: str, meeting_link: str) -> str:
        """Return *existing* with *meeting_link* added to its related-meetings section."""
        idx = existing.find(_MEETINGS_HEADING)
        if idx == -1:
            return existing.rstrip() + f"\n\n{_MEETINGS_HEADING}\n- {meeting_link}\n"
        section_end = existing.find("\n## ", idx + len(_MEETINGS_HEADING))
        if section_end < 0:
            section_end = len(existing)
        section = existing[idx:section_end].rstrip() + f"\n- {meeting_link}\n"
        return existing[:idx] + section + existing[section_end:]

    def _upsert_entity_note(self, name: str, entity_type: str, description: str, meeting_data: dict):
        """Create the entity note for *name*, or link this meeting into an existing one.

        For an existing note only the related-meetings and action-item
        sections are touched; type, description and relations are left as
        they were first written.
        NOTE(review): relations from later meetings are therefore never added
        to an existing entity note.
        """
        filepath = self._entity_path(name)
        meeting_link = self._meeting_link(meeting_data)

        if os.path.exists(filepath):
            with open(filepath, "r", encoding="utf-8") as f:
                existing = f.read()
            if meeting_link not in existing:
                existing = self._append_meeting_link(existing, meeting_link)
            self._upsert_entity_action_items(existing, meeting_data, name, meeting_link, filepath)
            return

        rel_lines = []
        for rel in meeting_data.get("relations", []):
            predicate = rel.get("predicate", "")
            if rel.get("subject") == name and rel.get("object"):
                rel_lines.append(f"- → **{predicate}** → {self._entity_link(rel['object'])}")
            elif rel.get("object") == name and rel.get("subject"):
                rel_lines.append(f"- {self._entity_link(rel['subject'])} → **{predicate}** →")
        action_lines = self._entity_action_lines(meeting_data, name, meeting_link)

        type_tag = _sanitize_filename(entity_type)
        content = "\n".join([
            "---",
            f"type: {type_tag}",
            f'entity_type: "{_yaml_escape(entity_type)}"',
            f"tags: [entity, {type_tag}]",
            "---",
            "",
            f"# {name}",
            "",
            f"**类型**: {entity_type}",
            f"**描述**: {description}",
            "",
            _MEETINGS_HEADING,
            f"- {meeting_link}",
            "",
            "## 关系",
            "\n".join(rel_lines) if rel_lines else _EMPTY_PLACEHOLDER,
            "",
            _ACTIONS_HEADING,
            "\n".join(action_lines) if action_lines else _EMPTY_PLACEHOLDER,
            "",
        ])
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)

    @staticmethod
    def _action_lines_from_other_meetings(section_body: str, meeting_link: str) -> List[str]:
        """Return the lines of an action-item section not sourced from *meeting_link*.

        Blank lines and the empty-section placeholder are dropped. Indented
        history lines are kept or dropped together with the item above them.
        """
        source_marker = f"源自: {meeting_link}"
        kept: List[str] = []
        dropping = False
        for line in section_body.splitlines():
            stripped = line.strip()
            if not stripped or stripped == _EMPTY_PLACEHOLDER:
                continue
            if line[0].isspace():
                if not dropping:
                    kept.append(line)
                continue
            dropping = source_marker in line
            if not dropping:
                kept.append(line)
        return kept

    def _upsert_entity_action_items(self, existing: str, meeting_data: dict, entity_name: str,
                                    meeting_link: str, filepath: str):
        """Refresh this meeting's action items in an entity note and write it to disk.

        Items that originate from other meetings are preserved; only the
        entries sourced from *meeting_link* are replaced by the current ones.
        """
        current = self._entity_action_lines(meeting_data, entity_name, meeting_link)
        idx = existing.find(_ACTIONS_HEADING)
        if idx != -1:
            body_start = idx + len(_ACTIONS_HEADING)
            section_end = existing.find("\n## ", body_start)
            if section_end < 0:
                section_end = len(existing)
            merged = self._action_lines_from_other_meetings(
                existing[body_start:section_end], meeting_link
            ) + current
            body = "\n".join(merged) if merged else _EMPTY_PLACEHOLDER
            existing = existing[:idx] + f"{_ACTIONS_HEADING}\n{body}\n" + existing[section_end:]
        else:
            body = "\n".join(current) if current else _EMPTY_PLACEHOLDER
            existing = existing.rstrip() + f"\n\n{_ACTIONS_HEADING}\n{body}\n"
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(existing)

    # ------------------------------------------------------------------
    # Overview page
    # ------------------------------------------------------------------

    def _update_graph_moc(self):
        """Rewrite the overview page with counts, the ten latest meetings and an entity index."""
        # splitext drops only the trailing ".md", keeping any ".md" inside a title.
        meetings = [os.path.splitext(f)[0] for f in os.listdir(self.meetings_dir) if f.endswith(".md")]
        entities = [os.path.splitext(f)[0] for f in os.listdir(self.entities_dir) if f.endswith(".md")]

        lines = [
            "---",
            "tags: [moc, graph]",
            "---",
            "",
            "# 知识图谱总览",
            "",
            "## 统计",
            f"- **会议数量**: {len(meetings)}",
            f"- **实体数量**: {len(entities)}",
            "",
            "## 最近会议",
        ]
        for stem in sorted(meetings, reverse=True)[:10]:
            # Skip the 11-character "YYYY-MM-DD_" prefix in the visible link text.
            link_text = stem[11:] if len(stem) > 11 else stem
            lines.append(f"- [[Meetings/{stem}|{link_text}]]")
        lines += ["", "## 实体索引"]
        lines.extend(f"- [[Entities/{stem}|{stem}]]" for stem in sorted(entities))

        with open(os.path.join(self.graphs_dir, "知识图谱总览.md"), "w", encoding="utf-8") as f:
            f.write("\n".join(lines))

    def rebuild_vault(self, meetings_data: List[dict]):
        """Delete the vault directory and regenerate it from *meetings_data*.

        WARNING: the whole ``vault_path`` tree is removed first, including the
        ``.obsidian`` folder, the raw directory and any file in the vault that
        this class did not create.
        """
        if os.path.exists(self.vault_path):
            shutil.rmtree(self.vault_path)
        self._ensure_dirs()
        self._ensure_obsidian_config()
        for meeting in meetings_data:
            self.add_meeting(meeting, meeting.get("_original_text", ""))


obsidian_manager = ObsidianVaultManager()