import * as tg from "./telegram.js"; import * as store from "./store.js"; import { runAgent, isWaitingForUser, resolveUserResponse } from "./agent-worker.js"; // Track which agents are running (in-process) const runningAgents = new Set(); // Track "talk mode" โ€” chatId -> agentId they're talking to const talkMode = new Map(); const STATUS_EMOJI: Record = { spawning: "๐Ÿ”„", working: "โšก", waiting: "โ“", done: "โœ…", error: "โŒ", killed: "๐Ÿ›‘", }; async function handleMessage(msg: NonNullable) { const chatId = msg.chat.id; const text = (msg.text || "").trim(); if (!tg.isAllowed(chatId)) return; // Check if user is in talk mode with an agent if (talkMode.has(chatId) && !text.startsWith("/")) { const agentId = talkMode.get(chatId)!; talkMode.delete(chatId); if (isWaitingForUser(agentId)) { resolveUserResponse(agentId, text); await tg.send(`๐Ÿ’ฌ Sent to Agent #${agentId}`, chatId); } else { // Agent is working but user wants to interject โ€” add as follow-up // For now just queue it store.addLog(agentId, "user", text); await tg.send( `๐Ÿ“ Noted for Agent #${agentId}. It's currently working โ€” your message will be seen when it next checks.`, chatId ); } return; } // Check if this is a reply to an agent thread if (msg.reply_to_message && !text.startsWith("/")) { const replyToId = msg.reply_to_message.message_id; const agent = store.findAgentByThreadMsg(replyToId); if (agent && isWaitingForUser(agent.id)) { resolveUserResponse(agent.id, text); await tg.send(`๐Ÿ’ฌ Sent to Agent #${agent.id}`, chatId); return; } // Also check all agents for this chat โ€” find closest thread const agents = store.listAgents(String(chatId)); for (const a of agents) { if (a.thread_msg_id === replyToId && isWaitingForUser(a.id)) { resolveUserResponse(a.id, text); await tg.send(`๐Ÿ’ฌ Sent to Agent #${a.id}`, chatId); return; } } } // Commands if (text.startsWith("/new ") || text.startsWith("/new@")) { const task = text.replace(/^\/new(@\w+)?\s*/, "").trim(); if (!task) { await tg.send("Usage: `/new `", chatId); return; } await spawnAgent(chatId, task); return; } if (text === "/board" || text.startsWith("/board@")) { await showBoard(chatId); return; } if (text.startsWith("/kill ") || text.startsWith("/kill@")) { const idStr = text.replace(/^\/kill(@\w+)?\s*/, "").trim(); const id = parseInt(idStr); if (isNaN(id)) { await tg.send("Usage: `/kill `", chatId); return; } await killAgent(chatId, id); return; } if (text.startsWith("/logs ") || text.startsWith("/logs@")) { const idStr = text.replace(/^\/logs(@\w+)?\s*/, "").trim(); const id = parseInt(idStr); if (isNaN(id)) { await tg.send("Usage: `/logs `", chatId); return; } await showLogs(chatId, id); return; } if (text.startsWith("/talk ") || text.startsWith("/talk@")) { const idStr = text.replace(/^\/talk(@\w+)?\s*/, "").trim(); const id = parseInt(idStr); if (isNaN(id)) { await tg.send("Usage: `/talk `", chatId); return; } talkMode.set(chatId, id); await tg.send( `๐Ÿ’ฌ You're now talking to *Agent #${id}*. Send your message (or /cancel):`, chatId ); return; } if (text === "/cancel" || text.startsWith("/cancel@")) { if (talkMode.has(chatId)) { talkMode.delete(chatId); await tg.send("Cancelled talk mode.", chatId); } return; } if (text === "/clear" || text.startsWith("/clear@")) { await clearChat(chatId, msg.message_id); return; } if (text === "/help" || text === "/start" || text.startsWith("/help@") || text.startsWith("/start@")) { await tg.send( `๐Ÿค– *Agent Orchestrator* Commands: /new โ€” Spawn a new agent /board โ€” View all agents /logs โ€” Agent activity log /talk โ€” Send message to agent /kill โ€” Stop an agent /clear โ€” Clear chat messages /help โ€” This message Or *reply* to any agent message to talk to it directly.`, chatId ); return; } // Unknown if (text.startsWith("/")) { await tg.send("Unknown command. Try /help", chatId); } } async function handleCallback(cb: NonNullable) { const chatId = cb.message?.chat?.id; if (!chatId || !tg.isAllowed(chatId)) return; const data = cb.data || ""; await tg.answerCallback(cb.id); if (data.startsWith("logs_")) { const id = parseInt(data.replace("logs_", "")); await showLogs(chatId, id); } else if (data.startsWith("talk_")) { const id = parseInt(data.replace("talk_", "")); talkMode.set(chatId, id); await tg.send( `๐Ÿ’ฌ Talking to *Agent #${id}*. Send your message:`, chatId ); } else if (data.startsWith("kill_")) { const id = parseInt(data.replace("kill_", "")); await killAgent(chatId, id); } } async function spawnAgent(chatId: number, task: string) { const agent = store.createAgent(task, String(chatId)); // Send initial message and save its ID for threading const res = await tg.send( `๐Ÿค– *Agent #${agent.id}* spawned\n*Task:* ${task}\n*Status:* spawning...`, chatId, { keyboard: tg.agentKeyboard(agent.id) } ); if (res?.result?.message_id) { store.updateAgent(agent.id, { thread_msg_id: res.result.message_id }); } // Run agent in background (non-blocking) runningAgents.add(agent.id); runAgent(agent.id) .catch((e) => console.error(`[agent ${agent.id}] fatal:`, e)) .finally(() => runningAgents.delete(agent.id)); } async function showBoard(chatId: number) { const agents = store.listAgents(String(chatId)); if (agents.length === 0) { await tg.send("No agents yet. Use `/new ` to spawn one.", chatId); return; } let board = "๐Ÿ“‹ *Agent Board*\n\n"; board += "```\n"; board += " # | Status | Task\n"; board += "----|--------|---------------------------\n"; for (const a of agents.slice(0, 20)) { const emoji = STATUS_EMOJI[a.status] || "โ”"; const taskShort = a.task.length > 30 ? a.task.slice(0, 27) + "..." : a.task; board += ` ${String(a.id).padStart(2)} | ${emoji} ${a.status.padEnd(4).slice(0, 4)} | ${taskShort}\n`; } board += "```\n"; // Show summaries for done agents const done = agents.filter((a) => a.summary); if (done.length > 0) { board += "\n*Completed:*\n"; for (const a of done.slice(0, 5)) { board += `โ€ข #${a.id}: ${a.summary}\n`; } } await tg.send(board, chatId); } async function killAgent(chatId: number, agentId: number) { const agent = store.getAgent(agentId); if (!agent) { await tg.send(`Agent #${agentId} not found.`, chatId); return; } if (agent.chat_id !== String(chatId)) { await tg.send(`Agent #${agentId} doesn't belong to you.`, chatId); return; } if (agent.status === "done" || agent.status === "killed") { await tg.send(`Agent #${agentId} is already ${agent.status}.`, chatId); return; } store.updateAgent(agentId, { status: "killed", summary: "Killed by user" }); store.addLog(agentId, "system", "Killed by user"); // If waiting for user, resolve with cancellation if (isWaitingForUser(agentId)) { resolveUserResponse(agentId, "[USER CANCELLED THIS AGENT]"); } await tg.send(`๐Ÿ›‘ Agent #${agentId} killed.`, chatId); } async function showLogs(chatId: number, agentId: number) { const agent = store.getAgent(agentId); if (!agent) { await tg.send(`Agent #${agentId} not found.`, chatId); return; } const logs = store.getLogs(agentId, 15); if (logs.length === 0) { await tg.send(`No logs for Agent #${agentId}.`, chatId); return; } let text = `๐Ÿ“‹ *Logs โ€” Agent #${agentId}*\n_${agent.task}_\n\n`; for (const log of logs.reverse()) { const role = log.role.toUpperCase().padEnd(6).slice(0, 6); const content = log.content.length > 150 ? log.content.slice(0, 147) + "..." : log.content; text += `\`${role}\` ${content}\n\n`; } await tg.send(text, chatId, { keyboard: tg.agentKeyboard(agentId) }); } async function clearChat(chatId: number, commandMsgId: number) { // Delete the /clear command message itself first await tg.deleteMessage(chatId, commandMsgId); // Telegram only allows deleting messages less than 48h old. // We walk backwards from the command message ID, trying to delete recent messages. const statusMsg = await tg.send("๐Ÿงน Clearing chat...", chatId); const statusMsgId: number | undefined = statusMsg?.result?.message_id; let deleted = 0; let misses = 0; const MAX_MISSES = 10; // stop after 10 consecutive failures (hit old messages or gap) // Walk backwards from the /clear message for (let id = commandMsgId - 1; id > 0 && misses < MAX_MISSES; id--) { const ok = await tg.deleteMessage(chatId, id); if (ok) { deleted++; misses = 0; } else { misses++; } } // Delete the status message too, then send a clean confirmation if (statusMsgId) await tg.deleteMessage(chatId, statusMsgId); await tg.send(`๐Ÿงน Cleared ${deleted} messages.`, chatId); } // --- Main loop --- async function main() { console.log("๐Ÿค– Telegram Agent Orchestrator starting..."); console.log(` Polling for updates...`); await tg.send("๐Ÿค– Agent Orchestrator is online! Send /help to get started."); while (true) { try { const updates = await tg.poll(); for (const update of updates) { if (update.message) { handleMessage(update.message).catch((e) => console.error("[handle msg]", e) ); } if (update.callback_query) { handleCallback(update.callback_query).catch((e) => console.error("[handle cb]", e) ); } } } catch (e) { console.error("[main loop]", e); await new Promise((r) => setTimeout(r, 5000)); } } } main();