The Queue class is the primary interface for the Platformatic Job Queue. It combines producer and consumer functionality, allowing you to both enqueue jobs and process them.
Class Signature
class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TResult>>
Type Parameters
TPayload - The type of the job payload
TResult - The type of the job result (defaults to void)
Constructor
new Queue<TPayload, TResult>(config: QueueConfig<TPayload, TResult>)
Creates a new Queue instance with the specified configuration.
Configuration
storage - The storage backend instance (e.g., RedisStorage).
workerId - Unique identifier for this worker. Defaults to a random UUID.
Custom serializer/deserializer for job payloads. Defaults to JSON serialization.
Custom serializer/deserializer for job results. Defaults to JSON serialization.
concurrency - Maximum number of jobs this worker processes in parallel.
Blocking dequeue timeout in seconds (the other time options use milliseconds): how long to wait for new jobs when polling the queue.
maxRetries - Default maximum number of retry attempts for failed jobs. Can be overridden per job in enqueue() options.
visibilityTimeout - Maximum processing time in milliseconds before a job is considered stalled. After this timeout, the Reaper requeues the job.
resultTTL - Time-to-live for stored results and errors in milliseconds. Defaults to 1 hour (3600000ms).
afterExecution
AfterExecutionHook<TPayload, TResult>
Hook function called after job execution and before persisting terminal state. Useful for logging, metrics, or custom cleanup.
logger - Pino logger instance. Defaults to a no-op logger.
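The stalled-job rule described for the visibility timeout can be pictured as a comparison between how long a job has been processing and the configured limit. The following is a minimal sketch and not the library's Reaper: `ProcessingRecord` and `findStalled` are hypothetical names introduced for illustration, and the strict "greater than" boundary is an assumption.

```typescript
// Hypothetical record of a job that a worker has picked up.
interface ProcessingRecord {
  id: string
  startedAt: number // epoch milliseconds when processing began
}

// Returns the ids of jobs whose processing time exceeds the visibility timeout.
// Assumption: a job counts as stalled only once the timeout has strictly elapsed.
function findStalled(
  records: ProcessingRecord[],
  visibilityTimeoutMs: number,
  now: number
): string[] {
  return records
    .filter((record) => now - record.startedAt > visibilityTimeoutMs)
    .map((record) => record.id)
}

const stalled = findStalled(
  [
    { id: 'a', startedAt: 0 },
    { id: 'b', startedAt: 50000 }
  ],
  60000,
  70000
)
console.log(stalled) // [ 'a' ]
```

Passing the clock in as `now` keeps the check deterministic, which makes it easier to reason about when choosing a timeout relative to the longest expected job.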
Example
import { Queue } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue/storage/redis'
import pino from 'pino'
const queue = new Queue({
storage: new RedisStorage({ url: 'redis://localhost:6379' }),
workerId: 'worker-1',
concurrency: 5,
maxRetries: 3,
visibilityTimeout: 60000,
resultTTL: 3600000,
logger: pino()
})
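Because the serializer options default to JSON, values such as Date come back as strings unless a custom serializer rebuilds them. The sketch below shows one way such a serializer could look. The `Serde` interface, its `serialize`/`deserialize` method names, and the string wire format are assumptions made for illustration; the library's actual serializer type may differ, so check its type definitions before adapting this.

```typescript
// Hypothetical serializer shape; the library's actual interface may differ.
interface Serde<T> {
  serialize(value: T): string
  deserialize(raw: string): T
}

interface ReminderPayload {
  userId: string
  dueAt: Date
}

// Stores the Date as an ISO string and rebuilds a Date object on the way out.
const reminderSerde: Serde<ReminderPayload> = {
  serialize: (value) =>
    JSON.stringify({ userId: value.userId, dueAt: value.dueAt.toISOString() }),
  deserialize: (raw) => {
    const parsed = JSON.parse(raw) as { userId: string; dueAt: string }
    return { userId: parsed.userId, dueAt: new Date(parsed.dueAt) }
  }
}

const original: ReminderPayload = {
  userId: 'u1',
  dueAt: new Date('2024-01-01T00:00:00.000Z')
}
const roundTripped = reminderSerde.deserialize(reminderSerde.serialize(original))
console.log(roundTripped.dueAt instanceof Date) // true
```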
Methods
start()
Starts the queue by connecting to storage and starting the consumer if a handler is registered.
Resolves when the queue has started successfully.
Example
await queue.start()
console.log('Queue started')
stop()
Stops the queue gracefully, disconnecting from storage and stopping the consumer.
Resolves when the queue has stopped completely.
Example
await queue.stop()
console.log('Queue stopped')
execute()
Registers a job handler function, turning this queue into a consumer.
execute(handler: JobHandler<TPayload, TResult>): void
handler
JobHandler<TPayload, TResult>
required
Function that processes jobs. Can be async/promise-based or callback-based.
Promise-based signature: (job: Job<TPayload>) => Promise<TResult>
Callback-based signature: (job: Job<TPayload>, callback: (err: Error | null, result?: TResult) => void) => void
Job Object
The handler receives a Job object with the following properties:
id (string) - Unique job identifier
payload (TPayload) - The job’s payload data
attempts (number) - Current attempt number (1-indexed)
signal (AbortSignal) - Abort signal for cancellation
Example
interface EmailPayload {
to: string
subject: string
body: string
}
queue.execute(async (job) => {
console.log(`Processing job ${job.id}, attempt ${job.attempts}`)
await sendEmail({
to: job.payload.to,
subject: job.payload.subject,
body: job.payload.body
})
return { sent: true }
})
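The signal property listed on the Job object lets a long-running handler stop early instead of finishing work nobody needs. A minimal sketch of that cooperative pattern follows. `SignalLike` and `runSteps` are hypothetical names; the sketch only relies on the `aborted` flag, which a real AbortSignal also exposes.

```typescript
// Minimal structural view of an AbortSignal; a real AbortSignal satisfies it.
interface SignalLike {
  readonly aborted: boolean
}

// Runs steps in order and stops before the next step once the signal is aborted.
// Returns how many steps actually ran.
function runSteps(steps: Array<() => void>, signal: SignalLike): number {
  let completed = 0
  for (const step of steps) {
    if (signal.aborted) break
    step()
    completed++
  }
  return completed
}

// Simulate a cancellation that arrives while the second step is running.
const signal = { aborted: false }
const ran = runSteps(
  [
    () => {},
    () => {
      signal.aborted = true
    },
    () => {}
  ],
  signal
)
console.log(ran) // 2
```

Checking the flag between steps, rather than only once at the start, is what lets a multi-stage handler react to a cancellation that arrives mid-job.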
Callback-based Handler Example
queue.execute((job, callback) => {
processLegacyTask(job.payload, (err, result) => {
if (err) return callback(err)
callback(null, result)
})
})
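Since handlers may be promise-based or callback-based, a consumer needs some way to treat both uniformly. One possible approach is to wrap the callback form in a Promise, sketched below. Telling the two styles apart by the number of declared parameters is an assumption of this sketch, not confirmed library behavior, and `toPromiseHandler` is a hypothetical helper.

```typescript
interface Job<TPayload> {
  id: string
  payload: TPayload
  attempts: number
}

type PromiseHandler<P, R> = (job: Job<P>) => Promise<R>
type CallbackHandler<P, R> = (
  job: Job<P>,
  callback: (err: Error | null, result?: R) => void
) => void

// Wraps either handler style so the caller always gets a Promise.
// Assumption: a handler declaring two parameters is treated as callback-based.
function toPromiseHandler<P, R>(
  handler: PromiseHandler<P, R> | CallbackHandler<P, R>
): PromiseHandler<P, R> {
  if (handler.length < 2) return handler as PromiseHandler<P, R>
  const callbackHandler = handler as CallbackHandler<P, R>
  return (job) =>
    new Promise<R>((resolve, reject) => {
      callbackHandler(job, (err, result) => {
        if (err) reject(err)
        else resolve(result as R)
      })
    })
}

// A callback-style handler that doubles its numeric payload.
const legacy: CallbackHandler<number, number> = (job, callback) => {
  callback(null, job.payload * 2)
}

const wrapped = toPromiseHandler<number, number>(legacy)
const doubled = wrapped({ id: 'job-1', payload: 21, attempts: 1 })
doubled.then((value) => console.log(value)) // 42
```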
enqueue()
Enqueues a job for processing (fire-and-forget).
enqueue(
id: string,
payload: TPayload,
options?: EnqueueOptions
): Promise<EnqueueResult<TResult>>
id - Unique identifier for the job. Enqueuing an ID that already exists does not create a second job; the call returns a duplicate or completed status instead.
Maximum retry attempts for this specific job. Overrides the queue’s maxRetries setting.
TTL for this job’s result in milliseconds. Overrides the queue’s resultTTL setting.
status
'queued' | 'duplicate' | 'completed'
queued: Job successfully enqueued
duplicate: Job with this ID already exists (includes existingState)
completed: Job was already completed (includes result)
Example
const result = await queue.enqueue('email-123', {
to: 'user@example.com',
subject: 'Welcome',
body: 'Welcome to our service!'
})
if (result.status === 'queued') {
console.log('Job enqueued successfully')
} else if (result.status === 'duplicate') {
console.log('Job already exists with state:', result.existingState)
} else if (result.status === 'completed') {
console.log('Job already completed with result:', result.result)
}
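The three statuses amount to deduplication by job ID. The in-memory model below is a sketch of that behavior only: `MemoryQueue` is a hypothetical class, the real queue keeps its state in the storage backend, and the exact shape of `existingState` is an assumption.

```typescript
type EnqueueResult<R> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: 'queued' | 'processing' }
  | { status: 'completed'; result: R }

type Entry<R> =
  | { state: 'queued' | 'processing' }
  | { state: 'completed'; result: R }

class MemoryQueue<P, R> {
  private entries = new Map<string, Entry<R>>()

  // The payload is ignored here; only the ID drives deduplication.
  enqueue(id: string, _payload: P): EnqueueResult<R> {
    const existing = this.entries.get(id)
    if (!existing) {
      this.entries.set(id, { state: 'queued' })
      return { status: 'queued' }
    }
    if (existing.state === 'completed') {
      return { status: 'completed', result: existing.result }
    }
    return { status: 'duplicate', existingState: existing.state }
  }

  // Stand-in for a worker finishing the job.
  complete(id: string, result: R): void {
    this.entries.set(id, { state: 'completed', result })
  }
}

const q = new MemoryQueue<{ to: string }, { sent: boolean }>()
const first = q.enqueue('email-123', { to: 'user@example.com' })
const second = q.enqueue('email-123', { to: 'user@example.com' })
q.complete('email-123', { sent: true })
const third = q.enqueue('email-123', { to: 'user@example.com' })
console.log(first.status, second.status, third.status) // queued duplicate completed
```

Deriving the ID from the work itself (for example an order number) is what makes repeated enqueue calls safe under this model.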
enqueueAndWait()
Enqueues a job and waits for its result.
enqueueAndWait(
id: string,
payload: TPayload,
options?: EnqueueAndWaitOptions
): Promise<TResult>
id - Unique identifier for the job.
Maximum retry attempts for this job.
TTL for this job’s result in milliseconds.
timeout - Maximum time to wait for the result in milliseconds. If exceeded, the call rejects with a timeout error.
Returns the job’s result value. Rejects if the job fails or times out.
Example
try {
const result = await queue.enqueueAndWait(
'report-456',
{ reportType: 'monthly', userId: '123' },
{ timeout: 30000 } // 30 second timeout
)
console.log('Report generated:', result)
} catch (error) {
console.error('Job failed or timed out:', error)
}
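The timeout behavior can be pictured as a race between the job's result and a timer. The following sketch shows that general pattern with a hypothetical `withTimeout` helper; it is not the library's internal implementation, and the error message is made up for the example.

```typescript
// Rejects with a timeout error if `work` does not settle within `timeoutMs`.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMs}ms`)),
      timeoutMs
    )
  })
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer))
}

const fast = withTimeout(Promise.resolve('report ready'), 1000)
const slow = withTimeout(new Promise<string>(() => {}), 10)

fast.then((value) => console.log(value)) // report ready
slow.catch((error: Error) => console.log(error.message)) // Timed out after 10ms
```

Note that in this pattern a timeout only stops the caller from waiting; whether the underlying job keeps running is a separate question that the sketch does not address.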
cancel()
Cancels a pending job.
cancel(id: string): Promise<CancelResult>
status
'cancelled' | 'not_found' | 'processing' | 'completed'
cancelled: Job successfully cancelled
not_found: Job doesn’t exist
processing: Job is currently being processed and cannot be cancelled
completed: Job already completed
Example
const result = await queue.cancel('job-789')
if (result.status === 'cancelled') {
console.log('Job cancelled successfully')
} else if (result.status === 'processing') {
console.log('Job is already being processed')
}
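The four outcomes follow from the state the job is in when the cancel request arrives. As a compact restatement, the sketch below maps a job state to the status described above. `cancelOutcome` and the reduced `JobState` type are hypothetical, and states not covered by the description (such as a failed job) are deliberately left out.

```typescript
type JobState = 'queued' | 'processing' | 'completed'
type CancelStatus = 'cancelled' | 'not_found' | 'processing' | 'completed'

// Maps the current state of a job (undefined when the job is unknown)
// to the cancel status described in the documentation.
function cancelOutcome(state: JobState | undefined): CancelStatus {
  switch (state) {
    case undefined:
      return 'not_found'
    case 'queued':
      return 'cancelled' // only pending jobs can actually be cancelled
    case 'processing':
      return 'processing'
    case 'completed':
      return 'completed'
  }
}

console.log(cancelOutcome('queued')) // cancelled
console.log(cancelOutcome('processing')) // processing
console.log(cancelOutcome(undefined)) // not_found
```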
getResult()
Retrieves the result of a completed job.
getResult(id: string): Promise<TResult | null>
id - The job ID to retrieve the result for.
Returns the job’s result if available, or null if the job hasn’t completed or the result has expired.
Example
const result = await queue.getResult('job-123')
if (result !== null) {
console.log('Job result:', result)
} else {
console.log('No result available')
}
updateResultTTL()
Updates the TTL for a completed job’s result or error.
updateResultTTL(id: string, ttlMs: number): Promise<UpdateResultTTLResult>
status
'updated' | 'not_found' | 'not_terminal' | 'missing_payload'
updated: TTL successfully updated
not_found: Job doesn’t exist
not_terminal: Job hasn’t completed yet (not in a terminal state)
missing_payload: Job is in terminal state but payload is missing
Example
const result = await queue.updateResultTTL('job-123', 7200000) // 2 hours
if (result.status === 'updated') {
console.log('TTL extended successfully')
}
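To make the interaction between result expiry and TTL updates concrete, here is a small in-memory model with an injected clock. It is a sketch under assumptions: `ResultStore` is hypothetical, the new TTL is assumed to count from the time of the update, an expired result is treated as not_found, and the missing_payload case is not modeled.

```typescript
type UpdateStatus = 'updated' | 'not_found' | 'not_terminal'

interface StoredJob {
  terminal: boolean
  result?: string
  expiresAt?: number // epoch milliseconds at which the result is gone
}

class ResultStore {
  private jobs = new Map<string, StoredJob>()

  track(id: string): void {
    this.jobs.set(id, { terminal: false })
  }

  complete(id: string, result: string, ttlMs: number, now: number): void {
    this.jobs.set(id, { terminal: true, result, expiresAt: now + ttlMs })
  }

  private expired(job: StoredJob, now: number): boolean {
    return job.expiresAt !== undefined && now >= job.expiresAt
  }

  // Returns null when the job is unknown, unfinished, or its result has expired.
  getResult(id: string, now: number): string | null {
    const job = this.jobs.get(id)
    if (!job || !job.terminal || this.expired(job, now)) return null
    return job.result ?? null
  }

  // Assumption: the new TTL restarts the expiry window from `now`.
  updateResultTTL(id: string, ttlMs: number, now: number): UpdateStatus {
    const job = this.jobs.get(id)
    if (!job || this.expired(job, now)) return 'not_found'
    if (!job.terminal) return 'not_terminal'
    job.expiresAt = now + ttlMs
    return 'updated'
  }
}

const store = new ResultStore()
store.track('job-1')
console.log(store.updateResultTTL('job-1', 1000, 0)) // not_terminal
store.complete('job-1', 'done', 1000, 0)
console.log(store.updateResultTTL('job-1', 5000, 500)) // updated
console.log(store.getResult('job-1', 2000)) // done
console.log(store.getResult('job-1', 6000)) // null
```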
getStatus()
Retrieves the current status of a job.
getStatus(id: string): Promise<MessageStatus<TResult> | null>
status
MessageStatus<TResult> | null
Status object containing:
id (string) - Job ID
state (MessageState) - Current state: 'queued', 'processing', 'failing', 'completed', or 'failed'
createdAt (number) - Timestamp when job was created
attempts (number) - Number of attempts made
result (TResult, optional) - Result if completed
error (SerializedError, optional) - Error if failed
Returns null if the job doesn’t exist.
Example
const status = await queue.getStatus('job-123')
if (status) {
console.log(`Job ${status.id} is ${status.state}`)
console.log(`Attempts: ${status.attempts}`)
if (status.state === 'completed' && status.result) {
console.log('Result:', status.result)
} else if (status.state === 'failed' && status.error) {
console.log('Error:', status.error.message)
}
}
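When polling many jobs it is often useful to separate those that can still change from those that are finished. The sketch below does this with the state names listed above. Treating 'completed' and 'failed' as the terminal states is an inference from the descriptions in this page, and `isTerminal`, `unfinishedIds`, and `StatusLike` are hypothetical names.

```typescript
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

// Reduced view of a status object: only the fields this sketch needs.
interface StatusLike {
  id: string
  state: MessageState
}

// Assumption: 'completed' and 'failed' are terminal; other states may still change.
function isTerminal(state: MessageState): boolean {
  return state === 'completed' || state === 'failed'
}

// Returns the ids of jobs that are still waiting, running, or awaiting a retry.
function unfinishedIds(statuses: StatusLike[]): string[] {
  return statuses
    .filter((status) => !isTerminal(status.state))
    .map((status) => status.id)
}

const ids = unfinishedIds([
  { id: 'a', state: 'completed' },
  { id: 'b', state: 'failing' },
  { id: 'c', state: 'queued' },
  { id: 'd', state: 'failed' }
])
console.log(ids) // [ 'b', 'c' ]
```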
Events
The Queue class emits various events during its lifecycle. Listen to events using the standard EventEmitter API.
started
Emitted when the queue has successfully started.
queue.on('started', () => {
console.log('Queue started')
})
stopped
Emitted when the queue has successfully stopped.
queue.on('stopped', () => {
console.log('Queue stopped')
})
enqueued
Emitted when a job is successfully enqueued.
queue.on('enqueued', (id: string) => {
console.log(`Job ${id} enqueued`)
})
completed
Emitted when a job completes successfully.
queue.on('completed', (id: string, result: TResult) => {
console.log(`Job ${id} completed with result:`, result)
})
failed
Emitted when a job fails permanently (all retries exhausted).
queue.on('failed', (id: string, error: Error) => {
console.error(`Job ${id} failed:`, error.message)
})
failing
Emitted when a job fails but will be retried.
queue.on('failing', (id: string, error: Error, attempt: number) => {
console.warn(`Job ${id} failed attempt ${attempt}:`, error.message)
})
requeued
Emitted when a job is requeued for retry.
queue.on('requeued', (id: string) => {
console.log(`Job ${id} requeued for retry`)
})
cancelled
Emitted when a job is successfully cancelled.
queue.on('cancelled', (id: string) => {
console.log(`Job ${id} cancelled`)
})
error
Emitted when an error occurs in the queue’s internal operations.
queue.on('error', (error: Error) => {
console.error('Queue error:', error)
})
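The events above lend themselves to simple metrics collection. The sketch below counts completions, permanent failures, and retries by subscribing to the corresponding event names. `trackJobMetrics`, `Listenable`, and `FakeEmitter` are hypothetical; the stand-in emitter exists only so the example runs without the library, and passing a real queue instance may require adapting the `Listenable` type to the queue's typed event signatures.

```typescript
// Minimal structural view of an emitter with an `on` method.
interface Listenable {
  on(event: string, listener: (...args: any[]) => void): unknown
}

interface JobMetrics {
  completed: number
  failed: number
  retries: number
}

// Subscribes to job lifecycle events and keeps running counters.
function trackJobMetrics(emitter: Listenable): JobMetrics {
  const metrics: JobMetrics = { completed: 0, failed: 0, retries: 0 }
  emitter.on('completed', () => { metrics.completed++ })
  emitter.on('failed', () => { metrics.failed++ })
  // 'failing' fires when a job fails but will be retried.
  emitter.on('failing', () => { metrics.retries++ })
  return metrics
}

// Tiny stand-in emitter so the sketch runs without the library.
class FakeEmitter implements Listenable {
  private listeners = new Map<string, Array<(...args: any[]) => void>>()

  on(event: string, listener: (...args: any[]) => void): this {
    const list = this.listeners.get(event) ?? []
    list.push(listener)
    this.listeners.set(event, list)
    return this
  }

  emit(event: string, ...args: any[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args)
  }
}

const emitter = new FakeEmitter()
const metrics = trackJobMetrics(emitter)
emitter.emit('failing', 'job-1', new Error('boom'), 1)
emitter.emit('completed', 'job-1', { ok: true })
console.log(metrics) // { completed: 1, failed: 0, retries: 1 }
```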
Complete Example
import { Queue } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue/storage/redis'
import pino from 'pino'
interface ProcessVideoPayload {
videoId: string
operations: string[]
}
interface ProcessVideoResult {
outputUrl: string
duration: number
}
const queue = new Queue<ProcessVideoPayload, ProcessVideoResult>({
storage: new RedisStorage({ url: 'redis://localhost:6379' }),
workerId: 'video-processor-1',
concurrency: 3,
maxRetries: 5,
visibilityTimeout: 300000, // 5 minutes
resultTTL: 86400000, // 24 hours
logger: pino(),
afterExecution: async (context) => {
// Custom metrics or logging
console.log(`Job ${context.id} took ${context.durationMs}ms`)
}
})
// Register event listeners
queue.on('completed', (id, result) => {
console.log(`Video ${id} processed: ${result.outputUrl}`)
})
queue.on('failed', (id, error) => {
console.error(`Video processing failed for ${id}:`, error.message)
})
queue.on('error', (error) => {
console.error('Queue error:', error)
})
// Register handler
queue.execute(async (job) => {
console.log(`Processing video ${job.payload.videoId}, attempt ${job.attempts}`)
// Check for cancellation
if (job.signal.aborted) {
throw new Error('Job was cancelled')
}
// Process video
const result = await processVideo(
job.payload.videoId,
job.payload.operations
)
return result
})
// Start the queue
await queue.start()
// Enqueue a job
await queue.enqueue('video-123', {
videoId: 'abc-def',
operations: ['transcode', 'thumbnail', 'compress']
})
// Or enqueue and wait for result
const result = await queue.enqueueAndWait('video-456', {
videoId: 'ghi-jkl',
operations: ['transcode']
}, { timeout: 60000 })
console.log('Processing complete:', result.outputUrl)
// Clean shutdown
process.on('SIGTERM', async () => {
await queue.stop()
process.exit(0)
})