PersistentAI API Documentation / @persistentai/fireflow-executor
@persistentai/fireflow-executor
Durable execution engine for PersistentAI's flow-based programming framework. Orchestrates flow executions with exactly-once semantics via DBOS, real-time event streaming over PostgreSQL LISTEN/NOTIFY, and distributed worker coordination.
Overview
This package is the execution core of PersistentAI. It takes a flow definition (a computational graph of nodes and edges) and executes it durably -- meaning that if a worker crashes mid-execution, DBOS automatically resumes from the last checkpoint with no data loss.
Three export paths serve different consumers:
| Export | Consumer | Purpose |
|---|---|---|
| ./server | fireflow-execution-api, fireflow-execution-worker | DBOS workflows, services, tRPC router, stores |
| ./client | fireflow-frontend | React tRPC hooks, WebSocket client |
| ./types | All consumers | ExecutionStatus, ExecutionTask, shared interfaces |
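For illustration, a minimal sketch of how each entry point is typically imported (package name and export names come from the export lists later in this document; nothing beyond that is implied):

```typescript
// Server process (API or worker): services, workflows, stores
import { createServicesForWorker } from '@persistentai/fireflow-executor/server'

// React frontend: tRPC hooks and provider
import { TRPCProvider, trpcReact } from '@persistentai/fireflow-executor/client'

// Any consumer: shared enums and interfaces
import { ExecutionStatus, type ExecutionTask } from '@persistentai/fireflow-executor/types'
```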
Key properties:
- Exactly-once execution via DBOS workflow IDs and deduplication
- Automatic crash recovery via PostgreSQL-backed checkpoints
- PostgreSQL-only (no Kafka dependency in DBOS mode)
- Signal pattern prevents race conditions between subscription and execution start
- Child execution spawning from Event Emitter nodes (up to depth 100)
Architecture
System Diagram
┌─────────────────────────┐
│ Frontend (React) │
│ @.../executor/client │
│ │
│ WebSocket subscription─┼───────────┐
│ tRPC mutations ────────┼──────┐ │
└─────────────────────────┘ │ │
│ │
▼ │
┌─────────────────────────────────────┴─────────┐
│ Execution API (tRPC Server) │
│ @.../executor/server (API mode) │
│ │
│ - createServicesForAPI() -- NO DBOS runtime │
│ - APITaskQueue enqueues via DBOSClient │
│ - DBOSClient.send() for START_SIGNAL │
│ - StreamBridge for event subscriptions │
└───────────────────┬────────────────────────────┘
│ PostgreSQL (DBOS durable queue)
▼
┌────────────────────────────────────────────────┐
│ Execution Worker (DBOS Runtime) │
│ @.../executor/server (Worker mode) │
│ │
│ - createServicesForWorker() -- full DBOS │
│ - ExecutionWorkflows.executeFireFlow() │
│ - DBOS.writeStream() → PostgreSQL │
│ - Spawns child executions │
└───────────────────┬────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│ PostgreSQL │
│ - DBOS system tables (workflow checkpoints) │
│ - fireflow_executions (execution records) │
│ - DBOS streams (real-time event delivery) │
│ - LISTEN/NOTIFY (push notifications) │
└────────────────────────────────────────────────┘
Execution Lifecycle (3-Phase Workflow)
The core workflow lives in server/dbos/workflows/ExecutionWorkflows.ts. Every flow execution follows three phases:
┌─────────────────────────────────────────────────────────────┐
│ Phase 1: Stream Initialization │
│ │
│ 1. Load execution row from database │
│ 2. Write EXECUTION_CREATED event (index -1) │
│ └─ Creates the DBOS stream immediately │
│ 3. Wait for START_SIGNAL via DBOS.recv() │
│ ├─ Parent executions: 5-minute timeout │
│ └─ Child executions: skip wait (auto-start) │
├─────────────────────────────────────────────────────────────┤
│ Phase 2: Flow Execution │
│ │
│ 1. updateToRunning() ─── durable checkpoint │
│ 2. Load flow via FlowCachedLoader (30min TTL) │
│ 3. Create services: DBOSContextService, VFS, VFSWrite │
│ 4. Create ExecutionContext with services │
│ 5. Create SPTreeExecutionEngine │
│ 6. Subscribe engine events → buffer → DBOS.writeStream() │
│ 7. engine.execute() ─── runs at WORKFLOW level │
│ 8. Flush event buffer, collect child tasks │
├─────────────────────────────────────────────────────────────┤
│ Phase 3: Child Spawning │
│ │
│ 1. Collect emitted events from context.emittedEvents │
│ 2. Generate child execution IDs (wrapped in steps) │
│ 3. Create child execution rows in database │
│ 4. Parallel spawn via DBOS.startWorkflow() + allSettled │
│ 5. Await all child results with Promise.allSettled() │
│ 6. updateToCompleted() ─── final durable checkpoint │
└─────────────────────────────────────────────────────────────┘
Why execution runs at workflow level (not inside a step): Nodes need access to DBOS primitives via context.services.dbos.runStep(). Running inside a step would prevent nodes from creating their own durable checkpoints.
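A heavily condensed sketch of how these phases map onto a DBOS workflow. The real implementation is ExecutionWorkflows.executeFireFlow(); the helper declared below is illustrative, and the exact signatures of the documented exports are assumptions:

```typescript
// Sketch only -- the real code lives in server/dbos/workflows/ExecutionWorkflows.ts.
import { DBOS } from '@dbos-inc/dbos-sdk'
// Documented durable status steps (exact signatures assumed)
import { updateToRunning, updateToCompleted } from '@persistentai/fireflow-executor/server'

// Illustrative stand-in for Phase 2: run the SP-tree engine, stream events,
// and return any child tasks emitted by Event Emitter nodes.
declare function runEngineAndCollectChildren(
  task: { executionId: string },
): Promise<Array<{ executionId: string; parentExecutionId: string }>>

class ExecutionWorkflowsSketch {
  @DBOS.workflow()
  static async executeFireFlow(task: { executionId: string; parentExecutionId?: string }) {
    // Phase 1: create the stream first, then wait for the client's signal
    await DBOS.writeStream('events', { type: 'EXECUTION_CREATED', index: -1 })
    if (!task.parentExecutionId) {
      await DBOS.recv('START_SIGNAL', 300) // parents wait up to 5 minutes; children skip this
    }

    // Phase 2: durable status checkpoint, then run the engine at WORKFLOW level
    await updateToRunning(task.executionId)
    const childTasks = await runEngineAndCollectChildren(task)

    // Phase 3: spawn children in parallel, await them all, final checkpoint
    const handles = await Promise.all(
      childTasks.map((child) =>
        DBOS.startWorkflow(ExecutionWorkflowsSketch).executeFireFlow(child),
      ),
    )
    await Promise.allSettled(handles.map((h) => h.getResult()))
    await updateToCompleted(task.executionId)
  }
}
```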
Signal Pattern
Solves the race condition where a client subscribes to events before the execution stream exists:
1. create() → Workflow starts → writes EXECUTION_CREATED → stream exists
2. subscribe() → Stream already exists → gets EXECUTION_CREATED immediately
3. start() → Sends START_SIGNAL → workflow continues to Phase 2
Without this pattern, the client could subscribe to a nonexistent stream and miss the initial event.
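A client following this ordering with the vanilla WebSocket client might look like the sketch below. Procedure and export names come from this document; the input shapes, the return value of create, and the connection options are assumptions:

```typescript
import { createTRPCClient } from '@persistentai/fireflow-executor/client'

const client = createTRPCClient({ url: 'ws://localhost:3021' }) // options shape assumed

// 1. create(): workflow starts, writes EXECUTION_CREATED, then waits for the signal
const created = await client.create.mutate({ flowId: 'my-flow-id' })
const executionId = created.executionId // field name assumed

// 2. subscribe(): the stream already exists, so EXECUTION_CREATED arrives immediately
const sub = client.subscribeToExecutionEvents.subscribe(
  { executionId },
  { onData: (event) => console.log(event), onError: (err) => console.error(err) },
)

// 3. start(): sends START_SIGNAL so the workflow proceeds to Phase 2
await client.start.mutate({ executionId })
```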
Event Streaming Pipeline
ExecutionEngine
│ (synchronous events)
▼
In-memory buffer (pendingEvents[])
│ (async drain by long-running DBOS step)
▼
DBOS.writeStream('events', {...})
│ (persisted in PostgreSQL)
▼
PostgreSQL NOTIFY on channel 'ds_{workflowId}_{streamKey}'
│
▼
PGListenerPool (10 pg-listen connections)
│ Hash-based routing: FNV-1a(streamId) % 10
│ Fallback: least-loaded if listener at capacity
▼
StreamBridge → DBOSEventBus.subscribeToEvents()
│ Batching: max 100 events, 25ms timeout
▼
tRPC subscription (async generator)
│
▼
WebSocket → Frontend
Pool configuration (from server/implementations/dbos/streaming/types.ts):
| Parameter | Value | Description |
|---|---|---|
| Pool size | 10 | PGListener instances |
| Max streams/listener | 1000 | Before routing to next listener |
| Strategy | hash | Consistent routing via FNV-1a |
| Health check | 30s | Connection verification interval |
| Batch size | 100 | Max events per batch to consumers |
| Batch timeout | 25ms | Max wait before flushing batch |
| Channel prefix | ds_ | Fits PostgreSQL 63-byte NAMEDATALEN |
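A sketch of the hash routing and capacity fallback described above, assuming the conventional 32-bit FNV-1a constants (the real logic lives in PGListenerPool.ts):

```typescript
// Standard 32-bit FNV-1a hash; constants here are the usual ones.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash >>> 0
}

const POOL_SIZE = 10
const MAX_STREAMS_PER_LISTENER = 1000

function pickListener(streamId: string, listenerLoads: number[]): number {
  const preferred = fnv1a32(streamId) % POOL_SIZE
  if (listenerLoads[preferred] < MAX_STREAMS_PER_LISTENER) return preferred
  // Fallback: route to the least-loaded listener when the preferred one is at capacity
  return listenerLoads.indexOf(Math.min(...listenerLoads))
}
```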
Package Exports
./server
Exports from server/index.ts:
DBOS Integration:
- ExecutionWorkflows -- DBOS workflow class with executeFireFlow()
- DBOSExecutionWorker -- Worker lifecycle management
- executionQueue, QUEUE_NAME -- Module-level workflow queue
- initializeDBOS, shutdownDBOS -- DBOS lifecycle
- initializeUpdateStatusSteps -- Inject store into durable steps
- updateToRunning, updateToCompleted, updateToFailed -- Durable status steps
- CommandController (type) -- Shared command state interface
Implementations:
- DBOSEventBus -- DBOS stream-based event pub/sub
- DBOSTaskQueue -- Worker-mode task queue (direct DBOS.startWorkflow())
- APITaskQueue -- API-mode task queue (enqueue via DBOSClient)
- InMemoryEventBus, InMemoryTaskQueue -- Local development implementations
Services:
- ExecutionService -- Main execution orchestrator
- RecoveryService -- Failed execution recovery
- createServices() -- Auto-detect initialization
- createServicesForAPI() -- API-only mode (no DBOS runtime)
- createServicesForWorker() -- Worker mode (full DBOS runtime)
- closeServices(), getServices() -- Lifecycle management
- ServiceInstances (type) -- Service container interface
Stores:
- PostgresExecutionStore -- PostgreSQL execution persistence
- getExecutionStore(), getFlowStore(), loadFlow() -- Store accessors
- IExecutionStore (type) -- Store interface
- executionsTable, executionClaimsTable, executionRecoveryTable -- Drizzle schemas
- ExecutionRow, ExecutionClaimRow, ExecutionRecoveryRow (types) -- Row types
tRPC:
- executionRouter -- All execution procedures
- createTRPCContext -- Auth + services injection
- ExecutionRouter, TRPCContext, ExecutorContext (types)
Utilities:
- config -- Environment configuration object
- createLogger() -- Pino logger factory
- getDatabaseMain(), getDatabaseExecutions() -- PostgreSQL pools
- closeDatabaseMain(), closeDatabaseExecutions() -- Pool cleanup
- createWSServer() -- WebSocket server for tRPC
- ExecutionMode (type) -- 'local' | 'dbos'
./client
Exports from client/index.ts:
| Export | Type | Description |
|---|---|---|
| trpcReact | TRPCReact | tRPC React hooks instance |
| createTRPCClient(opts) | Function | WebSocket client (keep-alive 5s, pong timeout 3s) |
| TRPCProvider | React Component | Context provider wrapping tRPC + QueryClient |
| useTRPC() | Hook | Access tRPC proxy in components |
| useTRPCClient() | Hook | Access raw tRPC client |
| getExecutorQueryClient() | Function | TanStack Query client singleton (staleTime: 1000ms) |
| ReactQueryOptions | Type | Inferred React Query options |
| RouterInputs | Type | Inferred router input types |
| RouterOutputs | Type | Inferred router output types |
| TRPCClient | Type | tRPC client type |
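A hedged usage sketch of the client exports. The hook shape follows the classic @trpc/react-query pattern; the provider's props and the exact hook surface are assumptions:

```tsx
import { TRPCProvider, trpcReact } from '@persistentai/fireflow-executor/client'

function ExecutionDetails({ executionId }: { executionId: string }) {
  // Procedure name comes from the tRPC API Reference below; hook shape assumed
  const details = trpcReact.getExecutionDetails.useQuery({ executionId })
  if (details.isLoading) return <span>Loading...</span>
  return <pre>{JSON.stringify(details.data, null, 2)}</pre>
}

export function App() {
  return (
    // TRPCProvider may require client/queryClient props; omitted here (assumption)
    <TRPCProvider>
      <ExecutionDetails executionId="abc123" />
    </TRPCProvider>
  )
}
```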
./types
Exports from types/:
ExecutionStatus (enum): Idle | Creating | Created | Running | Paused | Completed | Failed | Stopped
ExecutionCommandType (enum): CREATE | START | STOP | PAUSE | STEP | RESUME | HEARTBEAT
Interfaces:
- ExecutionTask -- Task payload for queue (executionId, flowId, debug, retry config)
- ExecutionCommand -- Idempotent lifecycle command with payload
- ExecutionInstance -- Runtime state (task, row, context, flow, engine)
- ExecutionClaim -- Worker claim with heartbeat tracking
- ExecutionTreeNode -- Hierarchical execution node (id, parentId, level)
- RootExecution -- Root execution with nested depth/count
- ExecutionError -- Error with optional nodeId and stack
- ExecutionEventMessage -- Event with executionId, timestamp, workerId
- RetryHistoryEntry -- Retry attempt tracking
Service Factory & Initialization
Three initialization paths in server/services/ServiceFactory.ts:
| Function | Use Case | DBOS Runtime | Task Queue | Event Bus |
|---|---|---|---|---|
| createServicesForAPI() | Execution API server | No | APITaskQueue (DBOSClient) | DBOSEventBus |
| createServicesForWorker() | Execution worker | Yes (full) | DBOSTaskQueue | DBOSEventBus |
| createServices() | General (auto-detect) | If ENABLE_DBOS_EXECUTION=true | Auto | Auto |
All factories are singletons -- calling them multiple times returns the same instance.
ServiceInstances Interface
interface ServiceInstances {
eventBus: IEventBus
taskQueue: ITaskQueue
executionStore: IExecutionStore
executionService: ExecutionService
flowStore: IFlowStore
authService: AuthService
ownershipResolver: IOwnershipResolver
dbosWorker?: DBOSExecutionWorker // Only in worker mode
dbosClient?: DBOSClient // Only in API mode
}
Initialization Order (Critical)
For worker mode, the queue MUST be created at module level BEFORE DBOS.launch():
1. Import queue.ts → WorkflowQueue created at module level
2. Import ExecutionWorkflows → @DBOS.workflow() registered
3. Call initializeDBOS() → DBOS.launch() picks up queue + workflow
4. Create StreamBridge → 10 PGListeners for event streaming
5. Create DBOSEventBus → Wraps StreamBridge
6. initializeUpdateStatusSteps() → Inject store into durable steps
7. Create remaining services
If the queue is created AFTER DBOS.launch(), the worker can enqueue but cannot dequeue -- tasks will pile up indefinitely.
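A worker bootstrap sketch that respects this ordering. The factory is documented above; whether it returns a promise and which of steps 3-7 it performs internally are assumptions:

```typescript
// Importing the server entry point evaluates queue.ts and ExecutionWorkflows.ts
// at module level (steps 1-2), so the queue and workflow are registered before
// DBOS.launch() runs during factory initialization (step 3).
import { createServicesForWorker } from '@persistentai/fireflow-executor/server'

async function main() {
  const services = await createServicesForWorker()
  // services.dbosWorker is only populated in worker mode (see ServiceInstances above)
  console.log('worker ready', Boolean(services.dbosWorker))
}

main().catch((err) => {
  console.error('worker bootstrap failed', err)
  process.exit(1)
})
```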
tRPC API Reference
All procedures are defined in server/trpc/router.ts.
Execution Lifecycle
| Procedure | Type | Auth | Input | Description |
|---|---|---|---|---|
| create | Mutation | authed | { flowId, options?, integration?, events? } | Creates execution row + starts DBOS workflow (waits for signal) |
| start | Mutation | executionContext | { executionId } | Sends START_SIGNAL to waiting workflow |
| stop | Mutation | executionContext | { executionId, reason? } | Cancels DBOS workflow via cancelWorkflow() |
| pause | Mutation | executionContext | { executionId, reason? } | Sends PAUSE command via DBOS messaging (debug mode) |
| resume | Mutation | executionContext | { executionId } | Sends RESUME command via DBOS messaging (debug mode) |
Queries
| Procedure | Type | Auth | Input | Description |
|---|---|---|---|---|
| getExecutionDetails | Query | executionContext | { executionId } | Returns full ExecutionRow |
| getExecutionsTree | Query | executionContext | { executionId } | Returns parent + all child executions |
| getRootExecutions | Query | authed | { flowId, limit, after? } | Paginated root executions for a flow |
Subscriptions (WebSocket)
| Procedure | Type | Auth | Input | Description |
|---|---|---|---|---|
| subscribeToExecutionEvents | Subscription | executionContext | { executionId, fromIndex?, eventTypes?, batchSize?, batchTimeoutMs? } | Real-time execution event stream |
| subscribeToPortStream | Subscription | executionContext | { executionId, workflowId, streamKey, fromOffset?, batchSize?, batchTimeoutMs? } | Port-level data stream |
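An illustrative subscription call using the batching options from the table above (the onData payload shape and connection options are assumptions):

```typescript
import { createTRPCClient } from '@persistentai/fireflow-executor/client'

const client = createTRPCClient({ url: 'ws://localhost:3021' }) // options shape assumed

const subscription = client.subscribeToExecutionEvents.subscribe(
  {
    executionId: 'abc123',
    fromIndex: 0,      // replay the stream from the beginning
    batchSize: 100,    // matches the StreamBridge defaults documented above
    batchTimeoutMs: 25,
  },
  {
    onData: (data) => console.log('execution event(s)', data), // payload shape assumed
    onError: (err) => console.error(err),
  },
)

// Later, when the UI unmounts or the execution finishes:
subscription.unsubscribe()
```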
DBOS Bridge (Mini-App)
Flow-scoped access to DBOS primitives for external mini-apps:
| Procedure | Type | Auth | Input | Description |
|---|---|---|---|---|
| getEvent | Query | flowContext | { flowId, workflowId, key, timeoutSeconds? } | Get DBOS event value |
| sendMessage | Mutation | flowContext | { flowId, workflowId, message, topic? } | Send message to workflow |
| getWorkflowStatus | Query | flowContext | { flowId, workflowId } | Get DBOS workflow status |
| subscribeToStream | Subscription | flowContext | { flowId, workflowId, streamKey, fromOffset?, batchSize?, batchTimeoutMs? } | Subscribe to any DBOS stream |
Auth Middleware
- authedProcedure -- Validates JWT token (skipped if auth disabled or dev mode)
- flowContextProcedure -- Validates flowId access (extends authed, admin bypass)
- executionContextProcedure -- Validates executionId access via flow ownership (extends authed, admin bypass)
Context Services
Nodes access DBOS primitives at runtime via context.services. These are injected by the workflow during Phase 2.
DBOS Context Service (context.services.dbos)
Defined in server/services/context/DBOSContextService.ts:
| Method | Description |
|---|---|
| runStep(fn, options?) | Execute durable step (name auto-prefixed with nodeId:step-N) |
| startSubflow(context, params) | Start child flow, returns handle with getResult(), waitForOutput(), send() |
| executeSubflow(context, params) | Start and await child flow result |
| executeSubflowsParallel(context, subflows) | Start multiple child flows in parallel |
| send(workflowId, message, topic?) | Send message to another workflow |
| recv(topic?, timeoutSeconds?) | Receive message (WORKFLOW level only, not steps) |
| setEvent(key, value) | Set a durable event value |
| getEvent(workflowId, key, timeoutSeconds?) | Get an event value from any workflow |
| writeStream(key, value) | Write to DBOS stream (allowed from steps) |
| readStream(workflowId, key) | Read from DBOS stream |
| closeStream(key) | Close a DBOS stream |
| sleep(durationMs) | Durable sleep (survives replay) |
| now() | Durable timestamp (returns same value on replay) |
| getWorkflowStatus(workflowId) | Query workflow status |
Subflow limits (enforced by DBOSContextService):
- Max depth: 100 levels
- Max breadth: 10 concurrent subflows per parent
- Max total: 1000 subflows per execution
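A sketch of a node body using context.services.dbos. Only the method names in the table above come from the source; the node wrapper, option shape, and payloads are assumptions:

```typescript
// Illustrative node logic -- not an actual PersistentAI node implementation.
async function fetchAndStream(context: any, input: { url: string }) {
  const dbos = context.services.dbos

  // Durable step: the result is checkpointed, so a crash + replay skips the fetch
  const body: string = await dbos.runStep(
    async () => {
      const res = await fetch(input.url)
      return res.text()
    },
    { name: 'fetch-page' }, // option name assumed; becomes `${nodeId}:fetch-page`
  )

  // writeStream() is also allowed from inside steps, unlike the other primitives
  await dbos.writeStream('output', { preview: body.slice(0, 1024) })

  // Durable sleep and timestamp behave identically on replay
  await dbos.sleep(1_000)
  const finishedAt = await dbos.now()

  return { finishedAt }
}
```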
VFS Context Services
- context.services.vfs (VFSContextService) -- Read-only virtual file system access
- context.services.vfsWrite (VFSWriteContextService) -- Writable virtual file system access
Both use the flow owner's userId for permission checks.
Database Schema
Defined in server/stores/postgres/schema.ts. All tables use the fireflow_ prefix.
fireflow_executions
| Column | Type | Description |
|---|---|---|
| id | text PK | Execution ID (nanoid) |
| flow_id | text | Flow being executed |
| owner_id | text | Flow owner ID |
| root_execution_id | text | Root of execution tree |
| parent_execution_id | text | Parent execution (null for root) |
| status | text enum | created, running, paused, completed, failed, stopped |
| created_at | timestamp | Creation time |
| updated_at | timestamp | Last update time |
| started_at | timestamp | When execution began |
| completed_at | timestamp | When execution finished |
| error_message | text | Error message (if failed) |
| error_node_id | text | Node that caused failure |
| execution_depth | integer | Nesting level (0 = root) |
| options | jsonb | ExecutionOptions |
| integration | jsonb | IntegrationContext |
| external_events | jsonb | ExecutionExternalEvent[] |
| failure_count | integer | Number of failures (for recovery) |
| last_failure_reason | text | Most recent failure reason |
| last_failure_at | timestamp | Most recent failure time |
| processing_started_at | timestamp | When worker claimed execution |
| processing_worker_id | text | Worker currently processing |
Indexes (10 total):
- executions_flow_depth_created_idx -- Flow + depth + created (tree queries)
- executions_root_execution_id_idx -- Tree traversal
- executions_parent_execution_id_idx -- Child lookups
- executions_flow_id_idx -- Flow-scoped queries
- executions_status_idx -- Status filtering
- executions_started_at_idx -- Time-range queries
- executions_failure_count_idx -- Recovery scanning
- executions_processing_worker_idx -- Worker lookups
- executions_status_failure_idx -- Recovery: status + failure count
- executions_last_failure_at_idx -- Recovery timing
fireflow_execution_claims
| Column | Type | Description |
|---|---|---|
| execution_id | text PK | Claimed execution |
| worker_id | text | Claiming worker |
| claimed_at | timestamp | Claim time |
| expires_at | timestamp | Claim expiration |
| heartbeat_at | timestamp | Last heartbeat |
| status | text | active, released, expired |
fireflow_execution_recovery
| Column | Type | Description |
|---|---|---|
| id | serial PK | Auto-increment ID |
| execution_id | text FK | Recovered execution |
| recovered_at | timestamp | Recovery time |
| recovered_by_worker | text | Worker that recovered |
| recovery_reason | text | expired_claim, no_claim, stuck_running, retry_after_failure |
| previous_status | text | Status before recovery |
| previous_worker_id | text | Worker before recovery |
Migration Commands
# Push schema directly (development)
pnpm --filter @persistentai/fireflow-executor migrate:push
# Generate migration files
pnpm --filter @persistentai/fireflow-executor migrate:generate
# Run migrations
pnpm --filter @persistentai/fireflow-executor migrate:run
Configuration Reference
All environment variables from server/utils/config.ts:
Execution Mode
| Variable | Default | Description |
|---|---|---|
| EXECUTION_MODE | local | 'local' or 'dbos' |
Database
| Variable | Default | Description |
|---|---|---|
| DATABASE_URL | postgres://postgres@localhost:5432/fireflow | Main database |
| DATABASE_URL_EXECUTIONS | Falls back to DATABASE_URL | Execution-specific database |
DBOS
| Variable | Default | Description |
|---|---|---|
| ENABLE_DBOS_EXECUTION | false | Enable DBOS durable execution |
| DBOS_APPLICATION_NAME | fireflow-executor | App name for DBOS identification |
| DBOS_QUEUE_CONCURRENCY | 100 | Global concurrency limit across all workers |
| DBOS_WORKER_CONCURRENCY | 5 | Per-worker concurrency limit |
| DBOS_SYSTEM_DATABASE_POOL_SIZE | 10 | DBOS system database connection pool |
| DBOS_ADMIN_ENABLED | true | Enable DBOS admin UI |
| DBOS_ADMIN_PORT | 3022 | Admin UI port |
| DBOS_CONDUCTOR_URL | (empty) | Remote DBOS Conductor URL (optional) |
| DBOS_CONDUCTOR_KEY | (empty) | Conductor authentication key (optional) |
Note: DBOS_APPLICATION_VERSION is hardcoded in server/dbos/version.ts (not configurable via env var). This ensures API and Worker always use the same version, preventing DBOS replay mismatches.
Worker
| Variable | Default | Description |
|---|---|---|
| WORKER_ID | HOSTNAME or random | Unique worker identifier |
| WORKER_CONCURRENCY | 10 | Local execution concurrency |
| WORKER_CLAIM_TIMEOUT_MS | 30000 | Claim duration before expiration |
| WORKER_HEARTBEAT_INTERVAL_MS | 5000 | Heartbeat frequency |
| WORKER_CLAIM_EXPIRATION_CHECK_MS | 10000 | Check interval for expired claims |
Metrics
| Variable | Default | Description |
|---|---|---|
| ENABLE_METRICS | false | Enable metrics collection |
| METRICS_LOG_LEVEL | debug | Metrics log level (debug, info, warn) |
| METRICS_SAMPLING_ENABLED | false | Enable sampling |
| METRICS_SAMPLING_RATE | 1.0 | Sampling rate (0.0 - 1.0) |
| METRICS_BATCH_SIZE | 1 | Events per batch before logging |
| METRICS_FLUSH_INTERVAL | 1000 | Batch flush interval (ms) |
| METRICS_INCLUDE_MEMORY | false | Include memory snapshots in metrics |
Recovery
| Variable | Default | Description |
|---|---|---|
| ENABLE_RECOVERY | true | Enable automatic recovery service |
| RECOVERY_SCAN_INTERVAL_MS | 30000 | How often to scan for stuck executions |
| RECOVERY_MAX_FAILURE_COUNT | 5 | Max failures before permanent failure |
Logging
| Variable | Default | Description |
|---|---|---|
| LOG_LEVEL | info | Pino log level |
| NODE_ENV | development | production enables compact logging |
Execution Engine
| Variable | Default | Description |
|---|---|---|
| ENABLE_UNIFIED_EXECUTION | false | Enable the SP-tree execution engine (must be set to true; see Troubleshooting) |
DBOS Constraints & Gotchas
Workflow vs Step Restrictions
WORKFLOW context: send(), recv(), startWorkflow(), writeStream(),
readStream(), runStep(), setEvent(), getEvent() ── ALL allowed
STEP context: writeStream() ── ONLY this one is allowed
Everything else (send, recv, startWorkflow, etc.) throws if called from a step. The codebase works around this by:
- Collecting child tasks during execution and spawning them at workflow level
- Using CommandController (shared mutable state) to pass commands from workflow to step
- Writing events via DBOS.writeStream() from within the long-running step
Queue Creation Order
The workflow queue in server/dbos/queue.ts is created at module level (during import). This is intentional -- it MUST exist before DBOS.launch() for the worker to dequeue tasks. The file includes a runtime check:
const dbosAlreadyInitialized = DBOS.isInitialized()
if (dbosAlreadyInitialized) {
logger.error('CRITICAL: DBOS is already initialized! Queue will NOT be used for dequeue!')
}
Step Name Determinism
DBOS replays steps by name. If step names differ between original execution and replay, cached results won't match. DBOSContextService ensures determinism by auto-prefixing:
stepName = `${nodeId}:${userProvidedName || `step-${counter}`}`
The counter is per-node via AsyncLocalStorage, ensuring parallel node execution doesn't cause name collisions.
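A simplified model of the naming scheme. The per-node counter and AsyncLocalStorage scoping are described above; whether named steps also advance the counter is an assumption:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks'

// One scope per node execution, so parallel nodes never share a counter.
const nodeScope = new AsyncLocalStorage<{ nodeId: string; counter: number }>()

function nextStepName(userProvidedName?: string): string {
  const scope = nodeScope.getStore()
  if (!scope) throw new Error('runStep called outside a node execution scope')
  scope.counter += 1
  return `${scope.nodeId}:${userProvidedName || `step-${scope.counter}`}`
}

// The engine would wrap each node invocation roughly like this:
nodeScope.run({ nodeId: 'httpRequest-1', counter: 0 }, () => {
  console.log(nextStepName())        // httpRequest-1:step-1
  console.log(nextStepName('parse')) // httpRequest-1:parse
})
```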
Application Version
DBOS_APPLICATION_VERSION is hardcoded in server/dbos/version.ts (currently '1.0.0'). This is deliberate -- using an environment variable would risk version drift between API and Worker deployments, causing DBOS to route tasks to the wrong worker version.
Debug Mode
Command polling (PAUSE/RESUME/STEP/STOP via DBOS.recv()) is only enabled when task.debug=true. In production mode (debug=false), there is zero polling overhead -- no recv() calls are made during execution.
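Conceptually, the gate looks like this inside the workflow (a sketch of the documented behavior; the topic name and timeout are illustrative):

```typescript
import { DBOS } from '@dbos-inc/dbos-sdk'

type DebugCommand = 'PAUSE' | 'RESUME' | 'STEP' | 'STOP'

async function maybePollCommand(task: { debug?: boolean }): Promise<DebugCommand | null> {
  if (!task.debug) {
    return null // production mode: no DBOS.recv() calls, zero polling overhead
  }
  // Debug mode: briefly wait for a lifecycle command sent via DBOS messaging
  return DBOS.recv<DebugCommand>('commands', 1)
}
```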
Recovery Service
server/services/RecoveryService.ts handles stuck and failed executions:
- Uses PostgreSQL advisory locks so only one worker runs recovery at a time
- Scans for: expired claims, unclaimed created-status executions, retry-after-failure
- Max failure count (default 5) before marking execution as permanently failed
- Records all recovery actions in the fireflow_execution_recovery table for audit
- Configurable scan interval (default 30 seconds)
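A sketch of the advisory-lock guard from the first bullet above, using plain pg. The lock key, query text, and function name are illustrative, not the actual RecoveryService code:

```typescript
import { Pool } from 'pg'

// Hypothetical lock key -- the real value lives in RecoveryService.ts
const RECOVERY_LOCK_KEY = 42

export async function runRecoveryScanOnce(pool: Pool): Promise<boolean> {
  const client = await pool.connect()
  try {
    // pg_try_advisory_lock returns immediately; only one worker wins the lock
    const { rows } = await client.query<{ locked: boolean }>(
      'SELECT pg_try_advisory_lock($1) AS locked',
      [RECOVERY_LOCK_KEY],
    )
    if (!rows[0].locked) return false // another worker is already scanning

    try {
      // ... scan for expired claims, unclaimed created-status executions,
      // and retry-after-failure candidates, then record each action in
      // fireflow_execution_recovery ...
      return true
    } finally {
      await client.query('SELECT pg_advisory_unlock($1)', [RECOVERY_LOCK_KEY])
    }
  } finally {
    client.release()
  }
}
```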
Directory Structure
fireflow-executor/
├── client/
│ ├── index.ts # tRPCReact hooks, TRPCProvider, type helpers
│ └── trpc.ts # WebSocket client setup, SuperJSON, keep-alive
├── server/
│ ├── index.ts # All server exports
│ ├── dbos/
│ │ ├── config.ts # DBOS initialization (launch/shutdown)
│ │ ├── DBOSExecutionWorker.ts # Worker lifecycle management
│ │ ├── queue.ts # Module-level WorkflowQueue (BEFORE DBOS.launch)
│ │ ├── version.ts # Hardcoded DBOS_APPLICATION_VERSION
│ │ ├── types.ts # ExecutionResult, DBOSQueueOptions
│ │ ├── workflows/
│ │ │ └── ExecutionWorkflows.ts # @DBOS.workflow() 3-phase orchestration
│ │ ├── steps/
│ │ │ └── UpdateStatusStep.ts # @DBOS.step() durable status updates
│ │ └── utils/
│ │ └── FlowCachedLoader.ts # Version-aware flow caching (30min TTL)
│ ├── implementations/
│ │ ├── dbos/
│ │ │ ├── APITaskQueue.ts # Enqueue via DBOSClient (API-only)
│ │ │ ├── DBOSEventBus.ts # DBOS stream event pub/sub
│ │ │ ├── DBOSTaskQueue.ts # Direct DBOS.startWorkflow() (worker)
│ │ │ ├── migrations/
│ │ │ │ └── PostgreSQLMigrations.ts # LISTEN/NOTIFY setup
│ │ │ └── streaming/
│ │ │ ├── DBOSStreamSubscriber.ts # Stream consumer
│ │ │ ├── PGListener.ts # Single LISTEN connection
│ │ │ ├── PGListenerPool.ts # 10-listener pool
│ │ │ ├── StreamBridge.ts # DBOS → event bus bridge
│ │ │ └── types.ts # Stream constants & config
│ │ └── local/
│ │ ├── InMemoryEventBus.ts # Dev-only event bus
│ │ └── InMemoryTaskQueue.ts # Dev-only task queue
│ ├── interfaces/
│ │ ├── IEventBus.ts # Event pub/sub interface
│ │ └── ITaskQueue.ts # Task queue interface
│ ├── services/
│ │ ├── ExecutionService.ts # Main execution orchestrator
│ │ ├── IExecutionService.ts # Service interface
│ │ ├── RecoveryService.ts # Failed execution recovery
│ │ ├── ServiceFactory.ts # DI container (3 init paths)
│ │ └── context/
│ │ ├── DBOSContextService.ts # DBOS primitives for nodes
│ │ ├── VFSContextService.ts # VFS read access
│ │ ├── VFSWriteContextService.ts # VFS write access
│ │ └── node-execution-scope.ts # AsyncLocalStorage scope
│ ├── stores/
│ │ ├── execution-store.ts # Store factory
│ │ ├── flow-store.ts # Flow persistence (from fireflow-trpc)
│ │ ├── user-store.ts # User auth (from fireflow-trpc)
│ │ ├── interfaces/
│ │ │ └── IExecutionStore.ts # Store interface
│ │ └── postgres/
│ │ ├── schema.ts # 3 Drizzle tables + 18 indexes
│ │ └── postgres-execution-store.ts # PostgreSQL implementation
│ ├── trpc/
│ │ ├── context.ts # Auth + services injection
│ │ └── router.ts # All execution procedures
│ ├── metrics/
│ │ ├── MetricsTracker.ts # Collection & aggregation
│ │ ├── helpers.ts # Metric helpers
│ │ └── types.ts # Metric type definitions
│ ├── utils/
│ │ ├── config.ts # Environment configuration
│ │ ├── db.ts # Database pool management
│ │ ├── logger.ts # Pino logger setup
│ │ └── serialization.ts # SuperJSON setup
│ ├── ws-server.ts # WebSocket server for tRPC
│ └── drizzle.config.ts # Drizzle Kit configuration
├── types/
│ ├── index.ts # Re-exports
│ ├── execution.ts # ExecutionStatus, ExecutionInstance, etc.
│ └── messages.ts # ExecutionCommand, ExecutionTask, etc.
├── package.json
├── tsconfig.json
└── .env.example
Development
Prerequisites
- PostgreSQL running (Docker or local)
- pnpm installed
- Node.js v24+ or Bun for development
Commands
# Build
pnpm --filter @persistentai/fireflow-executor build
# Type check
pnpm --filter @persistentai/fireflow-executor typecheck
# Development (watch mode)
pnpm --filter @persistentai/fireflow-executor dev
# Run worker in development
pnpm --filter @persistentai/fireflow-executor dev:worker
# Run event stream server
pnpm --filter @persistentai/fireflow-executor dev:stream
# Production start
pnpm --filter @persistentai/fireflow-executor start:worker
pnpm --filter @persistentai/fireflow-executor start:stream
# Database migrations
pnpm --filter @persistentai/fireflow-executor migrate:push
pnpm --filter @persistentai/fireflow-executor migrate:generate
pnpm --filter @persistentai/fireflow-executor migrate:run
Key Dependencies
| Package | Version | Purpose |
|---|---|---|
| @dbos-inc/dbos-sdk | ^4.7.9 | Durable execution framework |
| @trpc/server | ^11.7.2 | Type-safe API procedures |
| @trpc/client | ^11.7.2 | tRPC client with WebSocket link |
| @trpc/react-query | ^11.7.2 | React hooks for tRPC |
| drizzle-orm | ^0.44.5 | Type-safe PostgreSQL toolkit |
| pg | ^8.16.3 | PostgreSQL driver |
| pg-listen | ^1.7.0 | PostgreSQL LISTEN/NOTIFY |
| pino | ^9.12.0 | Structured logging |
| ws | ^8.18.3 | WebSocket server |
| zod | ^3.25.76 | Runtime schema validation |
| superjson | peer | Serialization (Date, Map, Set, BigInt) |
Workspace dependencies:
- @persistentai/fireflow-types -- Execution engine, node types, decorators
- @persistentai/fireflow-nodes -- Pre-built node implementations
- @persistentai/fireflow-trpc -- Flow store, auth service, Drizzle schemas
- @persistentai/fireflow-vfs -- Virtual file system for node execution
Troubleshooting
| Symptom | Likely Cause | Solution |
|---|---|---|
| "Queue will NOT be used for dequeue" in logs | queue.ts imported after DBOS.launch() | Ensure queue.ts is imported before initializeDBOS() in ServiceFactory |
| Events not streaming to frontend | ENABLE_DBOS_EXECUTION not set | Set ENABLE_DBOS_EXECUTION=true in environment |
| Child executions not starting | Depth/breadth/total limit exceeded | Check execution depth (max 100), breadth (max 10), total (max 1000) |
| Execution stuck in created status | START_SIGNAL not sent within 5 minutes | Verify client calls start() after create() |
| Step replay returns wrong results | Non-deterministic step names | Use context.services.dbos.runStep() (auto-prefixes with nodeId) |
| "Services not initialized" error | createServices*() not called before tRPC | Call factory function during server startup before creating tRPC context |
| PostgreSQL connection exhaustion | Too many PGListener connections | Check DBOS_SYSTEM_DATABASE_POOL_SIZE (default 10) and PGListenerPool (10 connections) |
| "Failed to initialize execution engine" | ENABLE_UNIFIED_EXECUTION not set | Set ENABLE_UNIFIED_EXECUTION=true -- only SPTreeExecutionEngine is supported |
| Workflow version mismatch errors | API and Worker on different versions | Ensure both are built from the same commit (version is hardcoded in version.ts) |
| "Execution start timeout" | Client never called start() | Signal pattern requires: create() → subscribe() → start() |
License
Business Source License 1.1 (BUSL-1.1) -- see LICENSE.txt