
Overview

The producer/consumer pattern separates the components that enqueue jobs (producers) from the components that process them (workers/consumers). This pattern is essential for building scalable, distributed job processing systems.

When to Use This Pattern

Use the producer/consumer pattern when:
  • You need to scale workers independently from your application servers
  • Multiple services need to enqueue jobs to the same queue
  • You want to isolate job processing from request handling
  • You need different machines or containers for CPU-intensive work
  • You want to deploy workers in different regions or availability zones

Architecture

┌──────────┐     ┌─────────────┐     ┌──────────┐
│ Producer │────>│   Storage   │<────│  Worker  │
│ (API)    │     │  (Redis)    │     │ (Process)│
└──────────┘     └─────────────┘     └──────────┘
                        ^
                        │
                   ┌────┴─────┐
                   │  Worker  │
                   │(Process) │
                   └──────────┘

Producer Setup

Producers only enqueue jobs. They don’t register handlers or process work.
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const producer = new Queue({ storage })

await producer.start()
await producer.enqueue('task-1', { ... })
await producer.stop()

Worker Setup

Workers register handlers and process jobs from the queue. Multiple worker instances can run concurrently.
import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000
})

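queue.execute(async (job) => {
  // Process job.payload here; the returned value becomes the job's result
  const result = { ok: true } // placeholder result for illustration
  return result
})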

await queue.start()
await reaper.start()

process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop()
})

Storage Configuration

For distributed setups, use RedisStorage, which supports multiple producers and consumers:
import { RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({
  url: 'redis://localhost:6379',
  keyPrefix: 'myapp:', // Namespace your keys
  logger // Optional pino logger
})
RedisStorage uses atomic operations via Lua scripts and blocking dequeue with BLMOVE, making it safe for concurrent access from multiple processes.
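
To illustrate why an atomic move matters, here is a minimal in-memory sketch of the reliable-queue pattern that BLMOVE makes possible in Redis: a job leaves the pending list and lands on a processing list in a single step, so it is never in flight without being tracked. The `InMemoryLists` class and its method names are hypothetical stand-ins, not part of `@platformatic/job-queue`, and single-threaded JavaScript only imitates the atomicity that Redis provides.

```typescript
// Hypothetical in-memory model of two Redis lists; not the library's implementation.
class InMemoryLists {
  pending: string[] = []
  processing: string[] = []

  // Producer side: append a job ID to the pending list (like RPUSH).
  push (jobId: string): void {
    this.pending.push(jobId)
  }

  // Worker side: move the oldest pending job to the processing list in one step
  // (like BLMOVE pending processing LEFT RIGHT, minus the blocking wait).
  claim (): string | undefined {
    const jobId = this.pending.shift()
    if (jobId !== undefined) this.processing.push(jobId)
    return jobId
  }

  // Called when the job finishes: drop it from the processing list (like LREM).
  ack (jobId: string): void {
    this.processing = this.processing.filter((id) => id !== jobId)
  }
}

const lists = new InMemoryLists()
lists.push('task-1')
lists.push('task-2')
const claimed = lists.claim()
```

If a worker crashes after `claim()` but before `ack()`, the job ID is still on the processing list, which is what allows a recovery process to find it later.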

Running Multiple Workers

Scale horizontally by running multiple worker processes:
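1. Ensure unique worker IDs

Each worker should have a unique workerId. The process PID or a container ID are common choices. Note that a PID is unique only within one host or container (containerized processes often run as PID 1), so combine it with the hostname when workers run on several machines: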
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})
2. Configure the Reaper

Run a Reaper instance to recover stalled jobs. Use leader election for high availability:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true
  }
})
3. Deploy workers

Start multiple worker processes:
# Terminal 1
node worker.js

# Terminal 2
node worker.js

# Terminal 3
node worker.js
Or use Docker/Kubernetes:
docker-compose.yml
version: '3'
services:
  worker:
    build: .
    command: node worker.js
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 5
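
When replicas run in containers, `process.pid` is frequently the same in every container, so a worker ID built from the PID alone may collide. A minimal sketch of a more collision-resistant ID for the unique-worker-ID step above, assuming a hypothetical helper named `buildWorkerId` that is not part of the library:

```typescript
import { hostname } from 'node:os'
import { randomUUID } from 'node:crypto'

// Hypothetical helper: combines host name, PID, and a random suffix so that
// IDs stay distinct across hosts, containers, and process restarts.
function buildWorkerId (prefix = 'worker'): string {
  const suffix = randomUUID().slice(0, 8)
  return `${prefix}-${hostname()}-${process.pid}-${suffix}`
}

const workerId = buildWorkerId()
```

The resulting string can be passed as the `workerId` option when constructing the Queue.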

Monitoring and Observability

Track job processing across distributed workers:
import pino from 'pino'

const logger = pino()

const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  logger
})

// Track metrics
let processedCount = 0
let failedCount = 0

queue.on('completed', (id) => {
  processedCount++
  logger.info({ jobId: id, total: processedCount }, 'Job completed')
})

queue.on('failed', (id, error) => {
  failedCount++
  logger.error({ 
    jobId: id, 
    error: error.message,
    total: failedCount 
  }, 'Job failed')
})

// Periodic status reporting
setInterval(() => {
  logger.info({
    workerId: queue.workerId,
    processed: processedCount,
    failed: failedCount
  }, 'Worker stats')
}, 60000)
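
The counters above can also be wrapped in a small helper that derives a failure rate. The sketch below uses Node's `EventEmitter` as a stand-in for the queue so that it runs on its own; `attachStats` is a hypothetical helper, and the event names are the ones used in the snippet above.

```typescript
import { EventEmitter } from 'node:events'

interface WorkerStats {
  processed: number
  failed: number
  failureRate: () => number
}

// Hypothetical helper: subscribes to 'completed' and 'failed' events and keeps counters.
function attachStats (emitter: EventEmitter): WorkerStats {
  const stats: WorkerStats = {
    processed: 0,
    failed: 0,
    // Fraction of finished jobs that failed; 0 when nothing has finished yet.
    failureRate: () => {
      const total = stats.processed + stats.failed
      return total === 0 ? 0 : stats.failed / total
    }
  }
  emitter.on('completed', () => { stats.processed++ })
  emitter.on('failed', () => { stats.failed++ })
  return stats
}

// Stand-in for a Queue instance so the sketch is self-contained.
const fakeQueue = new EventEmitter()
const stats = attachStats(fakeQueue)
fakeQueue.emit('completed', 'job-1')
fakeQueue.emit('completed', 'job-2')
fakeQueue.emit('completed', 'job-3')
fakeQueue.emit('failed', 'job-4', new Error('boom'))
```

Because each worker process keeps its own counters, totals across the fleet still have to be aggregated by the logging or metrics backend.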

Best Practices

Always run a Reaper when using multiple workers. Without it, jobs from crashed workers won’t be recovered.
Set appropriate visibility timeouts. The visibilityTimeout should be longer than your longest expected job duration. If a job runs longer, the Reaper will mark it as stalled.
  • Use unique job IDs to enable deduplication across producers
  • Set resultTTL based on how long you need cached results
  • Monitor the stalled event to detect worker issues
  • Use the same visibilityTimeout for both Queue and Reaper
  • Configure concurrency based on your job’s resource usage
  • Use leader election for Reaper in production environments
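
The interaction between job duration and visibilityTimeout can be sketched as a simple time comparison. This is an illustrative model only: the `ActiveJob` shape and the `findStalled` function are hypothetical, and the library's Reaper may track job state differently.

```typescript
// Hypothetical record of a job that a worker has claimed.
interface ActiveJob {
  id: string
  startedAt: number // epoch milliseconds when processing began
}

// Returns the IDs of jobs that have been processing longer than the timeout.
function findStalled (jobs: ActiveJob[], visibilityTimeout: number, now: number): string[] {
  return jobs
    .filter((job) => now - job.startedAt > visibilityTimeout)
    .map((job) => job.id)
}

const now = 100000
const active: ActiveJob[] = [
  { id: 'fast-job', startedAt: now - 5000 }, // running for 5 s
  { id: 'slow-job', startedAt: now - 45000 } // running for 45 s
]

// With a 30 s timeout only the 45 s job is reported, even if its worker is
// still healthy, which is why the timeout should exceed the longest job.
const stalled = findStalled(active, 30000, now)
```

Raising the timeout to 60 s in this model reports no stalled jobs, at the cost of waiting longer before work from a crashed worker is recovered.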

Differences from Single-Process Queue

Aspect            Single Process          Producer/Consumer
----------------  ----------------------  ------------------------------------
Storage           MemoryStorage OK        Requires RedisStorage or FileStorage
Scalability       Limited to one process  Horizontally scalable
Failure Recovery  Jobs lost on crash      Reaper recovers stalled jobs
Deployment        Simple                  Requires coordination
Use Case          Development, testing    Production, high-throughput

Example: Image Processing Service

Complete example of a distributed image processing system:
api-server.ts
import express from 'express'
import { Queue, RedisStorage } from '@platformatic/job-queue'

const app = express()
app.use(express.json()) // Parse JSON bodies so req.body is populated
const storage = new RedisStorage({ url: process.env.REDIS_URL })

interface ImageJob {
  imageUrl: string
  operations: Array<'resize' | 'watermark' | 'compress'>
}

interface ImageResult {
  processedUrl: string
  size: number
}

const queue = new Queue<ImageJob, ImageResult>({
  storage,
  resultTTL: 30 * 60 * 1000 // Keep results for 30 minutes
})

await queue.start()

app.post('/images/process', async (req, res) => {
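  // A timestamp alone can collide when two requests arrive in the same millisecond
  const jobId = `img-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`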
  
  try {
    const result = await queue.enqueueAndWait(jobId, {
      imageUrl: req.body.imageUrl,
      operations: req.body.operations
    }, {
      timeout: 60000 // 1 minute timeout
    })
    
    res.json(result)
  } catch (error) {
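    // Caught values are typed as unknown under strict TypeScript settings
    const message = error instanceof Error ? error.message : String(error)
    res.status(500).json({ error: message })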
  }
})

app.listen(3000)
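
When identical requests should map to the same job, so that the unique-ID deduplication mentioned under Best Practices can apply, the ID can be derived from the payload instead of from a timestamp or random value. A minimal sketch, assuming a hypothetical `jobIdFor` helper; whether deduplicating identical image requests is desirable depends on the application.

```typescript
import { createHash } from 'node:crypto'

interface ImageJob {
  imageUrl: string
  operations: Array<'resize' | 'watermark' | 'compress'>
}

// Hypothetical helper: hashes the payload fields in a fixed order so that
// equal payloads always produce the same job ID.
function jobIdFor (job: ImageJob): string {
  const canonical = JSON.stringify([job.imageUrl, job.operations])
  const digest = createHash('sha256').update(canonical).digest('hex')
  return `img-${digest.slice(0, 16)}`
}

const a = jobIdFor({ imageUrl: 'https://example.com/cat.png', operations: ['resize'] })
const b = jobIdFor({ imageUrl: 'https://example.com/cat.png', operations: ['resize'] })
const c = jobIdFor({ imageUrl: 'https://example.com/cat.png', operations: ['compress'] })
```

Here `a` and `b` are equal because the payloads match, while `c` differs because the operations differ.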
image-worker.ts
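import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'
import sharp from 'sharp'
// ImageJob and ImageResult are the interfaces declared in api-server.ts; in a real
// project they would live in a shared module imported by both files.
// uploadToS3 (used below) is an application-specific helper that is not shown here.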

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const queue = new Queue<ImageJob, ImageResult>({
  storage,
  workerId: `image-worker-${process.pid}`,
  concurrency: 5, // Process 5 images concurrently
  visibilityTimeout: 120000, // 2 minutes for image processing
  maxRetries: 3
})

queue.execute(async (job) => {
  // Check for cancellation before expensive work
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }
  
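  // Download image and fail early on a non-2xx response
  const response = await fetch(job.payload.imageUrl)
  if (!response.ok) {
    throw new Error(`Failed to download image: HTTP ${response.status}`)
  }
  const buffer = Buffer.from(await response.arrayBuffer())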
  
  // Process image
  let image = sharp(buffer)
  
  for (const operation of job.payload.operations) {
    if (operation === 'resize') {
      image = image.resize(800, 600)
    } else if (operation === 'watermark') {
      // Add watermark
    } else if (operation === 'compress') {
      image = image.jpeg({ quality: 80 })
    }
  }
  
  // Check again before final step
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }
  
  const processed = await image.toBuffer()
  
  // Upload to storage
  const processedUrl = await uploadToS3(processed)
  
  return {
    processedUrl,
    size: processed.length
  }
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 120000,
  leaderElection: { enabled: true }
})

await queue.start()
await reaper.start()

process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop()
  process.exit(0)
})
