@persistent-ai/fireflow-agui
Server-side implementation of the AG-UI protocol for FireFlow — the bridge between a running flow execution and an agent chat client.
What it is
AG-UI is a transport-agnostic, event-driven streaming protocol for bidirectional agent ↔ client communication. This package is FireFlow's server half of that protocol: it turns a flow execution into a conversational agent. A client POSTs a RunAgentInput, the package starts (or reconnects to) a FireFlow execution, and streams the execution's AG-UI events back over SSE or WebSocket. It does not run flows itself — that happens in @persistent-ai/fireflow-executor. This package routes requests, owns the conversation/run bookkeeping, and forwards the execution's agui:events DBOS stream to the client.
The package is intentionally decoupled from the executor and tRPC internals: it depends on those services only through structural interfaces (AgUIExecutionStore, AgUITaskQueue, AgUIEventBus, AgUIAuthService, AgUIThreadRunStore, ...) injected via AgUIHandlerDependencies. The execution-api passes its concrete instances, which satisfy these shapes.
How it works
FireFlowServerAgent
FireFlowServerAgent extends AbstractAgent from @ag-ui/client and implements run(input): Observable<BaseEvent>. Both the SSE handler and the WebSocket handler construct an instance per request and subscribe to it. Its executeRun() derives a deterministic execution ID from (agentId, userId, threadId, runId) and then takes one of three routes:
| Condition | Route | Behavior |
|---|---|---|
| No existing execution + empty input | Subscribe-to-latest | Page load / auto-connect. Finds the latest run for the thread (falling back to the previous completed run if the latest failed/stopped), replays its agui:events history one-shot, and closes. No execution is created. |
| No existing execution + has input | New run | Inserts a thread_runs row (resolving the parent run for chaining), creates the execution row with the AG-UI payload as an external event, and publishes a task to the queue to start the DBOS workflow. |
| Existing execution | Reconnect | Verifies ownership and streams that execution's events (one-shot replay if already terminal, otherwise live). |
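The routing table above can be sketched as two small pure functions. This is a minimal illustration, not the package's code: `sketchDeriveExecutionId`, `chooseRoute`, and the SHA-256 scheme are assumptions made for the example, and the real `deriveExecutionId` may hash or format its inputs differently.

```typescript
import { createHash } from 'node:crypto'

type Route = 'subscribe-to-latest' | 'new-run' | 'reconnect'

interface RunIdentity {
  agentId: string
  userId: string
  threadId: string
  runId: string
}

// Hypothetical derivation: the same four identifiers always yield the same ID.
// JSON-encoding the tuple avoids collisions between e.g. ("a:b", "c") and ("a", "b:c").
function sketchDeriveExecutionId(id: RunIdentity): string {
  const key = JSON.stringify([id.agentId, id.userId, id.threadId, id.runId])
  return createHash('sha256').update(key).digest('hex').slice(0, 32)
}

// Mirrors the three rows of the routing table.
function chooseRoute(executionExists: boolean, hasInput: boolean): Route {
  if (executionExists) return 'reconnect'
  return hasInput ? 'new-run' : 'subscribe-to-latest'
}
```

Because the ID is a pure function of the request identity, a client that retries the same `runId` lands on the existing execution and takes the reconnect route instead of starting a duplicate run.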
After routing it emits RUN_STARTED (carrying the executionId), then concurrently forwards the agui:events stream and watches lifecycle events. When the flow reaches flow:completed / flow:failed / flow:cancelled, it emits a terminal RUN_FINISHED or RUN_ERROR (synthesizing one if the flow did not emit its own) and updates the thread_runs status.
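The terminal-event step can be pictured as follows. This is a sketch under stated assumptions: `settleRun` and its types are hypothetical names, and mapping `flow:cancelled` to `RUN_ERROR` is a guess for illustration, since the text only says that one of the two terminal events is emitted.

```typescript
type Lifecycle = 'flow:completed' | 'flow:failed' | 'flow:cancelled'
type RunStatus = 'completed' | 'failed' | 'cancelled'

interface TerminalEvent {
  type: 'RUN_FINISHED' | 'RUN_ERROR'
  threadId?: string
  runId?: string
  message?: string
}

interface Outcome {
  status: RunStatus // value to write to thread_runs.status
  synthesized: TerminalEvent | null // null when the flow already emitted its own
}

function settleRun(
  lifecycle: Lifecycle,
  forwardedTypes: string[],
  ids: { threadId: string; runId: string },
): Outcome {
  // Only synthesize when the flow did not emit a terminal event itself.
  const alreadyTerminal = forwardedTypes.some(
    (t) => t === 'RUN_FINISHED' || t === 'RUN_ERROR',
  )
  if (lifecycle === 'flow:completed') {
    const event: TerminalEvent = { type: 'RUN_FINISHED', ...ids }
    return { status: 'completed', synthesized: alreadyTerminal ? null : event }
  }
  // Assumption: failure and cancellation both surface as RUN_ERROR.
  const status: RunStatus = lifecycle === 'flow:failed' ? 'failed' : 'cancelled'
  const event: TerminalEvent = { type: 'RUN_ERROR', message: `Flow ${status}` }
  return { status, synthesized: alreadyTerminal ? null : event }
}
```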
Transport teardown (page reload, navigation) aborts only the local stream — it deliberately does not cancel the DBOS workflow, so the execution survives and can be reconnected to. Only the explicit /cancel endpoint cancels the workflow.
Event translation
The execution emits AG-UI events into a DBOS stream keyed agui:events; the agent forwards them verbatim to the subscriber. The package also ships pure reducers that fold a flat event array back into the protocol's higher-level views — used for history reconstruction and client state:
- eventsToMessages(events): rebuilds the conversation message list, with provider-aware handling of native assistant content blocks (server tool use / results, reasoning, MCP, etc.).
- eventsToState(events): applies STATE_SNAPSHOT / STATE_DELTA events to produce a final state object.
- compactEvents(events): collapses streaming chunk events into compact form.
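To make the state reducer concrete, here is a minimal fold in the spirit of `eventsToState`. It is a sketch, not the shipped implementation: `sketchEventsToState`, `applyOp`, and the event shapes are illustrative names, and it assumes `STATE_DELTA` carries JSON Patch operations in a `delta` field and `STATE_SNAPSHOT` carries the full state in `snapshot`. Only `add`, `replace`, and `remove` on object paths are handled; array indices and the other patch operations are left out.

```typescript
type JsonPatchOp =
  | { op: 'add' | 'replace'; path: string; value: unknown }
  | { op: 'remove'; path: string }

interface StateEvent {
  type: string
  snapshot?: unknown
  delta?: JsonPatchOp[]
}

type State = Record<string, unknown>

function applyOp(state: State, op: JsonPatchOp): void {
  // JSON Pointer: "/a/b" -> ["a", "b"], unescaping ~1 and ~0.
  const keys = op.path
    .split('/')
    .slice(1)
    .map((k) => k.replace(/~1/g, '/').replace(/~0/g, '~'))
  const last = keys.pop()
  if (last === undefined) return // whole-document ops are out of scope here
  let target: State = state
  for (const key of keys) {
    const next = target[key]
    if (typeof next !== 'object' || next === null) return // missing parent: skip
    target = next as State
  }
  if (op.op === 'remove') delete target[last]
  else target[last] = op.value
}

function sketchEventsToState(events: StateEvent[]): State {
  let state: State = {}
  for (const event of events) {
    if (event.type === 'STATE_SNAPSHOT') {
      // A snapshot replaces the state wholesale; clone so later deltas
      // do not mutate the event payload.
      state = JSON.parse(JSON.stringify(event.snapshot ?? {})) as State
    } else if (event.type === 'STATE_DELTA') {
      for (const op of event.delta ?? []) applyOp(state, op)
    }
    // Every other event type leaves the state untouched.
  }
  return state
}
```

Folding over the flat array keeps the reducer pure, so the same function can serve history reconstruction on the server and live state on the client.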
A2UI reducer
A2UI (Google's declarative generative-UI model, via @a2ui/web_core) rides inside AG-UI ACTIVITY_SNAPSHOT / ACTIVITY_DELTA events under activityType === 'a2ui', whose content.messages are A2UI ServerToClientMessages. The reducer feeds those messages to a stateful A2uiMessageProcessor and exposes the resulting named UI surfaces:
- processA2UIActivityEvent(event): incremental; filters one event, processes its messages, and returns a fresh surfaces snapshot (or null if not A2UI).
- eventsToSurfaces(events): folds a full event array into a surfaces map.
- getA2UISurfaces() / getA2UIProcessor() / resetA2UIProcessor(): access and reset the shared processor.
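The filtering step that precedes the processor can be sketched like this. The helper names and the `ActivityEvent` shape are hypothetical, the payloads in the usage below would be real A2UI messages in practice, and the stateful `A2uiMessageProcessor` itself is deliberately left out because its API is not described here.

```typescript
interface ActivityEvent {
  type: string
  activityType?: string
  content?: { messages?: unknown[] }
}

// Returns the A2UI messages carried by one event, or null when the event
// is not an A2UI activity (mirroring the "null if not A2UI" contract).
function extractA2UIMessages(event: ActivityEvent): unknown[] | null {
  const isActivity =
    event.type === 'ACTIVITY_SNAPSHOT' || event.type === 'ACTIVITY_DELTA'
  if (!isActivity || event.activityType !== 'a2ui') return null
  return event.content?.messages ?? []
}

// Collects, in order, every A2UI message found in a flat event array.
function collectA2UIMessages(events: ActivityEvent[]): unknown[] {
  const out: unknown[] = []
  for (const event of events) {
    const messages = extractA2UIMessages(event)
    if (messages) out.push(...messages)
  }
  return out
}
```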
Thread / run store
AgUIThreadRunStore (PostgreSQL impl PgThreadRunStore) indexes the chain of runs per conversation thread in fireflow_agui_thread_runs. Each row reuses the DBOS execution ID as its primary key and links to its parent via parentRunId / parentExecutionId, forming a linked list per thread. This is how parents are resolved when chaining a new run, how "latest run" is found for auto-connect, and how the thread list and per-thread history endpoints answer queries. Flows receive parentRunId + parentExecutionId in the run payload so they can read the previous execution's stream to reconstruct prior conversation.
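The linked-list shape can be illustrated with an in-memory stand-in for the table. Everything here is hypothetical: `ThreadRunRow`, `chainFrom`, and `latestReplayableRun` are names invented for the sketch, `createdAt` is a plain number for brevity, and the fallback rule (prefer the most recent completed run when the latest failed or was cancelled, otherwise keep the latest) is one reading of the subscribe-to-latest behavior rather than a statement of what `PgThreadRunStore` does.

```typescript
type RunStatus = 'running' | 'completed' | 'failed' | 'cancelled'

interface ThreadRunRow {
  executionId: string
  runId: string
  parentExecutionId: string | null
  status: RunStatus
  createdAt: number
}

// Walks parent links from one run back to the first run of the thread and
// returns the chain oldest-first. The `seen` set guards against cycles.
function chainFrom(rows: Map<string, ThreadRunRow>, executionId: string): ThreadRunRow[] {
  const chain: ThreadRunRow[] = []
  const seen = new Set<string>()
  let current = rows.get(executionId)
  while (current && !seen.has(current.executionId)) {
    seen.add(current.executionId)
    chain.unshift(current)
    current = current.parentExecutionId
      ? rows.get(current.parentExecutionId)
      : undefined
  }
  return chain
}

// Picks the run to replay on auto-connect.
function latestReplayableRun(rows: ThreadRunRow[]): ThreadRunRow | undefined {
  const newestFirst = [...rows].sort((a, b) => b.createdAt - a.createdAt)
  const latest = newestFirst[0]
  if (!latest) return undefined
  if (latest.status === 'failed' || latest.status === 'cancelled') {
    // Assumption: if no completed run exists, fall back to the latest anyway.
    return newestFirst.find((r) => r.status === 'completed') ?? latest
  }
  return latest
}
```

A flow that receives `parentExecutionId` in its payload can use the same walk to decide which earlier streams to read when rebuilding the conversation.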
SSE vs WebSocket
createAgUIHandler(deps) returns a Node.js HTTP handler mounting these routes:
| Route | Method | Purpose |
|---|---|---|
| /ag-ui/:agentId/run | POST | Validate RunAgentInput, authenticate, run/subscribe/reconnect, stream events as SSE via @ag-ui/encoder. |
| /ag-ui/:agentId/resume | POST | Send a tool result { executionId, toolCallId, content } to a waiting DBOS workflow (topic derived from toolCallId). Requires a DBOS client. |
| /ag-ui/:agentId/cancel | POST | Cancel a running execution's DBOS workflow. |
| /ag-ui/:agentId/capabilities | GET | Dynamic agent capability discovery. |
| /ag-ui/:agentId/threads | GET | List conversation threads for the agent + user. |
| /ag-ui/:agentId/threads/:threadId | GET | Get a thread's history (messages + state). |
createAgUIWSHandler(deps) provides the WebSocket alternative for the run/resume/cancel/capabilities loop, using a small typed message envelope (auth → run / resume / cancel / capabilities, with event / error / run_started server messages). Both transports drive the same FireFlowServerAgent; SSE encodes events through EventEncoder, while WS wraps each event in a { type: 'event', event } message.
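The difference between the two wire formats is small enough to show side by side. This is a sketch with hypothetical helper names: `toSseFrame` assumes the encoder emits one `data: <json>` frame per event terminated by a blank line (standard Server-Sent Events framing), `toWsMessage` follows the envelope described above, and `parseSseFrames` is a simplified client-side reader that expects complete frames and does not buffer partial chunks.

```typescript
interface AgUIEvent {
  type: string
  [key: string]: unknown
}

// SSE: one "data:" line per event, terminated by a blank line.
function toSseFrame(event: AgUIEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`
}

// WebSocket: the event is nested inside a typed envelope.
function toWsMessage(event: AgUIEvent): string {
  return JSON.stringify({ type: 'event', event })
}

// Splits a text buffer into frames and decodes the JSON payload of each.
function parseSseFrames(text: string): AgUIEvent[] {
  const events: AgUIEvent[] = []
  for (const frame of text.split('\n\n')) {
    const data = frame
      .split('\n')
      .filter((line) => line.startsWith('data:'))
      .map((line) => line.slice(5).trimStart())
      .join('\n')
    if (data) events.push(JSON.parse(data) as AgUIEvent)
  }
  return events
}
```

Either way the client ends up with the same event objects, which is why both transports can share the reducers exported from the package root.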
Exports
| Entry | Environment | Contents |
|---|---|---|
| . | Client + Server | Protocol-adjacent helpers and types: A2UI reducer (eventsToSurfaces, processA2UIActivityEvent, ...) and A2UI types, event reducers (compactEvents, eventsToMessages, eventsToState), and shared config / WebSocket message types. |
| ./server | Server only | FireFlowServerAgent, FireFlowAgentSubscriber, deriveExecutionId, createAgUIHandler (SSE + REST routes), createAgUIWSHandler (WebSocket), and the dependency/store interfaces (AgUIHandlerDependencies, AgUIExecutionStore, ...). |
| ./client | Client | FireFlowAgent (client AbstractAgent with dual SSE/WS transport, used with CopilotKit), FireFlowToolSubscriber, ThreadsClient, and capability/thread/config types. |
| ./db | Server | PgThreadRunStore, the aguiThreadRunsTable Drizzle schema (fireflow_agui_thread_runs), the AgUIThreadRunStore interface, and row/summary types. |
Database schema
fireflow_agui_thread_runs — one row per agent run within a thread:
| Column | Notes |
|---|---|
| execution_id (PK) | Reuses the DBOS execution ID |
| agent_id, user_id, thread_id, run_id | Thread/run identity |
| parent_run_id, parent_execution_id | Chain links (null for the first run) |
| status | running / completed / failed / cancelled (default running) |
| created_at, completed_at | Timestamps |
Indexes cover the "all runs for a thread, ordered" and "latest run for a thread" queries, and a uniqueness constraint allows only one run_id per (agent_id, thread_id).
Related
- @persistent-ai/fireflow-agui-react: the React integration (FireFlowProvider, Effector stores, hooks, optional CopilotKit bridge) that consumes this server.
- @persistent-ai/fireflow-executor: runs the flow whose events this package streams; the execution-api wires the injected dependencies.
- AG-UI protocol: https://ag-ui.dev. See also the agui-protocol, agui-fireflow, and agui-sdk skills, and docs/developer/agui/.
License
Business Source License 1.1 (BUSL-1.1) — see LICENSE.txt