# my_meeting/web/server.py
#
# 450 lines
# 15 KiB
# Python

import asyncio
import json
import queue
import shutil
import sys
import threading
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Request
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from agents.chat import get_qwen_response
from prompt_loader import load_prompt
# --- Filesystem layout; every path is anchored at PROJECT_ROOT -------------
FRONTEND_DIR = PROJECT_ROOT / "frontend"
FRONTEND_ASSETS_DIR = FRONTEND_DIR / "assets"
DATA_ROOT = PROJECT_ROOT / "data"
DATA_DIR = DATA_ROOT / "meetings"
RESULTS_MD_DIR = DATA_ROOT / "results" / "md"
RESULTS_JSON_DIR = DATA_ROOT / "results" / "json"
TEMPLATE_DIR = PROJECT_ROOT / "template"
EXAMPLES_DIR = PROJECT_ROOT / "examples"
CONFIG_FILE = PROJECT_ROOT / "config.json"
APP_STATE_FILE = DATA_ROOT / "app_state.json"
ALL_RESULT_DIRS = (RESULTS_MD_DIR, RESULTS_JSON_DIR)

# Create the writable directories (and the static-assets directory that is
# mounted below) at import time so later code can assume they exist.
for _required_dir in (DATA_DIR, RESULTS_MD_DIR, RESULTS_JSON_DIR, FRONTEND_ASSETS_DIR):
    _required_dir.mkdir(parents=True, exist_ok=True)
del _required_dir

# --- Application object ------------------------------------------------------
app = FastAPI(title="Meeting Summary Web")
app.mount(
    "/assets",
    StaticFiles(directory=str(FRONTEND_ASSETS_DIR)),
    name="assets",
)
def _load_config() -> dict:
    """Return the LLM connection settings.

    Values stored in ``CONFIG_FILE`` are layered on top of the built-in
    defaults, so a config file that is missing one of the keys
    (``api_base_url``, ``api_key``, ``model_name``) still yields a complete
    dict instead of causing a ``KeyError`` in the code that consumes it.
    Extra keys present in the file are preserved.

    Returns:
        A dict containing at least ``api_base_url``, ``api_key`` and
        ``model_name``.

    Raises:
        json.JSONDecodeError: If ``CONFIG_FILE`` exists but is not valid JSON.
    """
    defaults = {
        "api_base_url": "http://10.100.53.199:9527/v1",
        "api_key": "unis123",
        "model_name": "Qwen3.6-35B",
    }
    if not CONFIG_FILE.exists():
        return defaults
    stored = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
    if not isinstance(stored, dict):
        # A non-object JSON document cannot hold settings; fall back to defaults.
        return defaults
    return {**defaults, **stored}
def _save_config(cfg: dict):
    """Write *cfg* to ``CONFIG_FILE`` as indented, non-ASCII-escaped UTF-8 JSON."""
    serialized = json.dumps(cfg, ensure_ascii=False, indent=2)
    CONFIG_FILE.write_text(serialized, encoding="utf-8")
def _load_app_state() -> dict:
    """Return the persisted app state, or a fresh state with no active meeting.

    The state is read from ``APP_STATE_FILE`` when that file exists.
    """
    if not APP_STATE_FILE.exists():
        return {"active_meeting_id": None}
    raw_state = APP_STATE_FILE.read_text(encoding="utf-8")
    return json.loads(raw_state)
def _save_app_state(state: dict):
    """Persist *state* to ``APP_STATE_FILE`` as indented UTF-8 JSON."""
    serialized = json.dumps(state, ensure_ascii=False, indent=2)
    APP_STATE_FILE.write_text(serialized, encoding="utf-8")
def _set_active_meeting(meeting_id: str | None):
    """Record *meeting_id* (or ``None``) as the active meeting in the app state.

    The existing state is loaded, its ``active_meeting_id`` entry replaced,
    and the whole state written back.
    """
    current_state = _load_app_state()
    current_state.update(active_meeting_id=meeting_id)
    _save_app_state(current_state)
def _get_llm_client(cfg: dict):
    """Build an ``OpenAI`` client from the ``api_key`` / ``api_base_url`` in *cfg*."""
    # Imported at call time rather than at module import time.
    from openai import OpenAI

    connection = {"api_key": cfg["api_key"], "base_url": cfg["api_base_url"]}
    return OpenAI(**connection)
