Overview
The producer/consumer pattern separates the components that enqueue jobs (producers) from the components that process them (workers/consumers). This pattern is essential for building scalable, distributed job processing systems.
When to Use This Pattern
Use the producer/consumer pattern when:
You need to scale workers independently from your application servers
Multiple services need to enqueue jobs to the same queue
You want to isolate job processing from request handling
You need different machines or containers for CPU-intensive work
You want to deploy workers in different regions or availability zones
Architecture
┌──────────┐     ┌─────────────┐     ┌──────────┐
│ Producer │────>│   Storage   │<────│  Worker  │
│  (API)   │     │   (Redis)   │     │ (Process)│
└──────────┘     └─────────────┘     └──────────┘
                        ▲
                        │
                   ┌────┴─────┐
                   │  Worker  │
                   │(Process) │
                   └──────────┘
Producer Setup
Producers only enqueue jobs. They don’t register handlers or process work.
producer.ts
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const producer = new Queue({ storage })

await producer.start()
await producer.enqueue('task-1', { ... })
await producer.stop()
Worker Setup
Workers register handlers and process jobs from the queue. Multiple worker instances can run concurrently.
worker.ts
import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000
})

queue.execute(async (job) => {
  // Process the job here. `result` is a placeholder for whatever the handler produces.
  const result = { processed: true }
  return result
})

await queue.start()
await reaper.start()

// Stop taking new work and shut down cleanly when the process is asked to terminate
process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop()
})
Storage Configuration
For distributed setups, use RedisStorage which supports multiple producers and consumers:
import { RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({
  url: 'redis://localhost:6379',
  keyPrefix: 'myapp:', // Namespace your keys
  logger // Optional pino logger
})
RedisStorage uses atomic operations via Lua scripts and blocking dequeue with BLMOVE, making it safe for concurrent access from multiple processes.
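To make the role of the atomic move concrete, here is a minimal in-memory model of the reliable-queue pattern that BLMOVE makes possible: a job leaves the pending list and lands in a processing list in a single step, so two workers can never claim the same job. This is an illustrative sketch, not the RedisStorage implementation. The names `ReliableQueueModel`, `claim`, `ack`, and `inFlight` are hypothetical, and the blocking behaviour of BLMOVE is not modelled.

```typescript
// Illustrative in-memory model only; RedisStorage's real key layout and Lua scripts are not shown here.
class ReliableQueueModel {
  private pending: string[] = []
  private processing = new Map<string, string[]>() // workerId -> claimed job IDs

  enqueue(jobId: string): void {
    this.pending.push(jobId)
  }

  // Models the atomic move: take the oldest pending job and record it
  // in the worker's processing list as one step.
  claim(workerId: string): string | undefined {
    const jobId = this.pending.shift()
    if (jobId === undefined) return undefined
    const claimed = this.processing.get(workerId) ?? []
    claimed.push(jobId)
    this.processing.set(workerId, claimed)
    return jobId
  }

  // Called when the worker finishes: the job leaves the processing list.
  ack(workerId: string, jobId: string): void {
    const claimed = this.processing.get(workerId) ?? []
    this.processing.set(workerId, claimed.filter((id) => id !== jobId))
  }

  // Jobs a worker has claimed but not yet acknowledged.
  inFlight(workerId: string): string[] {
    return [...(this.processing.get(workerId) ?? [])]
  }
}

const model = new ReliableQueueModel()
model.enqueue('task-1')
model.enqueue('task-2')
const first = model.claim('worker-a')
const second = model.claim('worker-b')
```

Because a claimed job stays in a processing list until it is acknowledged, a crashed worker leaves evidence behind that a recovery process can act on.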
Running Multiple Workers
Scale horizontally by running multiple worker processes:
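Ensure unique worker IDs
Each worker should have a unique workerId. Use the process PID or the container ID. A PID is unique only within a single host or container (containerized workers often all run as PID 1), so prefer the container ID or hostname when workers run on several machines:
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})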
Configure the Reaper
Run a Reaper instance to recover stalled jobs. Use leader election for high availability:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true
  }
})
Deploy workers
Start multiple worker processes:
# Terminal 1
node worker.js
# Terminal 2
node worker.js
# Terminal 3
node worker.js
Or use Docker/Kubernetes:
version: '3'
services:
  worker:
    build: .
    command: node worker.js
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 5
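When workers run as container replicas, the PID alone may not distinguish them. One possible way to build a worker ID that stays unique across hosts and containers is to combine the hostname, the PID, and a short random suffix. The helper name `buildWorkerId` is hypothetical and not part of `@platformatic/job-queue`. In Docker the hostname defaults to the short container ID, which is why it is included here.

```typescript
import { hostname } from 'node:os'
import { randomUUID } from 'node:crypto'

// Hypothetical helper: combines host, PID, and a random suffix so IDs stay unique
// even when every container runs the worker under the same PID.
function buildWorkerId(prefix = 'worker'): string {
  const suffix = randomUUID().slice(0, 8)
  return `${prefix}-${hostname()}-${process.pid}-${suffix}`
}

const workerId = buildWorkerId()
```

The resulting string can be passed as the `workerId` option shown in the first step. The random suffix also covers the case where a restarted worker reuses the same hostname and PID.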
Monitoring and Observability
Track job processing across distributed workers:
import pino from 'pino'
import { Queue, RedisStorage } from '@platformatic/job-queue'

const logger = pino()
const storage = new RedisStorage({ url: process.env.REDIS_URL })

const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  logger
})

// Track metrics
let processedCount = 0
let failedCount = 0

queue.on('completed', (id) => {
  processedCount++
  logger.info({ jobId: id, total: processedCount }, 'Job completed')
})

queue.on('failed', (id, error) => {
  failedCount++
  logger.error({
    jobId: id,
    error: error.message,
    total: failedCount
  }, 'Job failed')
})

// Periodic status reporting (once per minute)
setInterval(() => {
  logger.info({
    workerId: queue.workerId,
    processed: processedCount,
    failed: failedCount
  }, 'Worker stats')
}, 60000)
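The counters above can be wrapped in a small helper that also derives a failure rate, which is often more useful for alerting than raw totals. This is a sketch under assumptions: `WorkerStats` is a hypothetical class, and it assumes the queue exposes a Node.js-style `on` method with the `completed` and `failed` event names used in the snippet above. A plain `EventEmitter` stands in for the queue so the example runs without Redis.

```typescript
import { EventEmitter } from 'node:events'

// Hypothetical helper: counts 'completed' and 'failed' events from any emitter
// that uses the event names shown in the monitoring snippet.
class WorkerStats {
  processed = 0
  failed = 0

  attach(emitter: EventEmitter): this {
    emitter.on('completed', () => { this.processed++ })
    emitter.on('failed', () => { this.failed++ })
    return this
  }

  // Fraction of finished jobs that failed; 0 when nothing has finished yet.
  failureRate(): number {
    const total = this.processed + this.failed
    return total === 0 ? 0 : this.failed / total
  }
}

// A plain EventEmitter stands in for the queue here.
const fakeQueue = new EventEmitter()
const stats = new WorkerStats().attach(fakeQueue)
fakeQueue.emit('completed', 'job-1')
fakeQueue.emit('completed', 'job-2')
fakeQueue.emit('completed', 'job-3')
fakeQueue.emit('failed', 'job-4', new Error('boom'))
```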
Best Practices
Always run a Reaper when using multiple workers. Without it, jobs from crashed workers won’t be recovered.
Set appropriate visibility timeouts. The visibilityTimeout should be longer than your longest expected job duration. If a job runs longer, the Reaper will mark it as stalled.
Use unique job IDs to enable deduplication across producers
Set resultTTL based on how long you need cached results
Monitor the stalled event to detect worker issues
Use the same visibilityTimeout for both Queue and Reaper
Configure concurrency based on your job’s resource usage
Use leader election for Reaper in production environments
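The advice on visibility timeouts can be illustrated with a small model of the stall check. This is a conceptual sketch, not the Reaper's actual code: the `InFlightJob` shape and the `findStalled` function are hypothetical, and it assumes a job counts as stalled once it has been in flight for longer than `visibilityTimeout`.

```typescript
interface InFlightJob {
  id: string
  startedAt: number // epoch milliseconds when a worker claimed the job
}

// Hypothetical model of the stall check: a job is reported as stalled once it
// has been in flight for longer than visibilityTimeout.
function findStalled(jobs: InFlightJob[], now: number, visibilityTimeout: number): string[] {
  return jobs
    .filter((job) => now - job.startedAt > visibilityTimeout)
    .map((job) => job.id)
}

const now = 1_000_000
const inFlightJobs: InFlightJob[] = [
  { id: 'fast', startedAt: now - 5_000 }, // 5 seconds in flight
  { id: 'slow', startedAt: now - 45_000 } // 45 seconds in flight, still running normally
]

// With a 30 second timeout the healthy but slow job is flagged as stalled.
const stalledAt30s = findStalled(inFlightJobs, now, 30_000)
// With a 2 minute timeout nothing is flagged.
const stalledAt120s = findStalled(inFlightJobs, now, 120_000)
```

In this model a timeout shorter than the longest job causes healthy work to be treated as stalled, which is why the timeout should exceed the longest expected job duration.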
Differences from Single-Process Queue
| Aspect | Single Process | Producer/Consumer |
|---|---|---|
| Storage | MemoryStorage OK | Requires RedisStorage or FileStorage |
| Scalability | Limited to one process | Horizontally scalable |
| Failure Recovery | Jobs lost on crash | Reaper recovers stalled jobs |
| Deployment | Simple | Requires coordination |
| Use Case | Development, testing | Production, high-throughput |
Example: Image Processing Service
Complete example of a distributed image processing system:
// Producer: API server that enqueues image jobs and waits for the result
import express from 'express'
import { Queue, RedisStorage } from '@platformatic/job-queue'

const app = express()
app.use(express.json()) // Parse JSON bodies so req.body is populated

const storage = new RedisStorage({ url: process.env.REDIS_URL })

interface ImageJob {
  imageUrl: string
  operations: Array<'resize' | 'watermark' | 'compress'>
}

interface ImageResult {
  processedUrl: string
  size: number
}

const queue = new Queue<ImageJob, ImageResult>({
  storage,
  resultTTL: 30 * 60 * 1000 // Keep results for 30 minutes
})

await queue.start()

app.post('/images/process', async (req, res) => {
  // Note: Date.now() can collide when two requests arrive in the same millisecond
  const jobId = `img-${Date.now()}`
  try {
    const result = await queue.enqueueAndWait(jobId, {
      imageUrl: req.body.imageUrl,
      operations: req.body.operations
    }, {
      timeout: 60000 // 1 minute timeout
    })
    res.json(result)
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    res.status(500).json({ error: message })
  }
})

app.listen(3000)
// Worker: separate process that downloads, transforms, and uploads images.
// ImageJob and ImageResult are the interfaces defined in the producer above;
// uploadToS3 is assumed to be an application helper that uploads a Buffer and returns its URL.
import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'
import sharp from 'sharp'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const queue = new Queue<ImageJob, ImageResult>({
  storage,
  workerId: `image-worker-${process.pid}`,
  concurrency: 5, // Process 5 images concurrently
  visibilityTimeout: 120000, // 2 minutes for image processing
  maxRetries: 3
})

queue.execute(async (job) => {
  // Check for cancellation before expensive work
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }

  // Download image
  const response = await fetch(job.payload.imageUrl)
  if (!response.ok) {
    throw new Error(`Download failed with status ${response.status}`)
  }
  const buffer = Buffer.from(await response.arrayBuffer())

  // Process image
  let image = sharp(buffer)
  for (const operation of job.payload.operations) {
    if (operation === 'resize') {
      image = image.resize(800, 600)
    } else if (operation === 'watermark') {
      // Add watermark
    } else if (operation === 'compress') {
      image = image.jpeg({ quality: 80 })
    }
  }

  // Check again before final step
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }

  const processed = await image.toBuffer()

  // Upload to storage
  const processedUrl = await uploadToS3(processed)

  return {
    processedUrl,
    size: processed.length
  }
})

// Same visibilityTimeout as the queue, with leader election for high availability
const reaper = new Reaper({
  storage,
  visibilityTimeout: 120000,
  leaderElection: { enabled: true }
})

await queue.start()
await reaper.start()

process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop()
  process.exit(0)
})
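The producer in this example builds job IDs from the current time, so two identical requests always produce two separate jobs. If deduplication across producers is wanted, as suggested under Best Practices, one option is to derive the ID from the payload itself. The sketch below is an assumption about how that could look, not something the library prescribes: `imageJobId` is a hypothetical helper, and the `ImageJob` interface is repeated so the block is self-contained.

```typescript
import { createHash } from 'node:crypto'

interface ImageJob {
  imageUrl: string
  operations: Array<'resize' | 'watermark' | 'compress'>
}

// Hypothetical helper: identical payloads map to the same job ID.
// Fields are serialized in a fixed order so property order in the input does not matter.
// The order of operations is kept on purpose, since it changes the output image.
function imageJobId(job: ImageJob): string {
  const canonical = JSON.stringify({ imageUrl: job.imageUrl, operations: job.operations })
  const digest = createHash('sha256').update(canonical).digest('hex')
  return `img-${digest.slice(0, 16)}`
}

const idA = imageJobId({ imageUrl: 'https://example.com/a.png', operations: ['resize', 'compress'] })
const idB = imageJobId({ operations: ['resize', 'compress'], imageUrl: 'https://example.com/a.png' })
const idC = imageJobId({ imageUrl: 'https://example.com/a.png', operations: ['compress'] })
```

Here `idA` and `idB` are equal because the payloads match, while `idC` differs because the operations differ. Whether a repeated ID returns a cached result or is rejected depends on the queue's deduplication behaviour, which this sketch does not cover.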
See Also