Core Types

QueueConfig

Configuration object for creating a Queue instance.
storage
Storage
required
Storage backend for the queue (Memory, Redis, or File)
afterExecution
AfterExecutionHook<TPayload, TResult>
Hook called after job execution and before persisting terminal state
payloadSerde
Serde<TPayload>
Payload serializer (default: JSON)
resultSerde
Serde<TResult>
Result serializer (default: JSON)
workerId
string
Unique worker ID (default: random UUID)
concurrency
number
default: 1
Number of jobs to process in parallel
blockTimeout
number
default: 5
Blocking dequeue timeout in seconds
maxRetries
number
default: 3
Default maximum retry attempts for failed jobs
visibilityTimeout
number
default: 30000
Maximum processing time, in milliseconds, before a job is considered stalled
processingQueueTTL
number
default: 604800000
TTL for processing queue keys in milliseconds (default: 7 days)
resultTTL
number
default: 3600000
TTL for stored results and errors in milliseconds (default: 1 hour)
logger
Logger
Pino logger instance (default: no-op logger)
import { Queue, RedisStorage } from '@platformatic/job-queue'
import Redis from 'ioredis'
import pino from 'pino'

const queue = new Queue({
  storage: new RedisStorage(new Redis()),
  concurrency: 10,
  maxRetries: 5,
  resultTTL: 7200000, // 2 hours
  logger: pino() // the logger option expects a Pino instance
})
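
Most of the timing options are raw millisecond counts, which are easy to misread. The sketch below restates the documented defaults as named durations. `withDefaults` and `TimingOptions` are hypothetical names used for illustration only; they are not part of the library, and the library's own default handling may differ.

```typescript
// Hypothetical helper: not part of @platformatic/job-queue.
// It only mirrors the defaults listed in the QueueConfig fields above.
const SECOND = 1000;
const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const DAY = 24 * HOUR;

interface TimingOptions {
  concurrency?: number;
  blockTimeout?: number;       // seconds
  maxRetries?: number;
  visibilityTimeout?: number;  // milliseconds
  processingQueueTTL?: number; // milliseconds
  resultTTL?: number;          // milliseconds
}

const documentedDefaults: Required<TimingOptions> = {
  concurrency: 1,
  blockTimeout: 5,
  maxRetries: 3,
  visibilityTimeout: 30 * SECOND,
  processingQueueTTL: 7 * DAY,
  resultTTL: 1 * HOUR
};

// Fill in every option the caller left unset.
function withDefaults(options: TimingOptions): Required<TimingOptions> {
  return {
    concurrency: options.concurrency ?? documentedDefaults.concurrency,
    blockTimeout: options.blockTimeout ?? documentedDefaults.blockTimeout,
    maxRetries: options.maxRetries ?? documentedDefaults.maxRetries,
    visibilityTimeout: options.visibilityTimeout ?? documentedDefaults.visibilityTimeout,
    processingQueueTTL: options.processingQueueTTL ?? documentedDefaults.processingQueueTTL,
    resultTTL: options.resultTTL ?? documentedDefaults.resultTTL
  };
}

const resolved = withDefaults({ concurrency: 10, maxRetries: 5, resultTTL: 2 * HOUR });
console.log(resolved);
```

Note that `blockTimeout` is the only option expressed in seconds; the others are in milliseconds.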

Job Types

Job

Job object passed to the handler function.
id
string
required
Unique job identifier
payload
TPayload
required
Job payload data
attempts
number
required
Number of times this job has been attempted
signal
AbortSignal
required
Abort signal for cancellation support
queue.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  console.log(`Attempt ${job.attempts}`);
  console.log(`Payload:`, job.payload);
  
  // Check if cancelled
  if (job.signal.aborted) {
    throw new Error('Job was cancelled');
  }
  
  return { success: true };
});
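
A single check at the top of a handler only catches cancellations that happened before the handler started. For longer jobs, one option is to re-check `job.signal` between units of work. The sketch below uses a local stand-in for the `Job` type and a hypothetical `processItems` helper; when and why the queue aborts the signal is not covered here.

```typescript
// Local stand-in for the Job type described above; not imported from the library.
interface Job<TPayload> {
  id: string;
  payload: TPayload;
  attempts: number;
  signal: AbortSignal;
}

// Hypothetical helper: handle items one at a time, checking for
// cancellation before starting each item.
function processItems(job: Job<string[]>, handleItem: (item: string) => void): number {
  let processed = 0;
  for (const item of job.payload) {
    if (job.signal.aborted) {
      throw new Error(`Job ${job.id} was cancelled after ${processed} item(s)`);
    }
    handleItem(item);
    processed++;
  }
  return processed;
}

// Demo: abort the job while the second item is being handled.
const controller = new AbortController();
const seen: string[] = [];
const demoJob: Job<string[]> = {
  id: 'job-1',
  payload: ['a', 'b', 'c'],
  attempts: 1,
  signal: controller.signal
};

let cancelMessage = '';
try {
  processItems(demoJob, (item) => {
    seen.push(item);
    if (item === 'b') controller.abort();
  });
} catch (err) {
  cancelMessage = (err as Error).message;
}
console.log(seen, cancelMessage);
```

In this demo the third item is never handled, because the abort is observed before it starts.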

QueueMessage

Internal message structure stored in the queue.
id
string
required
Unique message identifier
payload
TPayload
required
Message payload
createdAt
number
required
Timestamp when message was created (milliseconds since epoch)
attempts
number
required
Current attempt count
maxAttempts
number
required
Maximum number of retry attempts
resultTTL
number
TTL for result/error in milliseconds
correlationId
string
Optional correlation ID for tracking related jobs

Status Types

MessageState

Possible states for a job in the queue.
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'
  • queued: Job is waiting to be processed
  • processing: Job is currently being processed
  • failing: Job failed and will be retried
  • completed: Job completed successfully
  • failed: Job failed permanently after all retries
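
Of these states, `completed` and `failed` are the terminal ones. A small predicate can make that distinction explicit in calling code. `isTerminal` is a hypothetical helper written for this example; the library may or may not export an equivalent.

```typescript
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed';

// Hypothetical helper: true once a job can no longer change state.
function isTerminal(state: MessageState): boolean {
  return state === 'completed' || state === 'failed';
}

const states: MessageState[] = ['queued', 'processing', 'failing', 'completed', 'failed'];
const terminalStates = states.filter(isTerminal);
console.log(terminalStates);
```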

MessageStatus

Detailed status information for a job.
id
string
required
Job identifier
state
MessageState
required
Current job state
createdAt
number
required
Timestamp when job was created (milliseconds since epoch)
attempts
number
required
Number of execution attempts
result
TResult
Job result (if completed successfully)
error
SerializedError
Error information (if failed)
const status = await queue.getStatus('job-123');

if (status) {
  console.log(`Job ${status.id} is ${status.state}`);
  
  if (status.state === 'completed') {
    console.log('Result:', status.result);
  } else if (status.state === 'failed') {
    console.log('Error:', status.error?.message);
  }
}

Enqueue Types

EnqueueOptions

Options for enqueuing a job.
maxAttempts
number
Maximum retry attempts (overrides queue default)
resultTTL
number
TTL for result/error in milliseconds (overrides queue default)
await queue.enqueue(
  { task: 'send-email', to: 'user@example.com' },
  { 
    maxAttempts: 5,
    resultTTL: 3600000 // 1 hour
  }
);

EnqueueAndWaitOptions

Options for enqueuing a job and waiting for completion.
maxAttempts
number
Maximum retry attempts (overrides queue default)
resultTTL
number
TTL for result/error in milliseconds (overrides queue default)
timeout
number
Maximum time to wait for result in milliseconds
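// Assumes TimeoutError is exported by '@platformatic/job-queue'
import { TimeoutError } from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait(
    { task: 'generate-report' },
    {
      timeout: 30000,  // 30 seconds
      maxAttempts: 3
    }
  );
  console.log('Report generated:', result);
} catch (error) {
  if (error instanceof TimeoutError) {
    console.error('Report generation timed out');
  } else {
    throw error; // Do not swallow unrelated failures
  }
}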

EnqueueResult

Result type returned by the enqueue() method.
type EnqueueResult<TResult = unknown> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult }
status
'queued' | 'duplicate' | 'completed'
required
Result status
existingState
MessageState
Current state of the existing job (when status is 'duplicate')
result
TResult
Job result (when status is 'completed')
const result = await queue.enqueue(
  { task: 'process-order' },
  { id: 'order-123' }
);

if (result.status === 'queued') {
  console.log('Job queued successfully');
} else if (result.status === 'duplicate') {
  console.log(`Job already exists with state: ${result.existingState}`);
} else if (result.status === 'completed') {
  console.log('Job already completed:', result.result);
}
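
Because `EnqueueResult` is a discriminated union on `status`, TypeScript can check that every variant is handled. The sketch below redeclares the types locally so that it is self-contained; `describeEnqueue` is a hypothetical function used only to illustrate the exhaustiveness check.

```typescript
// Local copies of the types shown above; not imported from the library.
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed';

type EnqueueResult<TResult = unknown> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult };

// Hypothetical function: turn a result into a human-readable summary.
function describeEnqueue(result: EnqueueResult): string {
  switch (result.status) {
    case 'queued':
      return 'queued';
    case 'duplicate':
      return `duplicate (${result.existingState})`;
    case 'completed':
      return `completed: ${JSON.stringify(result.result)}`;
    default: {
      // If a new variant is added to the union, this assignment stops compiling.
      const unreachable: never = result;
      return unreachable;
    }
  }
}

console.log(describeEnqueue({ status: 'duplicate', existingState: 'processing' }));
```

The same pattern applies to the other `status`-tagged result types in this reference.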

Operation Result Types

CancelResult

Result type returned by the cancel() method.
type CancelResult =
  | { status: 'cancelled' }
  | { status: 'not_found' }
  | { status: 'processing' }
  | { status: 'completed' }
status
'cancelled' | 'not_found' | 'processing' | 'completed'
required
Cancellation result status
  • cancelled: Job was successfully cancelled
  • not_found: Job does not exist
  • processing: Job is currently being processed and cannot be cancelled
  • completed: Job already completed
const result = await queue.cancel('job-123');

switch (result.status) {
  case 'cancelled':
    console.log('Job cancelled');
    break;
  case 'processing':
    console.log('Job is currently processing, cannot cancel');
    break;
  case 'completed':
    console.log('Job already completed');
    break;
  case 'not_found':
    console.log('Job not found');
    break;
}

UpdateResultTTLResult

Result type returned by the updateResultTTL() method.
type UpdateResultTTLResult =
  | { status: 'updated' }
  | { status: 'not_found' }
  | { status: 'not_terminal' }
  | { status: 'missing_payload' }
status
'updated' | 'not_found' | 'not_terminal' | 'missing_payload'
required
Update result status
  • updated: TTL updated successfully
  • not_found: Job does not exist
  • not_terminal: Job is not in a terminal state (completed/failed)
  • missing_payload: Terminal state exists but payload is missing
const result = await queue.updateResultTTL('job-123', 7200000); // 2 hours

if (result.status === 'updated') {
  console.log('TTL updated successfully');
} else if (result.status === 'not_terminal') {
  console.log('Cannot update TTL: job has not reached a terminal state');
}

Hook Types

AfterExecutionContext

Context object passed to the afterExecution hook.
id
string
required
Job identifier
payload
TPayload
required
Job payload
attempts
number
required
Number of execution attempts
maxAttempts
number
required
Maximum number of attempts
createdAt
number
required
Timestamp when job was created (milliseconds since epoch)
status
'completed' | 'failed'
required
Job execution outcome
result
TResult
Job result (if completed successfully)
error
Error
Error object (if failed)
ttl
number
required
TTL for result/error in milliseconds
workerId
string
required
Worker that processed the job
startedAt
number
required
Timestamp when execution started (milliseconds since epoch)
finishedAt
number
required
Timestamp when execution finished (milliseconds since epoch)
durationMs
number
required
Execution duration in milliseconds

AfterExecutionHook

Function type for the afterExecution hook.
type AfterExecutionHook<TPayload, TResult> = (
  context: AfterExecutionContext<TPayload, TResult>
) => void | Promise<void>
The hook is called after job execution completes (success or failure) but before the terminal state is persisted.
const queue = new Queue({
  storage,
  afterExecution: async (context) => {
    // Log to an external system (`logger` and `metrics` are application-provided clients)
    await logger.info({
      jobId: context.id,
      status: context.status,
      duration: context.durationMs,
      workerId: context.workerId
    });
    
    // Send metrics
    if (context.status === 'completed') {
      metrics.increment('jobs.completed');
      metrics.timing('jobs.duration', context.durationMs);
    } else {
      metrics.increment('jobs.failed');
      console.error(`Job ${context.id} failed:`, context.error);
    }
  }
});
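
The example above relies on external `logger` and `metrics` clients. For a version that runs on its own, the sketch below collects counts and durations in memory. `ExecutionStats` and the trimmed-down `ExecutionContext` are hypothetical stand-ins that include only the context fields this example reads.

```typescript
// Local subset of AfterExecutionContext; not imported from the library.
interface ExecutionContext {
  id: string;
  status: 'completed' | 'failed';
  startedAt: number;
  finishedAt: number;
  durationMs: number;
}

// Hypothetical in-memory collector standing in for a real metrics client.
class ExecutionStats {
  completed = 0;
  failed = 0;
  totalDurationMs = 0;

  record(context: ExecutionContext): void {
    if (context.status === 'completed') this.completed++;
    else this.failed++;
    this.totalDurationMs += context.durationMs;
  }

  get averageDurationMs(): number {
    const count = this.completed + this.failed;
    return count === 0 ? 0 : this.totalDurationMs / count;
  }
}

const stats = new ExecutionStats();

// This function has the shape of an afterExecution hook for the fields it uses.
const afterExecution = (context: ExecutionContext): void => stats.record(context);

// Simulated invocations with hand-written contexts.
afterExecution({ id: 'a', status: 'completed', startedAt: 1000, finishedAt: 1200, durationMs: 200 });
afterExecution({ id: 'b', status: 'failed', startedAt: 2000, finishedAt: 2100, durationMs: 100 });
console.log(stats.completed, stats.failed, stats.averageDurationMs);
```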

Internal Types

SerializedError

Serialized error information stored with failed jobs.
message
string
required
Error message
code
string
Error code
stack
string
Stack trace
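
An `Error` instance does not survive JSON serialization on its own, which is why a plain-object form is stored. The sketch below shows one way such a conversion could look. `serializeError` is a hypothetical function, and the library's actual serialization may differ.

```typescript
// Local copy of the shape described above; not imported from the library.
interface SerializedError {
  message: string;
  code?: string;
  stack?: string;
}

// Hypothetical conversion from a thrown value to a plain, JSON-safe object.
function serializeError(err: unknown): SerializedError {
  if (err instanceof Error) {
    const serialized: SerializedError = { message: err.message };
    const code = (err as Error & { code?: unknown }).code;
    if (typeof code === 'string') serialized.code = code;
    if (typeof err.stack === 'string') serialized.stack = err.stack;
    return serialized;
  }
  // Non-Error values (strings, numbers, ...) are reduced to a message.
  return { message: String(err) };
}

const failure = Object.assign(new Error('connection refused'), { code: 'ECONNREFUSED' });
const serialized = serializeError(failure);

// The plain object keeps its fields across a JSON round trip.
const roundTripped: SerializedError = JSON.parse(JSON.stringify(serialized));
console.log(roundTripped.message, roundTripped.code);
```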

JobHandler

Function type for job handlers.
type JobHandler<TPayload, TResult> =
  | ((job: Job<TPayload>) => Promise<TResult>)
  | ((job: Job<TPayload>, callback: (err: Error | null, result?: TResult) => void) => void)
Supports both async/await and callback-style handlers.
// Async handler
queue.process(async (job) => {
  return await processData(job.payload);
});

// Callback handler
queue.process((job, callback) => {
  processData(job.payload, (err, result) => {
    if (err) callback(err);
    else callback(null, result);
  });
});
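
To illustrate how a runner could support both handler styles, the sketch below normalizes either form to a promise. It assumes, purely for illustration, that a handler declaring two parameters is callback-style. How the library actually tells the two apart is not stated here, and `runHandler` is a hypothetical name.

```typescript
// Local stand-ins for the types described above; not imported from the library.
interface Job<TPayload> {
  id: string;
  payload: TPayload;
  attempts: number;
  signal: AbortSignal;
}
type AsyncHandler<P, R> = (job: Job<P>) => Promise<R>;
type CallbackHandler<P, R> = (
  job: Job<P>,
  callback: (err: Error | null, result?: R) => void
) => void;
type JobHandler<P, R> = AsyncHandler<P, R> | CallbackHandler<P, R>;

// Assumption: handlers declaring two parameters are treated as callback-style.
function runHandler<P, R>(handler: JobHandler<P, R>, job: Job<P>): Promise<R> {
  if (handler.length >= 2) {
    return new Promise<R>((resolve, reject) => {
      (handler as CallbackHandler<P, R>)(job, (err, result) => {
        if (err) reject(err);
        else resolve(result as R);
      });
    });
  }
  return (handler as AsyncHandler<P, R>)(job);
}

const demoJob: Job<number> = {
  id: 'job-1',
  payload: 2,
  attempts: 1,
  signal: new AbortController().signal
};

const asyncStyle: AsyncHandler<number, number> = async (job) => job.payload * 2;
const callbackStyle: CallbackHandler<number, number> = (job, callback) =>
  callback(null, job.payload * 2);

// Both styles resolve to the same value through the wrapper.
const results = Promise.all([
  runHandler<number, number>(asyncStyle, demoJob),
  runHandler<number, number>(callbackStyle, demoJob)
]);
results.then((values) => console.log(values));
```

Dispatching on `handler.length` is fragile when handlers use default or rest parameters, since those are not counted, so treat this as a sketch rather than a recommended detection strategy.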

QueueEvents

Event types emitted by the Queue instance.
interface QueueEvents<TResult> {
  started: []
  stopped: []
  error: [error: Error]
  enqueued: [id: string]
  completed: [id: string, result: TResult]
  failed: [id: string, error: Error]
  failing: [id: string, error: Error, attempt: number]
  requeued: [id: string]
  cancelled: [id: string]
  stalled: [id: string]
}
See Events for detailed event documentation.