def _llm_stream(client, model, system_prompt, user_prompt, max_token=64000):
    """Stream a chat completion and yield ``(kind, text)`` pairs.

    Args:
        client: Object exposing ``chat.completions.create`` (an OpenAI-style
            client).
        model: Model name passed through to the API.
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.
        max_token: Value sent as ``max_tokens``.

    Yields:
        ``("content", text)`` for chunks whose delta carries content, and
        ``("reasoning", text_or_None)`` for chunks whose delta content is
        ``None``. Callers are expected to skip falsy texts.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.7,
        max_tokens=max_token,
        stream=True,
    )
    for chunk in response:
        # Chunks may arrive with an empty ``choices`` list (for example a
        # usage-only chunk); indexing [0] on those would raise IndexError.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta.content is None:
            # The reasoning field is a server extension and may be absent;
            # ``reasoning_content`` is tried as an alternative spelling.
            reasoning = getattr(delta, "reasoning", None)
            if reasoning is None:
                reasoning = getattr(delta, "reasoning_content", None)
            yield "reasoning", reasoning
        else:
            yield "content", delta.content
def _read_meeting_meta(meeting_id: str) -> dict:
    """Return the parsed ``meta.json`` of a meeting.

    When the meeting has no ``meta.json`` under ``DATA_DIR``, a placeholder
    is returned that uses the id as the name and an empty creation time.
    """
    meta_file = DATA_DIR / meeting_id / "meta.json"
    if not meta_file.exists():
        return {"name": meeting_id, "created_at": ""}
    raw_meta = meta_file.read_text(encoding="utf-8")
    return json.loads(raw_meta)
def _write_meeting_meta(meeting_id: str, meta: dict):
    """Write *meta* as ``meta.json`` in the meeting's directory under ``DATA_DIR``.

    The meeting directory is created first if it does not exist yet.
    """
    meeting_dir = DATA_DIR / meeting_id
    meeting_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(meta, ensure_ascii=False, indent=2)
    (meeting_dir / "meta.json").write_text(serialized, encoding="utf-8")
def _find_transcript_file(meeting_id: str) -> Path | None:
    """Return the meeting's transcript path, or ``None`` if there is none.

    ``transcript.txt`` is checked before ``transcript.md``; the first one
    that exists is returned.
    """
    meeting_dir = DATA_DIR / meeting_id
    candidates = (meeting_dir / f"transcript{suffix}" for suffix in (".txt", ".md"))
    return next((path for path in candidates if path.exists()), None)
def _meeting_summary(meeting_id: str) -> dict:
    """Assemble the listing entry for one meeting.

    Combines the stored metadata with three existence flags: whether a
    transcript file is present, whether ``meeting_summary.md`` exists in the
    Markdown results, and whether ``sub_topic.json`` exists in the JSON
    results.
    """
    meta = _read_meeting_meta(meeting_id)
    transcript = _find_transcript_file(meeting_id)
    summary_md = RESULTS_MD_DIR / meeting_id / "meeting_summary.md"
    topics_json = RESULTS_JSON_DIR / meeting_id / "sub_topic.json"

    transcript_present = transcript is not None
    transcript_name = transcript.name if transcript else ""

    entry = {"id": meeting_id}
    entry["name"] = meta.get("name", meeting_id)
    entry["created_at"] = meta.get("created_at", "")
    entry["original_filename"] = meta.get("original_filename", "")
    entry["transcript_filename"] = transcript_name
    entry["has_transcript"] = transcript_present
    entry["has_summary"] = summary_md.exists()
    entry["has_topics"] = topics_json.exists()
    return entry
def _list_meeting_ids() -> list[str]:
    """Return the names of all sub-directories of ``DATA_DIR``, sorted descending."""
    if not DATA_DIR.exists():
        return []
    meeting_ids = [entry.name for entry in DATA_DIR.iterdir() if entry.is_dir()]
    meeting_ids.sort(reverse=True)
    return meeting_ids
def _list_meetings() -> list[dict]:
    """Return one summary dict per meeting, in the order of ``_list_meeting_ids``."""
    return list(map(_meeting_summary, _list_meeting_ids()))
@app.get("/")
async def index():
    """Serve the frontend's ``index.html``."""
    index_page = FRONTEND_DIR / "index.html"
    return FileResponse(str(index_page))
@app.get("/api/settings")
async def get_settings():
    """Return the current LLM settings as loaded by ``_load_config``."""
    settings = _load_config()
    return settings
