Class: DBOSExecutionWorker
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:67
DBOS-based execution worker for fireflow flows
This worker replaces the Kafka-based ExecutionWorker with a DBOS-powered implementation that provides:
- Durable execution with automatic recovery
- Exactly-once semantics through idempotency
- Built-in concurrency control
- PostgreSQL-backed queue (no Kafka needed for task distribution)
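The exactly-once claim rests on idempotency: submitting the same execution twice should not run the flow twice. A minimal in-memory sketch of that idea follows. It is not the DBOS implementation; `IdempotentRunner`, `executionId`, and `demoIdempotency` are hypothetical names, and a plain `Map` stands in for the durable PostgreSQL state that DBOS maintains.

```typescript
// Hypothetical illustration only: a Map stands in for durable PostgreSQL state.
type FlowResult = { executionId: string; output: string };

class IdempotentRunner {
  // In-flight or finished results, keyed by execution ID.
  private readonly seen = new Map<string, Promise<FlowResult>>();
  // Number of times `work` was actually invoked.
  public runCount = 0;

  // Run `work` at most once per executionId; repeat calls share the first result.
  run(executionId: string, work: () => Promise<string>): Promise<FlowResult> {
    const existing = this.seen.get(executionId);
    if (existing) return existing;

    this.runCount += 1;
    const pending = work().then((output) => ({ executionId, output }));
    this.seen.set(executionId, pending);
    return pending;
  }
}

async function demoIdempotency(): Promise<number> {
  const runner = new IdempotentRunner();
  const work = async () => "done";
  // The same ID submitted twice triggers a single run of `work`.
  await Promise.all([runner.run("exec-1", work), runner.run("exec-1", work)]);
  // A different ID is treated as a separate execution.
  await runner.run("exec-2", work);
  return runner.runCount;
}
```

Keying on the execution ID means a retried enqueue resolves to the original result instead of starting a second run.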
Architecture:
tRPC API → DBOSClient.enqueue() or DBOS.startWorkflow()
↓
PostgreSQL (DBOS queue)
↓
DBOS Worker (auto-consumes)
↓
ExecutionWorkflows.executeFireFlow():
    1. Update status to "running"
    2. Execute flow atomically
    3. Update status to "completed"
Key Features:
- Uses class-based workflow with @DBOS.workflow() decorator
- Module-level queue created before DBOS.launch()
- No manual claim management (DBOS handles it)
- No manual recovery service (DBOS handles it)
- No manual offset commits (DBOS handles it)
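The three steps listed under ExecutionWorkflows.executeFireFlow() in the architecture can be sketched without DBOS or PostgreSQL. Everything here is a hypothetical stand-in (`StatusStore`, `InMemoryStatusStore`, `executeFlowSketch`, `demoWorkflow`); the real workflow is a class method decorated with @DBOS.workflow(), which this sketch does not reproduce.

```typescript
// Hypothetical stand-ins: none of these types come from the fireflow or DBOS APIs.
type ExecutionStatus = "running" | "completed";

interface StatusStore {
  setStatus(executionId: string, status: ExecutionStatus): Promise<void>;
}

// Records every status change in order so the sequence can be inspected.
class InMemoryStatusStore implements StatusStore {
  readonly history: Array<[string, ExecutionStatus]> = [];

  async setStatus(executionId: string, status: ExecutionStatus): Promise<void> {
    this.history.push([executionId, status]);
  }
}

// Mirrors the three documented steps of executeFireFlow().
async function executeFlowSketch(
  store: StatusStore,
  executionId: string,
  runFlow: (executionId: string) => Promise<void>,
): Promise<void> {
  await store.setStatus(executionId, "running"); // step 1
  await runFlow(executionId); // step 2
  await store.setStatus(executionId, "completed"); // step 3
}

async function demoWorkflow(): Promise<string[]> {
  const store = new InMemoryStatusStore();
  await executeFlowSketch(store, "exec-1", async () => {});
  return store.history.map(([, status]) => status);
}
```

In this sketch a failure in step 2 leaves the status at "running". The documentation above states that DBOS handles recovery, so no manual recovery path is modeled here.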
Usage:
// The options argument is accepted but now ignored; queue settings come from queue.ts
const worker = new DBOSExecutionWorker(store, null, {
  concurrency: 100,
  workerConcurrency: 5
});
await worker.start();
// Worker automatically processes queued executions
Constructors
Constructor
new DBOSExecutionWorker(store, executionService, _options?): DBOSExecutionWorker
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:77
Create a new DBOS execution worker
Parameters
store
Execution store for database operations
executionService
ExecutionService | null
Execution service for flow execution. May be null if it is set via initializeExecuteFlowStep instead.
_options?
Worker configuration options. These are now ignored; configuration comes from queue.ts.
Returns
DBOSExecutionWorker
Methods
getQueueName()
getQueueName(): string
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:179
Get the queue name
Returns
string
The queue name used by this worker
isWorkerRunning()
isWorkerRunning(): boolean
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:187
Check if worker is running
Returns
boolean
True if worker is running
start()
start(): Promise<void>
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:104
Start the DBOS execution worker
This initializes DBOS, registers workflows and steps, and starts processing queued executions automatically.
Returns
Promise<void>
stop()
stop(): Promise<void>
Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:156
Stop the DBOS execution worker gracefully
This shuts down DBOS, which will:
- Stop accepting new workflow executions
- Wait for in-progress workflows to complete (graceful)
- Close database connections
- Clean up resources
Returns
Promise<void>
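The start(), stop(), and isWorkerRunning() methods suggest a simple lifecycle pattern: start the worker, do some work, and always stop it afterwards. The sketch below is one way to express that under stated assumptions. `WorkerLifecycle` lists only the methods documented on this page, while `FakeWorker`, `withWorker`, `demoLifecycle`, and the queue name "example-queue" are hypothetical and exist only so the example runs without DBOS or PostgreSQL.

```typescript
// Only the four documented methods appear in this interface.
interface WorkerLifecycle {
  start(): Promise<void>;
  stop(): Promise<void>;
  isWorkerRunning(): boolean;
  getQueueName(): string;
}

// Hypothetical stand-in for DBOSExecutionWorker; it only tracks a running flag.
class FakeWorker implements WorkerLifecycle {
  private running = false;

  async start(): Promise<void> {
    this.running = true;
  }

  async stop(): Promise<void> {
    this.running = false;
  }

  isWorkerRunning(): boolean {
    return this.running;
  }

  getQueueName(): string {
    return "example-queue"; // placeholder, not the real queue name
  }
}

// Start the worker, run `body`, and stop the worker even if `body` throws.
async function withWorker<T>(
  worker: WorkerLifecycle,
  body: () => Promise<T>,
): Promise<T> {
  await worker.start();
  try {
    return await body();
  } finally {
    if (worker.isWorkerRunning()) await worker.stop();
  }
}

async function demoLifecycle(): Promise<[boolean, boolean]> {
  const worker = new FakeWorker();
  const during = await withWorker(worker, async () => worker.isWorkerRunning());
  return [during, worker.isWorkerRunning()];
}
```

In a long-running service the same stop() call would more likely be made from a shutdown signal handler than from a `finally` block; either way, stop() is the documented path for a graceful shutdown.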