Skip to content

PersistentAI API Documentation / @persistentai/fireflow-executor / server / DBOSEventBus

Class: DBOSEventBus

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:31

DBOS-based implementation of IEventBus

Simplified wrapper around StreamBridge for execution event streaming. Uses generic DBOS stream infrastructure with MultiChannel pattern.

Features:

  • Real-time streaming via PostgreSQL LISTEN/NOTIFY
  • Automatic sharding across PGListener pool
  • MultiChannel for efficient fan-out
  • Backward compatible with IEventBus interface

Implements

Constructors

Constructor

new DBOSEventBus(streamBridge): DBOSEventBus

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:35

Parameters

streamBridge

StreamBridge

Returns

DBOSEventBus

Methods

close()

close(): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:151

Close the event bus

Returns

Promise<void>

Implementation of

IEventBus.close


closeStream()

closeStream(executionId): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:301

Close stream for specific execution (no-op)

DBOS automatically closes streams when workflow terminates

Parameters

executionId

string

Returns

Promise<void>


publishEvent()

publishEvent(executionId, event): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:49

Publish an execution event to DBOS stream

Note: This continues to use DBOS.writeStream() directly to maintain existing execution flow (no changes to ExecutionWorkflow)

Parameters

executionId

string

Execution ID (also workflow ID)

event

ExecutionEventImpl

Event to publish

Returns

Promise<void>

Implementation of

IEventBus.publishEvent


subscribeToEvents()

subscribeToEvents(executionId, fromIndex?, batchConfig?): AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:78

Subscribe to execution events

Parameters

executionId

string

Execution ID

fromIndex?

number = 0

Starting event index (0-based)

batchConfig?

EventBatchConfig

Optional batching configuration

Returns

AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Async iterable of event batches

Implementation of

IEventBus.subscribeToEvents


subscribeToExecutionTree()

subscribeToExecutionTree(rootWorkflowId, fromIndex?, batchConfig?): AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:169

Subscribe to execution tree events.

Reads events from the root workflow AND all branch subworkflows. Branch discovery is recursive — sub-branches of branches are also included.

When the root writes to its 'branches' stream, this method discovers those branches and subscribes to their 'events' streams as well. All events are merged into a single output iterable.

Falls back to regular subscribeToEvents if no branches exist (backwards compatible with old engine).

Parameters

rootWorkflowId

string

fromIndex?

number = 0

batchConfig?

EventBatchConfig

Returns

AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Implementation of

IEventBus.subscribeToExecutionTree


subscribeToStream()

subscribeToStream<T>(workflowId, streamKey, fromOffset?, batchConfig?): AsyncIterable<T[]>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:113

Subscribe to an arbitrary DBOS stream by workflow ID and stream key. Used for port-level frontend streaming (STREAM_PUBLISHED events).

Type Parameters

T

T = any

Parameters

workflowId

string

streamKey

string

fromOffset?

number = 0

batchConfig?

EventBatchConfig

Returns

AsyncIterable<T[]>

Implementation of

IEventBus.subscribeToStream


unsubscribe()

unsubscribe(executionId): Promise<void>

Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:143

Unsubscribe from events

Parameters

executionId

string

Returns

Promise<void>

Implementation of

IEventBus.unsubscribe

Licensed under BUSL-1.1