@app.put("/api/settings")
async def save_settings(cfg: dict):
    """Persist the submitted settings.

    Responds with HTTP 400 when any of ``api_base_url``, ``api_key`` or
    ``model_name`` is missing from the request body.
    """
    required = {"api_base_url", "api_key", "model_name"}
    missing = required - set(cfg.keys())
    if missing:
        raise HTTPException(400, f"Missing fields: {missing}")
    _save_config(cfg)
    return {"ok": True}
@app.get("/api/tree")
async def file_tree():
    """Return the workspace as a nested folder/file tree.

    The tree has three top-level branches (original transcripts, Markdown
    results, JSON results). Each branch lists one folder per meeting
    directory, and each meeting folder lists its files except ``meta.json``.
    """
    active_id = _load_app_state().get("active_meeting_id")

    def _file_nodes(meeting_dir, prefix):
        # One node per regular file; meta.json is left out of the tree.
        return [
            {
                "name": entry.name,
                "type": "file",
                "path": f"{prefix}/{meeting_dir.name}/{entry.name}",
            }
            for entry in sorted(meeting_dir.iterdir())
            if entry.is_file() and entry.name != "meta.json"
        ]

    def _build_branch(label, base_dir, prefix, delete_mode):
        meeting_nodes = []
        if base_dir.exists():
            for meeting_dir in sorted(base_dir.iterdir()):
                if not meeting_dir.is_dir():
                    continue
                # Display names always come from the meeting's own metadata,
                # also for the result branches.
                meta = _read_meeting_meta(meeting_dir.name)
                files = _file_nodes(meeting_dir, prefix)
                meeting_nodes.append({
                    "name": meta.get("name", meeting_dir.name),
                    "type": "folder",
                    "id": meeting_dir.name,
                    "active": meeting_dir.name == active_id,
                    "delete_mode": delete_mode,
                    "children": files,
                })
        return {"name": label, "type": "folder", "children": meeting_nodes}

    branch_specs = (
        ("会议原文", DATA_DIR, "meetings", "meeting"),
        ("处理结果MD", RESULTS_MD_DIR, "results_md", "results"),
        ("处理结果JSON", RESULTS_JSON_DIR, "results_json", "results"),
    )
    return {
        "name": "workspace",
        "type": "folder",
        "children": [_build_branch(*spec) for spec in branch_specs],
    }
@app.get("/api/meetings")
async def list_meetings():
    """Return the active meeting id together with all meeting summaries."""
    app_state = _load_app_state()
    response = {"active_meeting_id": app_state.get("active_meeting_id")}
    response["meetings"] = _list_meetings()
    return response
@app.get("/api/current-meeting")
async def get_current_meeting():
    """Return the active meeting and its summary.

    If the stored active id points at a meeting directory that no longer
    exists, the stored id is cleared and an empty result is returned.
    """
    active_id = _load_app_state().get("active_meeting_id")
    if active_id and not (DATA_DIR / active_id).exists():
        # Stale reference: forget it so later requests start clean.
        _set_active_meeting(None)
        active_id = None
    if not active_id:
        return {"active_meeting_id": None, "meeting": None}
    return {
        "active_meeting_id": active_id,
        "meeting": _meeting_summary(active_id),
    }
@app.put("/api/current-meeting")
async def set_current_meeting(payload: dict):
    """Select the active meeting, or clear the selection.

    The request body may carry ``meeting_id``. ``null``, a missing key or an
    empty string clears the selection; otherwise the value must be the name
    of an existing meeting directory directly under ``DATA_DIR``.

    Raises:
        HTTPException: 400 if ``meeting_id`` is not a string; 404 if it does
            not name an existing meeting.
    """
    meeting_id = payload.get("meeting_id")
    if meeting_id == "":
        # An empty id would resolve to DATA_DIR itself; treat it as "none".
        meeting_id = None
    if meeting_id is not None:
        if not isinstance(meeting_id, str):
            raise HTTPException(400, "meeting_id must be a string or null")
        # Only a single plain path component is a valid id; this keeps
        # values such as ".." or "a/b" from pointing outside DATA_DIR.
        is_plain_name = meeting_id not in (".", "..") and Path(meeting_id).name == meeting_id
        if not is_plain_name or not (DATA_DIR / meeting_id).is_dir():
            raise HTTPException(404, "Meeting not found")
    _set_active_meeting(meeting_id)
    return {
        "ok": True,
        "active_meeting_id": meeting_id,
        "meeting": _meeting_summary(meeting_id) if meeting_id else None,
    }
