Class: DBOSExecutionWorker

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:67

DBOS-based execution worker for fireflow flows

This worker replaces the Kafka-based ExecutionWorker with a DBOS-powered implementation that provides:

  • Durable execution with automatic recovery
  • Exactly-once semantics through idempotency
  • Built-in concurrency control
  • PostgreSQL-backed queue (no Kafka needed for task distribution)
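The idempotency idea behind the exactly-once bullet can be illustrated with a minimal in-memory sketch. `IdempotentQueue` is a hypothetical stand-in written for this example; it is not part of DBOS or of this package, and unlike a PostgreSQL-backed queue it keeps no durable state.

```typescript
// Hypothetical in-memory stand-in for an idempotent queue; not the DBOS API.
class IdempotentQueue<T> {
  private readonly results = new Map<string, Promise<T>>();

  // Enqueue work under an idempotency key. A repeated key returns the
  // promise of the first submission instead of running the work again.
  enqueue(key: string, work: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) return existing;
    const started = work();
    this.results.set(key, started);
    return started;
  }
}
```

Submitting the same execution ID twice therefore runs the flow once, and both callers observe the same result.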

Architecture:

tRPC API → DBOSClient.enqueue() or DBOS.startWorkflow()
         ↓
PostgreSQL (DBOS queue)
         ↓
DBOS Worker (auto-consumes)
         ↓
ExecutionWorkflows.executeFireFlow():
  1. Update status to "running"
  2. Execute flow atomically
  3. Update status to "completed"
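The three workflow steps above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `InMemoryStore`, `executeFireFlowSketch`, and the `"queued"` initial status are hypothetical names introduced here, and the real workflow runs under DBOS against an `IExecutionStore`.

```typescript
// Hypothetical stand-ins; "queued" as the initial status is an assumption.
type ExecutionStatus = "queued" | "running" | "completed";

class InMemoryStore {
  private readonly history = new Map<string, ExecutionStatus[]>();

  create(id: string): void {
    this.history.set(id, ["queued"]);
  }

  setStatus(id: string, status: ExecutionStatus): void {
    const entries = this.history.get(id);
    if (!entries) throw new Error(`unknown execution ${id}`);
    entries.push(status);
  }

  statuses(id: string): ExecutionStatus[] {
    return [...(this.history.get(id) ?? [])];
  }
}

// Mirrors the documented order of operations in executeFireFlow().
async function executeFireFlowSketch(
  store: InMemoryStore,
  id: string,
  runFlow: (id: string) => Promise<void>,
): Promise<void> {
  store.setStatus(id, "running");   // 1. Update status to "running"
  await runFlow(id);                // 2. Execute the flow
  store.setStatus(id, "completed"); // 3. Update status to "completed"
}
```

The sketch omits failure handling, which the description above does not specify.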

Key Features:

  • Uses class-based workflow with @DBOS.workflow() decorator
  • Module-level queue created before DBOS.launch()
  • No manual claim management (DBOS handles it)
  • No manual recovery service (DBOS handles it)
  • No manual offset commits (DBOS handles it)

Usage:

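```typescript
// executionService may be null; it can be set later via initializeExecuteFlowStep.
// Per the constructor docs, the options argument is currently ignored
// (configuration comes from queue.ts).
const worker = new DBOSExecutionWorker(store, null, {
  concurrency: 100,
  workerConcurrency: 5
});

await worker.start();
// Worker automatically processes queued executions
```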
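To illustrate what a per-worker concurrency cap such as `workerConcurrency: 5` means, here is a minimal sketch of running tasks with a bounded number in flight. `runWithLimit` is a hypothetical helper written for this example; DBOS enforces its queue limits internally, and this code is not how the worker does it.

```typescript
// Hypothetical helper: run tasks with at most `limit` in flight at once.
async function runWithLimit<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  const results = new Array<T>(tasks.length);
  let next = 0;

  // Each lane pulls the next unclaimed task until none remain.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const lanes = Array.from({ length: Math.min(limit, tasks.length) }, () => lane());
  await Promise.all(lanes);
  return results;
}
```

Results are returned in task order regardless of completion order.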

Constructors

Constructor

new DBOSExecutionWorker(store, executionService, _options?): DBOSExecutionWorker

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:77

Create a new DBOS execution worker

Parameters

store

IExecutionStore

Execution store for database operations

executionService

ExecutionService | null

Execution service used to run flows. Pass null to set it later via initializeExecuteFlowStep.

_options?

DBOSWorkerOptions

Worker configuration options. Currently ignored: configuration comes from queue.ts.

Returns

DBOSExecutionWorker

Methods

getQueueName()

getQueueName(): string

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:179

Get the queue name

Returns

string

The queue name used by this worker


isWorkerRunning()

isWorkerRunning(): boolean

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:187

Check if worker is running

Returns

boolean

true if the worker is running, false otherwise
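As a usage sketch, the two accessors above could feed a simple health report. `WorkerStatusSource`, `WorkerHealth`, and `describeWorker` are hypothetical names introduced for this example; only `getQueueName()` and `isWorkerRunning()` come from the documented class.

```typescript
// Structural type covering only the two documented accessor methods.
interface WorkerStatusSource {
  getQueueName(): string;
  isWorkerRunning(): boolean;
}

// Hypothetical report shape for a health endpoint.
interface WorkerHealth {
  queue: string;
  status: "up" | "down";
}

function describeWorker(worker: WorkerStatusSource): WorkerHealth {
  return {
    queue: worker.getQueueName(),
    status: worker.isWorkerRunning() ? "up" : "down",
  };
}
```

Because the function accepts a structural type, it can be exercised in tests with a plain object instead of a live worker.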


start()

start(): Promise<void>

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:104

Start the DBOS execution worker

This initializes DBOS, registers workflows and steps, and starts processing queued executions automatically.

Returns

Promise<void>


stop()

stop(): Promise<void>

Defined in: packages/fireflow-executor/server/dbos/DBOSExecutionWorker.ts:156

Stop the DBOS execution worker gracefully

This shuts down DBOS, which will:

  • Stop accepting new workflow executions
  • Wait for in-progress workflows to complete (graceful)
  • Close database connections
  • Clean up resources

Returns

Promise<void>
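One way to wire `start()` and `stop()` into a process lifecycle is sketched below. `StoppableWorker` and `startWithShutdown` are hypothetical names for this example; the sketch assumes only the documented `start()`, `stop()`, and `isWorkerRunning()` signatures.

```typescript
// Structural type matching the documented lifecycle methods.
interface StoppableWorker {
  start(): Promise<void>;
  stop(): Promise<void>;
  isWorkerRunning(): boolean;
}

// Hypothetical helper: start the worker and return a shutdown function
// that calls stop() at most once, even if invoked repeatedly.
async function startWithShutdown(
  worker: StoppableWorker,
): Promise<() => Promise<void>> {
  await worker.start();
  let stopping: Promise<void> | null = null;
  return () => {
    if (!stopping) stopping = worker.stop();
    return stopping;
  };
}
```

In a Node.js service, the returned function could be registered with `process.once("SIGTERM", ...)` and `process.once("SIGINT", ...)`, so that in-progress workflows get the graceful wait described above before the process exits.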

Licensed under BUSL-1.1