Skip to content

@persistent-ai/fireflow-agui

Server-side implementation of the AG-UI protocol for FireFlow — the bridge between a running flow execution and an agent chat client.

What it is

AG-UI is a transport-agnostic, event-driven streaming protocol for bidirectional agent ↔ client communication. This package is FireFlow's server half of that protocol: it turns a flow execution into a conversational agent. A client POSTs a RunAgentInput, the package starts (or reconnects to) a FireFlow execution, and streams the execution's AG-UI events back over SSE or WebSocket. It does not run flows itself — that happens in @persistent-ai/fireflow-executor. This package routes requests, owns the conversation/run bookkeeping, and forwards the execution's agui:events DBOS stream to the client.

The package is intentionally decoupled from the executor and tRPC internals: it depends on those services only through structural interfaces (AgUIExecutionStore, AgUITaskQueue, AgUIEventBus, AgUIAuthService, AgUIThreadRunStore, ...) injected via AgUIHandlerDependencies. The execution-api passes its concrete instances, which satisfy these shapes.

How it works

FireFlowServerAgent

FireFlowServerAgent extends AbstractAgent from @ag-ui/client and implements run(input): Observable<BaseEvent>. Both the SSE handler and the WebSocket handler construct an instance per request and subscribe to it. Its executeRun() derives a deterministic execution ID from (agentId, userId, threadId, runId) and then takes one of three routes:

ConditionRouteBehavior
No existing execution + empty inputSubscribe-to-latestPage load / auto-connect. Finds the latest run for the thread (falling back to the previous completed run if the latest failed/stopped), replays its agui:events history one-shot, and closes. No execution is created.
No existing execution + has inputNew runInserts a thread_runs row (resolving the parent run for chaining), creates the execution row with the AG-UI payload as an external event, and publishes a task to the queue to start the DBOS workflow.
Existing executionReconnectVerifies ownership and streams that execution's events (one-shot replay if already terminal, otherwise live).

After routing it emits RUN_STARTED (carrying the executionId), then concurrently forwards the agui:events stream and watches lifecycle events. When the flow reaches flow:completed / flow:failed / flow:cancelled, it emits a terminal RUN_FINISHED or RUN_ERROR (synthesizing one if the flow did not emit its own) and updates the thread_runs status.

Transport teardown (page reload, navigation) aborts only the local stream — it deliberately does not cancel the DBOS workflow, so the execution survives and can be reconnected to. Only the explicit /cancel endpoint cancels the workflow.

Event translation

The execution emits AG-UI events into a DBOS stream keyed agui:events; the agent forwards them verbatim to the subscriber. The package also ships pure reducers that fold a flat event array back into the protocol's higher-level views — used for history reconstruction and client state:

  • eventsToMessages(events) — rebuilds the conversation message list, with provider-aware handling of native assistant content blocks (server tool use / results, reasoning, MCP, etc.).
  • eventsToState(events) — applies STATE_SNAPSHOT / STATE_DELTA to a final state object.
  • compactEvents(events) — collapses streaming chunk events into compact form.

A2UI reducer

A2UI (Google's declarative generative-UI model, via @a2ui/web_core) rides inside AG-UI ACTIVITY_SNAPSHOT / ACTIVITY_DELTA events under activityType === 'a2ui', whose content.messages are A2UI ServerToClientMessages. The reducer feeds those messages to a stateful A2uiMessageProcessor and exposes the resulting named UI surfaces:

  • processA2UIActivityEvent(event) — incremental: filters one event, processes its messages, returns a fresh surfaces snapshot (or null if not A2UI).
  • eventsToSurfaces(events) — folds a full event array into a surfaces map.
  • getA2UISurfaces() / getA2UIProcessor() / resetA2UIProcessor() — access and reset the shared processor.

Thread / run store

AgUIThreadRunStore (PostgreSQL impl PgThreadRunStore) indexes the chain of runs per conversation thread in fireflow_agui_thread_runs. Each row reuses the DBOS execution ID as its primary key and links to its parent via parentRunId / parentExecutionId, forming a linked list per thread. This is how parents are resolved when chaining a new run, how "latest run" is found for auto-connect, and how the thread list and per-thread history endpoints answer queries. Flows receive parentRunId + parentExecutionId in the run payload so they can read the previous execution's stream to reconstruct prior conversation.

SSE vs WebSocket

createAgUIHandler(deps) returns a Node.js HTTP handler mounting these routes:

RouteMethodPurpose
/ag-ui/:agentId/runPOSTValidate RunAgentInput, authenticate, run/subscribe/reconnect, stream events as SSE via @ag-ui/encoder.
/ag-ui/:agentId/resumePOSTSend a tool result { executionId, toolCallId, content } to a waiting DBOS workflow (topic derived from toolCallId). Requires a DBOS client.
/ag-ui/:agentId/cancelPOSTCancel a running execution's DBOS workflow.
/ag-ui/:agentId/capabilitiesGETDynamic agent capability discovery.
/ag-ui/:agentId/threadsGETList conversation threads for the agent + user.
/ag-ui/:agentId/threads/:threadIdGETGet a thread's history (messages + state).

createAgUIWSHandler(deps) provides the WebSocket alternative for the run/resume/cancel/capabilities loop, using a small typed message envelope (authrun / resume / cancel / capabilities, with event / error / run_started server messages). Both transports drive the same FireFlowServerAgent; SSE encodes events through EventEncoder, WS wraps each event in an { type: 'event', event } message.

Exports

EntryEnvironmentContents
.Client + ServerProtocol-adjacent helpers and types: A2UI reducer (eventsToSurfaces, processA2UIActivityEvent, ...) and A2UI types, event reducers (compactEvents, eventsToMessages, eventsToState), and shared config / WebSocket message types.
./serverServer onlyFireFlowServerAgent, FireFlowAgentSubscriber, deriveExecutionId, createAgUIHandler (SSE + REST routes), createAgUIWSHandler (WebSocket), and the dependency/store interfaces (AgUIHandlerDependencies, AgUIExecutionStore, ...).
./clientClientFireFlowAgent (client AbstractAgent with dual SSE/WS transport, used with CopilotKit), FireFlowToolSubscriber, ThreadsClient, and capability/thread/config types.
./dbServerPgThreadRunStore, the aguiThreadRunsTable Drizzle schema (fireflow_agui_thread_runs), the AgUIThreadRunStore interface, and row/summary types.

Database schema

fireflow_agui_thread_runs — one row per agent run within a thread:

ColumnNotes
execution_id (PK)Reuses the DBOS execution ID
agent_id, user_id, thread_id, run_idThread/run identity
parent_run_id, parent_execution_idChain links (null for the first run)
statusrunning / completed / failed / cancelled (default running)
created_at, completed_atTimestamps

Indexed for "all runs for a thread, ordered", "latest run for a thread", and a uniqueness constraint of one run_id per (agent_id, thread_id).

  • @persistent-ai/fireflow-agui-react — the React integration (FireFlowProvider, Effector stores, hooks, optional CopilotKit bridge) that consumes this server.
  • @persistent-ai/fireflow-executor — runs the flow whose events this package streams; the execution-api wires the injected dependencies.
  • AG-UI protocol: https://ag-ui.dev. See also the agui-protocol, agui-fireflow, and agui-sdk skills, and docs/developer/agui/.

License

Business Source License 1.1 (BUSL-1.1) — see LICENSE.txt


View source on GitHub →

Licensed under BUSL-1.1