Skip to content

PersistentAI API Documentation / @persistentai/fireflow-executor / server / DBOSTaskQueue

Class: DBOSTaskQueue

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:42

DBOS-based implementation of ITaskQueue interface

This implementation uses DBOS Durable Queues with direct DBOS.startWorkflow() calls. It provides a compatible interface with the existing ITaskQueue so it can be used as a drop-in replacement in the system.

Key Features:

  • Uses @DBOS.workflow() decorated class method directly
  • No manual offset management (DBOS handles it)
  • No consumer groups (DBOS handles distribution)
  • consumeTasks() is a no-op (DBOS auto-consumes via workflow registration)
  • Built-in exactly-once semantics through idempotency

Usage:

typescript
const taskQueue = new DBOSTaskQueue();

// Publish task - uses DBOS.startWorkflow() directly
await taskQueue.publishTask(task);

// Consumption is automatic - no need to call consumeTasks()

Implements

Constructors

Constructor

new DBOSTaskQueue(queueName?): DBOSTaskQueue

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:50

Create a DBOS task queue

Parameters

queueName?

string = QUEUE_NAME

Optional queue name (defaults to 'fireflow-executions')

Returns

DBOSTaskQueue

Methods

close()

close(): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:136

Close the queue and cleanup resources

NOTE: This is handled by DBOSExecutionWorker.stop(). No cleanup needed at the queue level.

Returns

Promise<void>

Implementation of

ITaskQueue.close


consumeTasks()

consumeTasks(_handler): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:100

Consume tasks from the queue

NOTE: This is a no-op for DBOS implementation because DBOS automatically consumes from the queue through workflow registration. Workers don't need to manually subscribe - DBOS handles consumption internally.

The handler parameter is ignored because DBOS workflows are registered via the @DBOS.workflow() decorator.

Parameters

_handler

TaskHandler

Task handler (unused in DBOS implementation)

Returns

Promise<void>

Implementation of

ITaskQueue.consumeTasks


getPendingCount()

getPendingCount(): Promise<number>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:125

Get the number of pending tasks in the queue

NOTE: This is optional in the ITaskQueue interface and not currently implemented for DBOS. DBOS workflows are tracked in system tables, but there's no simple API to count pending tasks.

Returns

Promise<number>

0 (not implemented)

Implementation of

ITaskQueue.getPendingCount


publishTask()

publishTask(task): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:64

Publish an execution task to the DBOS queue

Uses DBOS.startWorkflow() directly with the ExecutionWorkflows class. The task will be durably stored in PostgreSQL and eventually processed by a worker. DBOS guarantees at-least-once execution.

Parameters

task

ExecutionTask

Execution task to publish

Returns

Promise<void>

Implementation of

ITaskQueue.publishTask


stopConsuming()

stopConsuming(): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:111

Stop consuming tasks

NOTE: This is handled by DBOSExecutionWorker.stop() which shuts down the entire DBOS runtime. Individual task queues don't need to stop.

Returns

Promise<void>

Implementation of

ITaskQueue.stopConsuming

Licensed under BUSL-1.1