Quick Start
This guide walks you through creating a complete working job queue from scratch. You’ll learn how to create a queue, register a handler, enqueue jobs, and handle results.Prerequisites
- Node.js 22.19.0 or later installed
@platformatic/job-queuepackage installed (Installation Guide)
Complete Example
Let’s build an email processing queue that demonstrates all core features:Create the queue with storage
First, import the required classes and create a queue with in-memory storage:
index.ts
The generic types
<TPayload, TResult> provide full type safety for your job data and results.Register a job handler
Define how your queue should process jobs:The handler receives a
index.ts
Job object with:id- Unique job identifierpayload- The job data (typed as{ email: string })attempts- Current attempt number (starts at 1)signal- AbortSignal for cancellation
Enqueue and wait for results
For request/response patterns, use This will:
enqueueAndWait():index.ts
- Enqueue the job
- Wait for it to be processed
- Return the result (or throw if it fails)
Complete Working Code
Here’s the full example in one file:index.ts
Run the Example
Save the code above asindex.ts and run it:
Expected Output
Understanding the Flow
How enqueue() works
How enqueue() works
When you call
enqueue(id, payload, options?), the library:- Checks if a job with that ID already exists
- If it’s new, serializes the payload and adds it to the queue
- Returns immediately with status
'queued','duplicate', or'completed' - The consumer picks up the job asynchronously
How enqueueAndWait() works
How enqueueAndWait() works
When you call
enqueueAndWait(id, payload, options?), the library:- Enqueues the job (same as
enqueue()) - Subscribes to completion events for that job ID
- Waits until the job completes or times out
- Returns the result or throws an error
How deduplication works
How deduplication works
Jobs are identified by their ID. When you enqueue a job:
- If no job with that ID exists, it’s queued
- If a job is queued/processing, you get status
'duplicate' - If a job completed, you get the cached result immediately
resultTTL milliseconds (default: 1 hour).How graceful shutdown works
How graceful shutdown works
When you call
stop():- The queue stops accepting new jobs
- In-flight jobs continue processing
- The queue waits up to
visibilityTimeoutfor jobs to complete - Any incomplete jobs are returned to the queue
- Storage is disconnected
Configuration Options
TheQueue constructor accepts these options:
| Option | Type | Default | Description |
|---|---|---|---|
storage | Storage | required | Storage backend instance |
workerId | string | uuid() | Unique identifier for this worker |
concurrency | number | 1 | Number of jobs to process in parallel |
maxRetries | number | 3 | Maximum retry attempts for failed jobs |
blockTimeout | number | 5 | Seconds to wait when polling for jobs |
visibilityTimeout | number | 30000 | Milliseconds before a processing job is considered stalled |
resultTTL | number | 3600000 | Milliseconds to cache job results (1 hour) |
logger | pino.Logger | abstractLogger | Logger instance |
Error Handling
Handle errors using try-catch with typed error classes:Listening to Events
The queue emits events for monitoring:Next Steps
Now that you have a working queue, explore more advanced features:Storage Backends
Learn about Redis, FileStorage, and MemoryStorage
Queue Configuration
Configure retries, timeouts, and concurrency
Stalled Job Recovery
Set up the Reaper for automatic recovery
Producer/Consumer
Separate producers and consumers across services