from __future__ import annotations import json from pathlib import Path import sys import threading import time from typing import Any if __package__ in (None, ""): sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from core_agent.agent import CoreAgent from core_agent.config import apply_compat_env_aliases, build_core_agent_config, load_core_agent_env from core_agent.providers.openai_compatible import OpenAICompatibleProvider class TerminalRenderer: def __init__(self, heartbeat_seconds: float = 3.0) -> None: self.heartbeat_seconds = heartbeat_seconds self._last_activity = time.monotonic() self._stop = threading.Event() self._thread: threading.Thread | None = None self._printed_thinking = False self._printed_answer = False self._saw_text = False def start_turn(self) -> None: self._last_activity = time.monotonic() self._stop.clear() self._printed_thinking = False self._printed_answer = False self._saw_text = False print("Assistant: ", end="", flush=True) self._thread = threading.Thread(target=self._heartbeat_loop, daemon=True) self._thread.start() def stop_turn(self) -> None: self._stop.set() if self._thread is not None: self._thread.join(timeout=0.2) self._thread = None print() def on_reasoning(self, text: str) -> None: self._touch() if not text: return if not self._printed_thinking: print("\n[thinking] ", end="", flush=True) self._printed_thinking = True print(text, end="", flush=True) def on_content(self, text: str) -> None: self._touch() if not text: return if not self._printed_answer: prefix = "\n[answer] " if self._printed_thinking else "" print(prefix, end="", flush=True) self._printed_answer = True print(text, end="", flush=True) self._saw_text = True def on_tool_call(self, tool_name: str, tool_args: dict[str, Any]) -> None: self._touch() rendered_args = _compact_json(tool_args, limit=200) print(f"\n[tool] {tool_name} {rendered_args}", flush=True) def on_tool_result(self, tool_result: str) -> None: self._touch() print(f"[tool-result] {_summarize_tool_result(tool_result)}", flush=True) def on_final(self, final_response: str) -> None: self._touch() if final_response and not self._saw_text: if not self._printed_answer: prefix = "\n[answer] " if self._printed_thinking else "" print(prefix, end="", flush=True) self._printed_answer = True print(final_response, end="", flush=True) def _touch(self) -> None: self._last_activity = time.monotonic() def _heartbeat_loop(self) -> None: while not self._stop.wait(0.25): idle_for = time.monotonic() - self._last_activity if idle_for < self.heartbeat_seconds: continue label = "thinking" if self._printed_thinking and not self._printed_answer else "working" print(f"\n[{label}...]", end="", flush=True) self._last_activity = time.monotonic() def main() -> None: workspace = Path.cwd() load_core_agent_env() apply_compat_env_aliases() config = build_core_agent_config() provider = OpenAICompatibleProvider( model=config.model, api_key=config.api_key, base_url=config.base_url, timeout=config.timeout, ) agent = CoreAgent( provider=provider, workspace=workspace, skill_dirs=[workspace / "skills", Path(__file__).resolve().parent / "sample_skills"], ) session = agent.new_session() renderer = TerminalRenderer() print("Core Agent multi-turn chat. Type `exit` to quit.") while True: try: user_input = input("\nYou: ").strip() except EOFError: print() break if user_input.lower() in {"exit", "quit", "q"}: break if not user_input: continue renderer.start_turn() try: for event in session.stream_ask(user_input): if event.type == "reasoning": renderer.on_reasoning(event.delta) elif event.type == "content": renderer.on_content(event.delta) elif event.type == "tool_call": renderer.on_tool_call(event.tool_name, event.tool_args) elif event.type == "tool_result": renderer.on_tool_result(event.tool_result) elif event.type == "final": renderer.on_final(event.final_response) except Exception as exc: print(f"\n[error] {exc}", flush=True) finally: renderer.stop_turn() def _compact_json(value: Any, limit: int = 200) -> str: try: text = json.dumps(value, ensure_ascii=False) except Exception: text = str(value) if len(text) <= limit: return text return text[:limit] + "..." def _summarize_tool_result(tool_result: str, limit: int = 240) -> str: try: payload = json.loads(tool_result) if isinstance(payload, dict): if "stdout" in payload and payload["stdout"]: text = str(payload["stdout"]).strip() elif "content" in payload and payload["content"]: text = str(payload["content"]).strip() elif "entries" in payload and isinstance(payload["entries"], list): text = f"{len(payload['entries'])} entries" elif "tasks" in payload and isinstance(payload["tasks"], list): text = f"{len(payload['tasks'])} tasks" else: text = json.dumps(payload, ensure_ascii=False) else: text = str(payload) except Exception: text = tool_result text = " ".join(text.split()) if len(text) <= limit: return text return text[:limit] + "..." if __name__ == "__main__": main()