@app.post("/api/meetings/import")
async def import_meeting(name: str = Form(...), file: UploadFile = File(...)):
    """Create a new meeting from an uploaded ``.txt`` or ``.md`` transcript.

    The upload is decoded as UTF-8 (a leading byte-order mark is dropped),
    falling back to GBK with replacement characters, and stored as
    ``transcript<ext>`` in a new meeting directory whose id is the current
    time in milliseconds. The new meeting becomes the active one.

    Returns:
        ``{"id": <meeting id>, "name": <name>}``.

    Raises:
        HTTPException: 400 if no file was selected or the extension is not
            ``.txt`` / ``.md``.
    """
    if not file.filename:
        raise HTTPException(400, "No file selected")
    ext = Path(file.filename).suffix.lower()
    if ext not in (".txt", ".md"):
        raise HTTPException(400, "Only .txt and .md files are supported")

    # Read and decode before touching the disk so a failed upload does not
    # leave an empty meeting directory behind.
    content = await file.read()
    try:
        # "utf-8-sig" accepts plain UTF-8 and strips a BOM when present.
        text = content.decode("utf-8-sig")
    except UnicodeDecodeError:
        text = content.decode("gbk", errors="replace")

    # Millisecond timestamps can collide for near-simultaneous imports;
    # exist_ok=False makes the collision visible so a new id is picked
    # instead of overwriting the other meeting.
    while True:
        mid = str(int(time.time() * 1000))
        mdir = DATA_DIR / mid
        try:
            mdir.mkdir(parents=True, exist_ok=False)
            break
        except FileExistsError:
            await asyncio.sleep(0.001)

    dest = "transcript" + ext
    (mdir / dest).write_text(text, encoding="utf-8")
    _write_meeting_meta(mid, {
        "name": name,
        "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "original_filename": file.filename,
    })
    _set_active_meeting(mid)
    return {"id": mid, "name": name}
@app.delete("/api/meetings/{meeting_id}")
async def delete_meeting(meeting_id: str):
    """Delete a meeting's transcript directory and both result directories.

    If the deleted meeting was the active one, the first remaining meeting
    (per ``_list_meeting_ids``) becomes active, or none if no meeting is left.

    Raises:
        HTTPException: 404 if *meeting_id* does not name an existing meeting.
    """
    # The id is used to build paths handed to shutil.rmtree, so it must be
    # a single plain path component: ".." would otherwise resolve to the
    # parent directories and delete them wholesale.
    is_plain_name = meeting_id not in ("", ".", "..") and Path(meeting_id).name == meeting_id
    if not is_plain_name or not (DATA_DIR / meeting_id).is_dir():
        raise HTTPException(404, "Meeting not found")
    active_meeting_id = _load_app_state().get("active_meeting_id")
    for base in (DATA_DIR, RESULTS_MD_DIR, RESULTS_JSON_DIR):
        bd = base / meeting_id
        if bd.is_dir():
            shutil.rmtree(str(bd))
    if active_meeting_id == meeting_id:
        remaining = _list_meeting_ids()
        _set_active_meeting(remaining[0] if remaining else None)
    return {"ok": True}
@app.delete("/api/meetings/{meeting_id}/results")
async def delete_meeting_results(meeting_id: str):
    """Delete the Markdown and JSON result directories of a meeting.

    Raises:
        HTTPException: 404 if neither result directory exists for
            *meeting_id* (or the id is not a plain directory name).
    """
    # Reject ids such as ".." before they reach shutil.rmtree; they would
    # resolve to the shared results directory rather than one meeting's.
    if meeting_id in ("", ".", "..") or Path(meeting_id).name != meeting_id:
        raise HTTPException(404, "Meeting results not found")
    deleted = False
    for base in (RESULTS_MD_DIR, RESULTS_JSON_DIR):
        bd = base / meeting_id
        if bd.is_dir():
            shutil.rmtree(str(bd))
            deleted = True
    if not deleted:
        raise HTTPException(404, "Meeting results not found")
    return {"ok": True}
