feat: add improved pi agent with observatory, dashboard, and pledge-now-pay-later

This commit is contained in:
Azreen Jamal
2026-03-01 23:41:24 +08:00
parent ae242436c9
commit f832b913d5
99 changed files with 20949 additions and 74 deletions

View File

@@ -2,8 +2,9 @@
* Agent Team — Dispatcher-only orchestrator with grid dashboard
*
* The primary Pi agent has NO codebase tools. It can ONLY delegate work
* to specialist agents via the `dispatch_agent` tool. Each specialist
* maintains its own Pi session for cross-invocation memory.
* to specialist agents via the `dispatch_agent` tool (single) or
* `dispatch_agents` tool (parallel batch). Each specialist maintains
* its own Pi session for cross-invocation memory.
*
* Loads agent definitions from agents/*.md, .claude/agents/*.md, .pi/agents/*.md.
* Teams are defined in .pi/agents/teams.yaml — on boot a select dialog lets
@@ -20,11 +21,16 @@
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import { Text, type AutocompleteItem, truncateToWidth, visibleWidth } from "@mariozechner/pi-tui";
import { spawn } from "child_process";
import { spawn, type ChildProcess } from "child_process";
import { readdirSync, readFileSync, existsSync, mkdirSync, unlinkSync } from "fs";
import { join, resolve } from "path";
import { applyExtensionDefaults } from "./themeMap.ts";
// ── Constants ────────────────────────────────────
const AGENT_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes per dispatch
const WIDGET_THROTTLE_MS = 500; // max widget refresh rate
// ── Types ────────────────────────────────────────
interface AgentDef {
@@ -46,6 +52,7 @@ interface AgentState {
sessionFile: string | null;
runCount: number;
timer?: ReturnType<typeof setInterval>;
proc?: ChildProcess;
}
// ── Display Name Helper ──────────────────────────
@@ -136,6 +143,7 @@ function scanAgentDirs(cwd: string): AgentDef[] {
export default function (pi: ExtensionAPI) {
const agentStates: Map<string, AgentState> = new Map();
const activeProcesses: Set<ChildProcess> = new Set();
let allAgentDefs: AgentDef[] = [];
let teams: Record<string, string[]> = {};
let activeTeamName = "";
@@ -144,17 +152,40 @@ export default function (pi: ExtensionAPI) {
let sessionDir = "";
let contextWindow = 0;
// ── Throttled Widget Update ──────────────────
let widgetDirty = false;
let widgetTimer: ReturnType<typeof setTimeout> | null = null;
function scheduleWidgetUpdate() {
widgetDirty = true;
if (widgetTimer) return; // already scheduled
widgetTimer = setTimeout(() => {
widgetTimer = null;
if (widgetDirty) {
widgetDirty = false;
doUpdateWidget();
}
}, WIDGET_THROTTLE_MS);
}
function flushWidgetUpdate() {
if (widgetTimer) {
clearTimeout(widgetTimer);
widgetTimer = null;
}
widgetDirty = false;
doUpdateWidget();
}
function loadAgents(cwd: string) {
// Create session storage dir
sessionDir = join(cwd, ".pi", "agent-sessions");
if (!existsSync(sessionDir)) {
mkdirSync(sessionDir, { recursive: true });
}
// Load all agent definitions
allAgentDefs = scanAgentDirs(cwd);
// Load teams from .pi/agents/teams.yaml
const teamsPath = join(cwd, ".pi", "agents", "teams.yaml");
if (existsSync(teamsPath)) {
try {
@@ -166,7 +197,6 @@ export default function (pi: ExtensionAPI) {
teams = {};
}
// If no teams defined, create a default "all" team
if (Object.keys(teams).length === 0) {
teams = { all: allAgentDefs.map(d => d.name) };
}
@@ -196,11 +226,24 @@ export default function (pi: ExtensionAPI) {
});
}
// Auto-size grid columns based on team size
const size = agentStates.size;
gridCols = size <= 3 ? size : size === 4 ? 2 : 3;
}
// ── Kill all tracked child processes ─────────
function killAllAgents() {
for (const proc of activeProcesses) {
try { proc.kill("SIGTERM"); } catch {}
}
// Force kill after 3s
setTimeout(() => {
for (const proc of activeProcesses) {
try { proc.kill("SIGKILL"); } catch {}
}
}, 3000);
}
// ── Grid Rendering ───────────────────────────
function renderCard(state: AgentState, colWidth: number, theme: any): string[] {
@@ -223,7 +266,6 @@ export default function (pi: ExtensionAPI) {
const statusLine = theme.fg(statusColor, statusStr + timeStr);
const statusVisible = statusStr.length + timeStr.length;
// Context bar: 5 blocks + percent
const filled = Math.ceil(state.contextPct / 20);
const bar = "#".repeat(filled) + "-".repeat(5 - filled);
const ctxStr = `[${bar}] ${Math.ceil(state.contextPct)}%`;
@@ -252,7 +294,7 @@ export default function (pi: ExtensionAPI) {
];
}
function updateWidget() {
function doUpdateWidget() {
if (!widgetCtx) return;
widgetCtx.ui.setWidget("agent-team", (_tui: any, theme: any) => {
@@ -302,6 +344,7 @@ export default function (pi: ExtensionAPI) {
agentName: string,
task: string,
ctx: any,
signal?: AbortSignal,
): Promise<{ output: string; exitCode: number; elapsed: number }> {
const key = agentName.toLowerCase();
const state = agentStates.get(key);
@@ -321,29 +364,29 @@ export default function (pi: ExtensionAPI) {
});
}
// Reset state for new run
state.status = "running";
state.task = task;
state.toolCount = 0;
state.elapsed = 0;
state.lastWork = "";
state.contextPct = 0;
state.runCount++;
updateWidget();
scheduleWidgetUpdate();
const startTime = Date.now();
state.timer = setInterval(() => {
state.elapsed = Date.now() - startTime;
updateWidget();
scheduleWidgetUpdate();
}, 1000);
const model = ctx.model
? `${ctx.model.provider}/${ctx.model.id}`
: "openrouter/google/gemini-3-flash-preview";
// Session file for this agent
const agentKey = state.def.name.toLowerCase().replace(/\s+/g, "-");
const agentSessionFile = join(sessionDir, `${agentKey}.json`);
// Build args — first run creates session, subsequent runs resume
const args = [
"--mode", "json",
"-p",
@@ -355,7 +398,6 @@ export default function (pi: ExtensionAPI) {
"--session", agentSessionFile,
];
// Continue existing session if we have one
if (state.sessionFile) {
args.push("-c");
}
@@ -363,13 +405,44 @@ export default function (pi: ExtensionAPI) {
args.push(task);
const textChunks: string[] = [];
let resolved = false;
return new Promise((promiseResolve) => {
// Guard against double-resolve
const safeResolve = (val: { output: string; exitCode: number; elapsed: number }) => {
if (resolved) return;
resolved = true;
promiseResolve(val);
};
return new Promise((resolve) => {
const proc = spawn("pi", args, {
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env },
});
state.proc = proc;
activeProcesses.add(proc);
// ── Timeout guard ──
const timeout = setTimeout(() => {
try { proc.kill("SIGTERM"); } catch {}
// Force kill after 3s if still alive
setTimeout(() => { try { proc.kill("SIGKILL"); } catch {} }, 3000);
}, AGENT_TIMEOUT_MS);
// ── AbortSignal support ──
const onAbort = () => {
try { proc.kill("SIGTERM"); } catch {}
setTimeout(() => { try { proc.kill("SIGKILL"); } catch {} }, 3000);
};
if (signal) {
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}
let buffer = "";
proc.stdout!.setEncoding("utf-8");
@@ -388,23 +461,23 @@ export default function (pi: ExtensionAPI) {
const full = textChunks.join("");
const last = full.split("\n").filter((l: string) => l.trim()).pop() || "";
state.lastWork = last;
updateWidget();
scheduleWidgetUpdate();
}
} else if (event.type === "tool_execution_start") {
state.toolCount++;
updateWidget();
scheduleWidgetUpdate();
} else if (event.type === "message_end") {
const msg = event.message;
if (msg?.usage && contextWindow > 0) {
state.contextPct = ((msg.usage.input || 0) / contextWindow) * 100;
updateWidget();
scheduleWidgetUpdate();
}
} else if (event.type === "agent_end") {
const msgs = event.messages || [];
const last = [...msgs].reverse().find((m: any) => m.role === "assistant");
if (last?.usage && contextWindow > 0) {
state.contextPct = ((last.usage.input || 0) / contextWindow) * 100;
updateWidget();
scheduleWidgetUpdate();
}
}
} catch {}
@@ -415,6 +488,12 @@ export default function (pi: ExtensionAPI) {
proc.stderr!.on("data", () => {});
proc.on("close", (code) => {
clearTimeout(timeout);
if (signal) signal.removeEventListener?.("abort", onAbort);
activeProcesses.delete(proc);
state.proc = undefined;
// Process any remaining buffer
if (buffer.trim()) {
try {
const event = JSON.parse(buffer);
@@ -427,35 +506,45 @@ export default function (pi: ExtensionAPI) {
clearInterval(state.timer);
state.elapsed = Date.now() - startTime;
state.status = code === 0 ? "done" : "error";
// Mark session file as available for resume
const timedOut = state.elapsed >= AGENT_TIMEOUT_MS;
state.status = timedOut ? "error" : (code === 0 ? "done" : "error");
if (code === 0) {
state.sessionFile = agentSessionFile;
}
const full = textChunks.join("");
state.lastWork = full.split("\n").filter((l: string) => l.trim()).pop() || "";
updateWidget();
flushWidgetUpdate();
ctx.ui.notify(
`${displayName(state.def.name)} ${state.status} in ${Math.round(state.elapsed / 1000)}s`,
state.status === "done" ? "success" : "error"
);
const statusMsg = timedOut
? `${displayName(state.def.name)} timed out after ${Math.round(AGENT_TIMEOUT_MS / 1000)}s`
: `${displayName(state.def.name)} ${state.status} in ${Math.round(state.elapsed / 1000)}s`;
resolve({
output: full,
ctx.ui.notify(statusMsg, state.status === "done" ? "success" : "error");
const output = timedOut
? full + "\n\n[TIMED OUT after " + Math.round(AGENT_TIMEOUT_MS / 1000) + "s]"
: full;
safeResolve({
output,
exitCode: code ?? 1,
elapsed: state.elapsed,
});
});
proc.on("error", (err) => {
clearTimeout(timeout);
if (signal) signal.removeEventListener?.("abort", onAbort);
activeProcesses.delete(proc);
state.proc = undefined;
clearInterval(state.timer);
state.status = "error";
state.lastWork = `Error: ${err.message}`;
updateWidget();
resolve({
flushWidgetUpdate();
safeResolve({
output: `Error spawning agent: ${err.message}`,
exitCode: 1,
elapsed: Date.now() - startTime,
@@ -464,18 +553,18 @@ export default function (pi: ExtensionAPI) {
});
}
// ── dispatch_agent Tool (registered at top level) ──
// ── dispatch_agent Tool (single) ─────────────
pi.registerTool({
name: "dispatch_agent",
label: "Dispatch Agent",
description: "Dispatch a task to a specialist agent. The agent will execute the task and return the result. Use the system prompt to see available agent names.",
description: "Dispatch a task to a single specialist agent. The agent executes the task and returns the result. For dispatching multiple agents in parallel, use dispatch_agents instead.",
parameters: Type.Object({
agent: Type.String({ description: "Agent name (case-insensitive)" }),
task: Type.String({ description: "Task description for the agent to execute" }),
}),
async execute(_toolCallId, params, _signal, onUpdate, ctx) {
async execute(_toolCallId, params, signal, onUpdate, ctx) {
const { agent, task } = params as { agent: string; task: string };
try {
@@ -486,7 +575,7 @@ export default function (pi: ExtensionAPI) {
});
}
const result = await dispatchAgent(agent, task, ctx);
const result = await dispatchAgent(agent, task, ctx, signal);
const truncated = result.output.length > 8000
? result.output.slice(0, 8000) + "\n\n... [truncated]"
@@ -534,7 +623,6 @@ export default function (pi: ExtensionAPI) {
return new Text(text?.type === "text" ? text.text : "", 0, 0);
}
// Streaming/partial result while agent is still running
if (options.isPartial || details.status === "dispatching") {
return new Text(
theme.fg("accent", `${details.agent || "?"}`) +
@@ -560,6 +648,120 @@ export default function (pi: ExtensionAPI) {
},
});
// ── dispatch_agents Tool (parallel batch) ────
pi.registerTool({
name: "dispatch_agents",
label: "Dispatch Agents (Parallel)",
description: "Dispatch tasks to multiple specialist agents in parallel. All agents run simultaneously and results are returned together. Much faster than sequential dispatch_agent calls when tasks are independent.",
parameters: Type.Object({
dispatches: Type.Array(
Type.Object({
agent: Type.String({ description: "Agent name (case-insensitive)" }),
task: Type.String({ description: "Task description for the agent" }),
}),
{ description: "Array of {agent, task} pairs to dispatch in parallel", minItems: 1 },
),
}),
async execute(_toolCallId, params, signal, onUpdate, ctx) {
const { dispatches } = params as { dispatches: { agent: string; task: string }[] };
const agentNames = dispatches.map(d => d.agent).join(", ");
if (onUpdate) {
onUpdate({
content: [{ type: "text", text: `Dispatching ${dispatches.length} agents in parallel: ${agentNames}` }],
details: { dispatches, status: "dispatching", count: dispatches.length },
});
}
// Launch all in parallel
const promises = dispatches.map(({ agent, task }) =>
dispatchAgent(agent, task, ctx, signal).then(result => ({
agent,
task,
...result,
}))
);
const results = await Promise.all(promises);
const summaryParts: string[] = [];
const allDetails: any[] = [];
for (const r of results) {
const status = r.exitCode === 0 ? "done" : "error";
const truncated = r.output.length > 4000
? r.output.slice(0, 4000) + "\n... [truncated]"
: r.output;
summaryParts.push(`## [${r.agent}] ${status} in ${Math.round(r.elapsed / 1000)}s\n\n${truncated}`);
allDetails.push({
agent: r.agent,
task: r.task,
status,
elapsed: r.elapsed,
exitCode: r.exitCode,
fullOutput: r.output,
});
}
const doneCount = results.filter(r => r.exitCode === 0).length;
const header = `Parallel dispatch complete: ${doneCount}/${results.length} succeeded`;
return {
content: [{ type: "text", text: `${header}\n\n${summaryParts.join("\n\n---\n\n")}` }],
details: {
dispatches: allDetails,
status: "complete",
count: results.length,
succeeded: doneCount,
},
};
},
renderCall(args, theme) {
const dispatches = (args as any).dispatches || [];
const names = dispatches.map((d: any) => d.agent || "?").join(", ");
return new Text(
theme.fg("toolTitle", theme.bold("dispatch_agents ")) +
theme.fg("accent", `[${dispatches.length}] `) +
theme.fg("muted", names),
0, 0,
);
},
renderResult(result, options, theme) {
const details = result.details as any;
if (!details) {
const text = result.content[0];
return new Text(text?.type === "text" ? text.text : "", 0, 0);
}
if (options.isPartial || details.status === "dispatching") {
return new Text(
theme.fg("accent", `● Parallel dispatch`) +
theme.fg("dim", ` ${details.count || "?"} agents working...`),
0, 0,
);
}
const header = theme.fg("success", `${details.succeeded}`) +
theme.fg("dim", `/${details.count} agents completed`);
if (options.expanded && Array.isArray(details.dispatches)) {
const lines = details.dispatches.map((d: any) => {
const icon = d.status === "done" ? "✓" : "✗";
const color = d.status === "done" ? "success" : "error";
return theme.fg(color, ` ${icon} ${d.agent}`) +
theme.fg("dim", ` ${Math.round(d.elapsed / 1000)}s`);
});
return new Text(header + "\n" + lines.join("\n"), 0, 0);
}
return new Text(header, 0, 0);
},
});
// ── Commands ─────────────────────────────────
pi.registerCommand("agents-team", {
@@ -583,7 +785,7 @@ export default function (pi: ExtensionAPI) {
const idx = options.indexOf(choice);
const name = teamNames[idx];
activateTeam(name);
updateWidget();
flushWidgetUpdate();
ctx.ui.setStatus("agent-team", `Team: ${name} (${agentStates.size})`);
ctx.ui.notify(`Team: ${name}${Array.from(agentStates.values()).map(s => displayName(s.def.name)).join(", ")}`, "info");
},
@@ -619,7 +821,7 @@ export default function (pi: ExtensionAPI) {
if (n >= 1 && n <= 6) {
gridCols = n;
_ctx.ui.notify(`Grid set to ${gridCols} columns`, "info");
updateWidget();
flushWidgetUpdate();
} else {
_ctx.ui.notify("Usage: /agents-grid <1-6>", "error");
}
@@ -629,7 +831,6 @@ export default function (pi: ExtensionAPI) {
// ── System Prompt Override ───────────────────
pi.on("before_agent_start", async (_event, _ctx) => {
// Build dynamic agent catalog from active team only
const agentCatalog = Array.from(agentStates.values())
.map(s => `### ${displayName(s.def.name)}\n**Dispatch as:** \`${s.def.name}\`\n${s.def.description}\n**Tools:** ${s.def.tools}`)
.join("\n\n");
@@ -639,7 +840,7 @@ export default function (pi: ExtensionAPI) {
return {
systemPrompt: `You are a dispatcher agent. You coordinate specialist agents to accomplish tasks.
You do NOT have direct access to the codebase. You MUST delegate all work through
agents using the dispatch_agent tool.
agents using the dispatch_agent or dispatch_agents tools.
## Active Team: ${activeTeamName}
Members: ${teamMembers}
@@ -648,17 +849,20 @@ You can ONLY dispatch to agents listed below. Do not attempt to dispatch to agen
## How to Work
- Analyze the user's request and break it into clear sub-tasks
- Choose the right agent(s) for each sub-task
- Dispatch tasks using the dispatch_agent tool
- **Use dispatch_agents for independent parallel tasks** — this is much faster
- Use dispatch_agent for sequential tasks where order matters
- Review results and dispatch follow-up agents if needed
- If a task fails, try a different agent or adjust the task description
- Summarize the outcome for the user
## Rules
- NEVER try to read, write, or execute code directly — you have no such tools
- ALWAYS use dispatch_agent to get work done
- ALWAYS use dispatch_agent or dispatch_agents to get work done
- **Prefer dispatch_agents when tasks are independent** — parallelism saves time
- You can chain agents: use scout to explore, then builder to implement
- You can dispatch the same agent multiple times with different tasks
- Keep tasks focused — one clear objective per dispatch
- Each agent has a ${Math.round(AGENT_TIMEOUT_MS / 1000)}s timeout — break large tasks into smaller ones
## Agents
@@ -670,7 +874,6 @@ ${agentCatalog}`,
pi.on("session_start", async (_event, _ctx) => {
applyExtensionDefaults(import.meta.url, _ctx);
// Clear widgets from previous session
if (widgetCtx) {
widgetCtx.ui.setWidget("agent-team", undefined);
}
@@ -689,14 +892,12 @@ ${agentCatalog}`,
loadAgents(_ctx.cwd);
// Default to first team — use /agents-team to switch
const teamNames = Object.keys(teams);
if (teamNames.length > 0) {
activateTeam(teamNames[0]);
}
// Lock down to dispatcher-only (tool already registered at top level)
pi.setActiveTools(["dispatch_agent"]);
pi.setActiveTools(["dispatch_agent", "dispatch_agents"]);
_ctx.ui.setStatus("agent-team", `Team: ${activeTeamName} (${agentStates.size})`);
const members = Array.from(agentStates.values()).map(s => displayName(s.def.name)).join(", ");
@@ -708,9 +909,8 @@ ${agentCatalog}`,
`/agents-grid <1-6> Set grid column count`,
"info",
);
updateWidget();
flushWidgetUpdate();
// Footer: model | team | context bar
_ctx.ui.setFooter((_tui, theme, _footerData) => ({
dispose: () => {},
invalidate() {},
@@ -721,9 +921,13 @@ ${agentCatalog}`,
const filled = Math.round(pct / 10);
const bar = "#".repeat(filled) + "-".repeat(10 - filled);
const running = Array.from(agentStates.values()).filter(s => s.status === "running").length;
const runningStr = running > 0 ? theme.fg("accent", `${running} running`) : "";
const left = theme.fg("dim", ` ${model}`) +
theme.fg("muted", " · ") +
theme.fg("accent", activeTeamName);
theme.fg("accent", activeTeamName) +
runningStr;
const right = theme.fg("dim", `[${bar}] ${Math.round(pct)}% `);
const pad = " ".repeat(Math.max(1, width - visibleWidth(left) - visibleWidth(right)));
@@ -731,4 +935,10 @@ ${agentCatalog}`,
},
}));
});
// ── Cleanup on exit ──────────────────────────
process.on("exit", () => killAllAgents());
process.on("SIGINT", () => { killAllAgents(); process.exit(0); });
process.on("SIGTERM", () => { killAllAgents(); process.exit(0); });
}