The Queue class is the main entry point for @platformatic/job-queue. It combines producer and consumer functionality into a single interface, making it easy to both enqueue jobs and process them.

Architecture Overview

Internally, the Queue is composed of two main components:
  • Producer: Handles job enqueueing and result retrieval
  • Consumer: Handles job processing and execution
When you call queue.execute(), the Queue becomes both a producer and consumer. Without calling execute(), it operates in producer-only mode.
import { Queue, RedisStorage } from '@platformatic/job-queue'

const queue = new Queue<{ email: string }, { sent: boolean }>({
  storage: new RedisStorage({ url: 'redis://localhost:6379' }),
  concurrency: 5,
  maxRetries: 3
})

// Register handler (makes this a consumer)
queue.execute(async (job) => {
  console.log(`Processing ${job.id}:`, job.payload)
  return { sent: true }
})

await queue.start()

Configuration Options

The Queue accepts a QueueConfig<TPayload, TResult> object with the following options:
  • storage (Storage, required): Storage backend instance (MemoryStorage, RedisStorage, or FileStorage).
  • workerId (string, default: randomUUID()): Unique identifier for this worker instance. Used to track which worker is processing which job.
  • concurrency (number, default: 1): Number of jobs to process in parallel. Each concurrent job runs in its own worker loop.
  • blockTimeout (number, default: 5): Seconds to wait when polling for jobs. Uses blocking operations where supported (e.g., BLMOVE in Redis).
  • maxRetries (number, default: 3): Default maximum retry attempts for failed jobs. Can be overridden per-job via EnqueueOptions.maxAttempts.
  • visibilityTimeout (number, default: 30000): Milliseconds before a processing job is considered stalled. The Reaper uses this to detect and recover crashed jobs.
  • resultTTL (number, default: 3600000): Milliseconds to cache job results and errors (1 hour default). Can be overridden per-job or in afterExecution hook.
  • payloadSerde (Serde<TPayload>, default: JsonSerde): Custom serializer for job payloads. Defaults to JSON serialization.
  • resultSerde (Serde<TResult>, default: JsonSerde): Custom serializer for job results. Defaults to JSON serialization.
  • afterExecution (AfterExecutionHook<TPayload, TResult>): Hook called after job execution and before persisting terminal state. Can modify result TTL or transform results.
  • logger (pino.Logger, default: abstractLogger): Pino-compatible logger instance. Defaults to a no-op logger.
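The payloadSerde and resultSerde options take an object that converts values to and from a stored representation. The exact Serde interface is not shown on this page, so the sketch below assumes a serialize/deserialize pair over bytes. The local Serde type, ReminderPayload, and dateAwareSerde are hypothetical, and the real interface (for example, whether it uses Buffer) should be checked against the package's type definitions. It illustrates one reason to replace the default JSON serialization: restoring Date values that plain JSON flattens to strings.

```typescript
// Hypothetical stand-in for the library's Serde<T> type (assumed shape).
interface Serde<T> {
  serialize(value: T): Uint8Array
  deserialize(data: Uint8Array): T
}

interface ReminderPayload {
  email: string
  sendAt: Date
}

// JSON-based serde that restores `sendAt` as a Date when reading back.
const dateAwareSerde: Serde<ReminderPayload> = {
  serialize(value) {
    // JSON.stringify calls Date#toJSON, so sendAt is stored as an ISO string.
    return new TextEncoder().encode(JSON.stringify(value))
  },
  deserialize(data) {
    const raw = JSON.parse(new TextDecoder().decode(data)) as { email: string; sendAt: string }
    return { email: raw.email, sendAt: new Date(raw.sendAt) }
  }
}

const original: ReminderPayload = {
  email: 'user@example.com',
  sendAt: new Date('2030-01-01T00:00:00.000Z')
}
const roundTripped = dateAwareSerde.deserialize(dateAwareSerde.serialize(original))
console.log(roundTripped.sendAt instanceof Date) // true
```

If the assumed shape matches, such an object would be passed as payloadSerde in the Queue configuration.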

Lifecycle Methods

start()

Connects to storage and starts consuming jobs if a handler is registered.
await queue.start()
Implementation details (from src/queue.ts:72):
  1. Connects to storage backend
  2. Sets internal #started flag
  3. If a handler was registered via execute(), starts the Consumer
  4. Emits 'started' event

stop()

Gracefully stops processing jobs and disconnects from storage.
await queue.stop()
Implementation details (from src/queue.ts:94):
  1. Stops the Consumer (waits up to visibilityTimeout milliseconds for active jobs to finish)
  2. Disconnects from storage
  3. Sets #started to false
  4. Emits 'stopped' event
The stop() method waits up to visibilityTimeout milliseconds for active jobs to complete. Jobs that don’t finish in time will be aborted and requeued by the Reaper.
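The bounded wait that stop() performs can be pictured as a race between the in-flight jobs and a timer. The following is a minimal sketch of that pattern, not the library's actual implementation; waitWithLimit and its 'drained' / 'timed_out' outcomes are hypothetical names.

```typescript
// Resolves to 'drained' if `work` settles first, or 'timed_out' after `limitMs`.
async function waitWithLimit(
  work: Promise<unknown>,
  limitMs: number
): Promise<'drained' | 'timed_out'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<'timed_out'>((resolve) => {
    timer = setTimeout(() => resolve('timed_out'), limitMs)
  })
  // Treat both fulfilment and rejection of the work as "no longer running".
  const drained = work.then(
    () => 'drained' as const,
    () => 'drained' as const
  )
  try {
    return await Promise.race([drained, timeout])
  } finally {
    // Clear the timer so it does not keep the process alive after draining.
    clearTimeout(timer)
  }
}

// Usage: a fast job drains in time, a slow one hits the limit.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
const fastOutcome = waitWithLimit(sleep(5), 200)
const slowOutcome = waitWithLimit(sleep(300), 20)
```

In this picture, a 'timed_out' outcome corresponds to the case where remaining jobs are aborted and left for the Reaper to requeue.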

execute(handler)

Registers a job handler function. Can be called before or after start().
queue.execute(async (job) => {
  // job.id: string
  // job.payload: TPayload
  // job.attempts: number (starts at 1)
  // job.signal: AbortSignal
  
  return result // TResult
})
Job object structure (from src/types.ts:88):
interface Job<TPayload> {
  id: string
  payload: TPayload
  attempts: number        // Current attempt number (1-indexed)
  signal: AbortSignal     // Aborted on cancellation or visibility timeout
}
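Because job.signal is aborted on cancellation or visibility timeout, a long-running handler can check it between steps and stop early. The sketch below declares a local copy of the Job interface so that it runs on its own; sendAll and the recipients payload are hypothetical.

```typescript
// Local copy of the Job shape shown above, so the sketch is self-contained.
interface Job<TPayload> {
  id: string
  payload: TPayload
  attempts: number
  signal: AbortSignal
}

// Hypothetical handler: processes recipients one at a time and stops
// as soon as the job's signal is aborted.
async function sendAll(job: Job<{ recipients: string[] }>): Promise<{ sent: number }> {
  const delivered: string[] = []
  for (const recipient of job.payload.recipients) {
    if (job.signal.aborted) {
      throw new Error(
        `Job ${job.id} aborted after ${delivered.length} of ${job.payload.recipients.length} sends`
      )
    }
    // Placeholder for real work (e.g. an HTTP call that also receives job.signal).
    await new Promise<void>((resolve) => setTimeout(resolve, 1))
    delivered.push(recipient)
  }
  return { sent: delivered.length }
}

// Usage with stand-in jobs; in the queue, the library supplies the Job object.
const live = new AbortController()
const liveRun = sendAll({
  id: 'job-1',
  payload: { recipients: ['a@example.com', 'b@example.com'] },
  attempts: 1,
  signal: live.signal
})

const aborted = new AbortController()
aborted.abort()
const abortedRun = sendAll({
  id: 'job-2',
  payload: { recipients: ['a@example.com', 'b@example.com'] },
  attempts: 1,
  signal: aborted.signal
}).catch((error: Error) => error.message)
```

Checking the signal only between steps keeps each step atomic; work that supports cancellation itself (such as fetch) can be handed job.signal directly.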

Producer Methods

These methods are available on all Queue instances, even without calling execute().

enqueue()

Enqueue a job for processing (fire-and-forget).
const result = await queue.enqueue('job-123', { email: 'user@example.com' }, {
  maxAttempts: 5,
  resultTTL: 300000  // 5 minutes
})

if (result.status === 'queued') {
  console.log('Job added to queue')
} else if (result.status === 'completed') {
  console.log('Job already completed:', result.result)
} else if (result.status === 'duplicate') {
  console.log('Job already exists with state:', result.existingState)
}
Return type (from src/types.ts:71):
type EnqueueResult<TResult> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult }
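Since EnqueueResult is a discriminated union on status, a switch with a never check lets the compiler confirm that every outcome is handled. The sketch below uses local copies of the types; MessageState is assumed to be a string-like state name because its members are not listed here, and describeEnqueue is a hypothetical helper.

```typescript
// Local copies of the types shown above. MessageState is an assumption:
// its exact members are not listed on this page.
type MessageState = string
type EnqueueResult<TResult> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult }

// Turns an EnqueueResult into a log line. The `never` assignment makes the
// compiler flag this function if a new status is ever added to the union.
function describeEnqueue<TResult>(id: string, outcome: EnqueueResult<TResult>): string {
  switch (outcome.status) {
    case 'queued':
      return `${id}: queued`
    case 'duplicate':
      return `${id}: duplicate (state: ${outcome.existingState})`
    case 'completed':
      return `${id}: already completed with ${JSON.stringify(outcome.result)}`
    default: {
      const unreachable: never = outcome
      return unreachable
    }
  }
}

console.log(describeEnqueue('job-123', { status: 'completed', result: { sent: true } }))
// job-123: already completed with {"sent":true}
```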

enqueueAndWait()

Enqueue a job and wait for the result (request/response pattern).
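// Assumes the error classes are exported from the package root
import { TimeoutError, JobFailedError } from '@platformatic/job-queue'

try {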
  const result = await queue.enqueueAndWait('req-123', { url: 'https://api.example.com' }, {
    timeout: 30000,     // 30 seconds
    maxAttempts: 3,
    resultTTL: 60000    // 1 minute
  })
  console.log('Success:', result)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Request timed out')
  } else if (error instanceof JobFailedError) {
    console.log('Job failed:', error.originalError)
  }
}
See Request/Response Pattern for detailed usage.

cancel()

Cancel a queued job. Cannot cancel jobs that are currently processing.
const result = await queue.cancel('job-123')
// result.status: 'cancelled' | 'not_found' | 'processing' | 'completed'

getStatus()

Get the current status of a job.
const status = await queue.getStatus('job-123')
// {
//   id: 'job-123',
//   state: 'completed',
//   createdAt: 1234567890,
//   attempts: 2,
//   result?: { ... },
//   error?: { message: '...', code?: '...', stack?: '...' }
// }
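For a job enqueued in fire-and-forget mode, getStatus() can be polled until the job reaches a terminal state. The sketch below is one way to do that under stated assumptions: the status source is injected as a function, a missing job is assumed to yield null, and 'completed' and 'failed' are assumed to be the terminal states. pollUntilTerminal and its options are hypothetical names.

```typescript
// Minimal view of the status object shown above; only `state` is needed here.
interface StatusLike {
  state: string
}

// Polls `getStatus` until the job reaches one of `terminalStates` or the
// polls run out. Returns the last status seen (null if none was found).
async function pollUntilTerminal<T extends StatusLike>(
  getStatus: () => Promise<T | null>,
  { intervalMs = 500, maxPolls = 20, terminalStates = ['completed', 'failed'] } = {}
): Promise<T | null> {
  let last: T | null = null
  for (let poll = 0; poll < maxPolls; poll++) {
    last = await getStatus()
    if (last !== null && terminalStates.includes(last.state)) return last
    // Sleep between polls, but not after the final one.
    if (poll < maxPolls - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs))
    }
  }
  return last
}

// Usage with a stand-in status source; with a real queue this would be
// () => queue.getStatus('job-123').
const states = ['queued', 'processing', 'completed']
let calls = 0
const fakeGetStatus = async () => ({ state: states[Math.min(calls++, states.length - 1)] })
const finalStatus = pollUntilTerminal(fakeGetStatus, { intervalMs: 1 })
```

When the caller needs the result immediately, enqueueAndWait() is the more direct option; polling suits cases where the job ID is handed to another process or request.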

getResult()

Retrieve the cached result of a completed job.
const result = await queue.getResult('job-123')
if (result) {
  console.log('Cached result:', result)
}

updateResultTTL()

Update the TTL for a completed or failed job’s cached payload.
const update = await queue.updateResultTTL('job-123', 7200000) // 2 hours
// update.status: 'updated' | 'not_found' | 'not_terminal' | 'missing_payload'
See Deduplication for more details.

Events

The Queue extends EventEmitter and emits the following events:

Lifecycle Events

queue.on('started', () => {
  console.log('Queue started')
})

queue.on('stopped', () => {
  console.log('Queue stopped')
})

queue.on('error', (error: Error) => {
  console.error('Queue error:', error)
})

Job Events

queue.on('enqueued', (id: string) => {
  console.log(`Job ${id} was enqueued`)
})

queue.on('completed', (id: string, result: TResult) => {
  console.log(`Job ${id} completed:`, result)
})

queue.on('failed', (id: string, error: Error) => {
  console.log(`Job ${id} failed permanently:`, error)
})

queue.on('failing', (id: string, error: Error, attempt: number) => {
  console.log(`Job ${id} failed attempt ${attempt}, will retry:`, error)
})

queue.on('requeued', (id: string) => {
  console.log(`Job ${id} was requeued (e.g., during graceful shutdown)`)
})

queue.on('cancelled', (id: string) => {
  console.log(`Job ${id} was cancelled`)
})

queue.on('stalled', (id: string) => {
  console.log(`Job ${id} was stalled and recovered`)
})
Event definitions (from src/types.ts:172):
interface QueueEvents<TResult> {
  started: []
  stopped: []
  error: [error: Error]
  enqueued: [id: string]
  completed: [id: string, result: TResult]
  failed: [id: string, error: Error]
  failing: [id: string, error: Error, attempt: number]
  requeued: [id: string]
  cancelled: [id: string]
  stalled: [id: string]
}
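These events can feed simple metrics. The sketch below keeps running totals from the job events listed above. EventSource, JobCounters, trackJobCounters, and FakeQueue are hypothetical names; FakeQueue is a tiny stand-in emitter so the example runs without a storage backend, and a real Queue might need a cast if its on() method is more strictly typed.

```typescript
// Structural type for anything with an EventEmitter-style `on` method.
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown
}

interface JobCounters {
  enqueued: number
  completed: number
  failed: number
  retries: number
  stalled: number
}

// Subscribes to job events and keeps running totals.
function trackJobCounters(source: EventSource): JobCounters {
  const counters: JobCounters = { enqueued: 0, completed: 0, failed: 0, retries: 0, stalled: 0 }
  source.on('enqueued', () => { counters.enqueued++ })
  source.on('completed', () => { counters.completed++ })
  source.on('failed', () => { counters.failed++ })
  // 'failing' fires when an attempt fails and the job will be retried.
  source.on('failing', () => { counters.retries++ })
  source.on('stalled', () => { counters.stalled++ })
  return counters
}

// Tiny stand-in emitter so the sketch runs without a queue or storage backend.
class FakeQueue implements EventSource {
  private listeners = new Map<string, Array<(...args: any[]) => void>>()
  on(event: string, listener: (...args: any[]) => void): this {
    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener])
    return this
  }
  emit(event: string, ...args: any[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args)
  }
}

const fake = new FakeQueue()
const counters = trackJobCounters(fake)
fake.emit('enqueued', 'job-1')
fake.emit('failing', 'job-1', new Error('temporary'), 1)
fake.emit('completed', 'job-1', { sent: true })
console.log(counters) // enqueued: 1, completed: 1, retries: 1; failed and stalled stay 0
```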

AfterExecution Hook

The afterExecution hook is called after job execution completes (success or failure) but before the terminal state is persisted. This allows you to:
  • Dynamically adjust result/error TTL based on job outcome
  • Transform or enrich results before storage
  • Perform side effects (logging, metrics, notifications)
const queue = new Queue<{ url: string }, { body: string; cacheFor?: number }>({
  storage,
  resultTTL: 60000, // Default 1 minute
  afterExecution: async (context) => {
    // Context is mutable (passed by reference)
    console.log(`Job ${context.id} finished in ${context.durationMs}ms`)
    
    if (context.status === 'completed' && context.result?.cacheFor) {
      // Override TTL based on result
      context.ttl = context.result.cacheFor
    }
    
    if (context.status === 'failed' && context.error?.message.includes('temporary')) {
      // Cache temporary errors for shorter time
      context.ttl = 5000
    }
  }
})
AfterExecutionContext structure (from src/types.ts:98):
interface AfterExecutionContext<TPayload, TResult> {
  id: string
  payload: TPayload
  attempts: number
  maxAttempts: number
  createdAt: number
  status: 'completed' | 'failed'
  result?: TResult
  error?: Error
  ttl: number              // Mutable: modify to change stored TTL
  workerId: string
  startedAt: number
  finishedAt: number
  durationMs: number
}
If the hook throws an error, the original TTL is restored and the error is logged. The job’s terminal state is still persisted.
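The restore-on-error behaviour can be modelled in a few lines. This is an illustrative sketch of the rule stated above, not the library's code; HookContext is a trimmed-down version of AfterExecutionContext, and runHookSafely and logError are hypothetical names.

```typescript
// Only the fields this sketch needs from AfterExecutionContext.
interface HookContext {
  id: string
  status: 'completed' | 'failed'
  ttl: number
}

type Hook = (context: HookContext) => void | Promise<void>

// Runs `hook` against `context`. If the hook throws, any TTL change it made
// is discarded and the error is reported through `logError`.
async function runHookSafely(
  context: HookContext,
  hook: Hook,
  logError: (error: unknown) => void
): Promise<number> {
  const originalTtl = context.ttl
  try {
    await hook(context)
  } catch (error) {
    context.ttl = originalTtl
    logError(error)
  }
  // The TTL that would be used when persisting the terminal state.
  return context.ttl
}

// Usage: one hook shortens the TTL, the other changes it and then throws.
const logged: unknown[] = []
const shortened = runHookSafely(
  { id: 'job-1', status: 'failed', ttl: 60000 },
  (context) => { context.ttl = 5000 },
  (error) => { logged.push(error) }
)
const restored = runHookSafely(
  { id: 'job-2', status: 'completed', ttl: 60000 },
  (context) => { context.ttl = 1; throw new Error('hook bug') },
  (error) => { logged.push(error) }
)
```

Here shortened resolves to 5000, while restored resolves to the original 60000 because the throwing hook's change is rolled back.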

Producer/Consumer Separation

You can run producers and consumers as separate processes:
// producer.ts (API server)
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const producer = new Queue({ storage })

await producer.start()
await producer.enqueue('task-1', { email: 'user@example.com' })
await producer.stop()

// worker.ts (background worker)
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const worker = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})

worker.execute(async (job) => {
  // Replace with real processing of job.payload
  console.log(`Processing ${job.id}`)
  return { processed: true }
})

await worker.start()

process.on('SIGTERM', async () => {
  await worker.stop()
  process.exit(0)
})
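A worker process may receive more than one termination signal (for example SIGTERM followed by SIGINT). The sketch below, assuming you want repeated signals to share a single stop() call, wraps the stop function so it runs at most once. createShutdown and fakeStop are hypothetical names, and fakeStop stands in for worker.stop().

```typescript
// Wraps a stop function so repeated calls share one in-flight shutdown
// instead of invoking stop() again.
function createShutdown(stop: () => Promise<void>): () => Promise<void> {
  let inFlight: Promise<void> | undefined
  return () => {
    if (inFlight === undefined) inFlight = stop()
    return inFlight
  }
}

// Usage with a stand-in for worker.stop(); the counter shows stop ran once.
let stopCalls = 0
const fakeStop = async () => { stopCalls++ }
const shutdown = createShutdown(fakeStop)
const bothDone = Promise.all([shutdown(), shutdown()])

// In worker.ts this could be wired up as:
//   const shutdown = createShutdown(() => worker.stop())
//   process.on('SIGTERM', () => shutdown().then(() => process.exit(0)))
//   process.on('SIGINT', () => shutdown().then(() => process.exit(0)))
```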