@app.get("/api/meetings/{meeting_id}/file/{filename:path}")
async def get_meeting_file(meeting_id: str, filename: str):
    """Return the text content of a file belonging to a meeting.

    The file is looked up in the meeting's directory under ``DATA_DIR``,
    then ``RESULTS_MD_DIR``, then ``RESULTS_JSON_DIR``; the first regular
    file found is returned as ``{"content": ..., "filename": ...}``.

    Raises:
        HTTPException: 404 if no such file exists inside one of the
            meeting's directories.
    """
    # *filename* is a free-form path and *meeting_id* comes from the URL, so
    # both are confined: the id must be a single path component, and the
    # resolved target must stay inside that meeting's directory.
    id_is_plain = meeting_id not in ("", ".", "..") and Path(meeting_id).name == meeting_id
    if id_is_plain:
        for base in (DATA_DIR, RESULTS_MD_DIR, RESULTS_JSON_DIR):
            meeting_root = (base / meeting_id).resolve()
            fp = (meeting_root / filename).resolve()
            if fp.is_relative_to(meeting_root) and fp.is_file():
                return {"content": fp.read_text(encoding="utf-8"), "filename": filename}
    raise HTTPException(404, f"File not found: {filename}")
@app.get("/api/examples/{filename:path}")
async def get_example_file(filename: str):
    """Return the text content of a file under ``EXAMPLES_DIR``.

    Raises:
        HTTPException: 404 if *filename* does not resolve to a regular file
            inside ``EXAMPLES_DIR``.
    """
    # *filename* may contain "../" segments; resolve it and require the
    # result to stay inside the examples directory.
    examples_root = EXAMPLES_DIR.resolve()
    fp = (examples_root / filename).resolve()
    if not fp.is_relative_to(examples_root) or not fp.is_file():
        raise HTTPException(404, f"File not found: {filename}")
    return {"content": fp.read_text(encoding="utf-8"), "filename": filename}
@app.get("/api/templates")
async def list_templates():
    """List the ``.md`` files in ``TEMPLATE_DIR`` as ``{"name": ...}`` entries, sorted."""
    if not TEMPLATE_DIR.exists():
        return []
    return [
        {"name": entry.name}
        for entry in sorted(TEMPLATE_DIR.iterdir())
        if entry.is_file() and entry.suffix == ".md"
    ]
@app.get("/api/templates/{name}")
async def get_template(name: str):
    """Return the name and text content of one template.

    Raises:
        HTTPException: 404 if *name* is not a regular file directly inside
            ``TEMPLATE_DIR``.
    """
    # Accept only a single plain path component so the lookup cannot leave
    # TEMPLATE_DIR, and require a regular file so a directory name yields a
    # 404 instead of a read error.
    is_plain_name = name not in ("", ".", "..") and Path(name).name == name
    fp = TEMPLATE_DIR / name
    if not is_plain_name or not fp.is_file():
        raise HTTPException(404, f"Template not found: {name}")
    return {"name": name, "content": fp.read_text(encoding="utf-8")}
@app.put("/api/templates/{name}")
async def save_template(name: str, payload: dict):
    """Create or overwrite a template file in ``TEMPLATE_DIR``.

    The request body must carry the new text under ``content``.

    Raises:
        HTTPException: 400 if ``content`` is missing or not a string, or if
            *name* is not a plain file name.
    """
    content = payload.get("content")
    if content is None:
        raise HTTPException(400, "Missing content field")
    if not isinstance(content, str):
        raise HTTPException(400, "content must be a string")
    # Only a single plain path component may be written, so the target
    # cannot end up outside TEMPLATE_DIR.
    if name in ("", ".", "..") or Path(name).name != name:
        raise HTTPException(400, "Invalid template name")
    # The template directory is not created at import time; make sure it
    # exists before writing.
    TEMPLATE_DIR.mkdir(parents=True, exist_ok=True)
    (TEMPLATE_DIR / name).write_text(content, encoding="utf-8")
    return {"ok": True}
@app.get("/api/meetings/{meeting_id}/transcript")
async def get_meeting_transcript(meeting_id: str):
    """Return the meeting's transcript text and file name, or 404 if it has none."""
    transcript_path = _find_transcript_file(meeting_id)
    if not transcript_path:
        raise HTTPException(404, "No transcript found")
    return {
        "content": transcript_path.read_text(encoding="utf-8"),
        "filename": transcript_path.name,
    }
@app.get("/api/meetings/{meeting_id}/process")
async def process_meeting(meeting_id: str, request: Request, template_name: str = "template1.md"):
    """Run the two-stage LLM pipeline for a meeting and stream progress as SSE.

    Stage 1 sends the transcript for preprocessing and stores the model
    output as ``sub_topic.json`` (pretty-printed when it parses as JSON, raw
    otherwise). Stage 2 sends the transcript, the stage-1 output and the
    chosen template, and stores the output as ``meeting_summary.md``.

    The LLM calls run in a background thread that pushes events onto a
    queue; the response generator forwards them as ``data:`` lines and
    emits a heartbeat comment whenever the queue stays empty for 0.5 s.
    Events have a ``type`` of ``status``, ``chunk``, ``done`` or ``error``.

    Args:
        meeting_id: Name of the meeting directory under ``DATA_DIR``.
        request: Used to detect that the client has disconnected.
        template_name: Template file, relative to ``TEMPLATE_DIR``.

    Raises:
        HTTPException: 404 if the meeting or the template does not exist;
            400 if the meeting has no transcript.
    """
    # The id must be a single plain path component so it cannot point
    # outside DATA_DIR / the result directories.
    if meeting_id in ("", ".", "..") or Path(meeting_id).name != meeting_id:
        raise HTTPException(404, "Meeting not found")
    mdir = DATA_DIR / meeting_id
    if not mdir.exists():
        raise HTTPException(404, "Meeting not found")
    _set_active_meeting(meeting_id)
    transcript_file = _find_transcript_file(meeting_id)
    if transcript_file is None:
        raise HTTPException(400, "No transcript found")
    transcript = transcript_file.read_text(encoding="utf-8")

    # template_name is a query parameter and may contain "../"; resolve it
    # and require the result to be a regular file inside TEMPLATE_DIR so an
    # arbitrary file cannot be pulled into the prompt.
    template_root = TEMPLATE_DIR.resolve()
    tpl_path = (template_root / template_name).resolve()
    if not tpl_path.is_relative_to(template_root) or not tpl_path.is_file():
        raise HTTPException(404, f"Template not found: {template_name}")
    template_content = tpl_path.read_text(encoding="utf-8")

    prompt = load_prompt("meeting_summary", "zh")
    cfg = _load_config()
    model_name = cfg["model_name"]
    eq = queue.Queue()

    def run():
        """Worker thread: run both LLM stages and report through ``eq``."""

        def stream_stage(stage, system_prompt, user_prompt):
            # Forward every non-empty chunk to the client; only "content"
            # chunks become part of the stage's stored output.
            parts = []
            for ct, cc in _llm_stream(client, model_name, system_prompt, user_prompt):
                if cc:
                    eq.put({"type": "chunk", "data": {"stage": stage, "chunk_type": ct, "text": str(cc)}})
                    if ct == "content":
                        parts.append(str(cc))
            return "".join(parts)

        try:
            client = _get_llm_client(cfg)

            eq.put({"type": "status", "data": "preprocessing"})
            sp = prompt["system"]["role"] + prompt["mode_contracts"]["data_preproces"]
            up = prompt["user_template"]["article_preproces"].format(article=transcript)
            sub = stream_stage(1, sp, up)
            eq.put({"type": "status", "data": "preprocessing_done"})

            rjdir = RESULTS_JSON_DIR / meeting_id
            rjdir.mkdir(parents=True, exist_ok=True)
            try:
                topics_text = json.dumps(json.loads(sub), ensure_ascii=False, indent=4)
            except ValueError:
                # Model output was not valid JSON; keep it verbatim.
                topics_text = sub
            (rjdir / "sub_topic.json").write_text(topics_text, encoding="utf-8")

            eq.put({"type": "status", "data": "summarizing"})
            sp = prompt["system"]["role"] + prompt["mode_contracts"]["data_summary"].format(template=template_content)
            up = prompt["user_template"]["article_summary"].format(article=transcript, sub_topices=sub)
            result = stream_stage(2, sp, up)

            rmdir = RESULTS_MD_DIR / meeting_id
            rmdir.mkdir(parents=True, exist_ok=True)
            (rmdir / "meeting_summary.md").write_text(result, encoding="utf-8")
            eq.put({"type": "done", "data": {"result": result}})
        except Exception as e:
            # Top-level boundary of the worker: report any failure to the
            # client as an "error" event, which also ends the stream.
            eq.put({"type": "error", "data": str(e)})

    threading.Thread(target=run, daemon=True).start()

    async def gen():
        loop = asyncio.get_running_loop()
        while True:
            if await request.is_disconnected():
                break
            try:
                # Blocking queue read is done in the default executor so the
                # event loop stays free; it times out after 0.5 s.
                evt = await loop.run_in_executor(None, eq.get, True, 0.5)
            except queue.Empty:
                yield ": heartbeat\n\n"
                continue
            yield f"data: {json.dumps(evt, ensure_ascii=False)}\n\n"
            if evt["type"] in ("done", "error"):
                break

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    from uvicorn import run as _serve

    _serve(app, host="0.0.0.0", port=8000)