Skip to main content
The Reaper class is a background service that monitors jobs and automatically recovers stalled ones. A job is considered stalled when it has been in the “processing” state longer than the visibility timeout, which typically happens when a worker crashes or loses connection.

Class Signature

class Reaper<TPayload> extends EventEmitter<ReaperEvents>

Type Parameters

  • TPayload - The type of the job payload

How It Works

The Reaper operates in two modes:
  1. Single Instance Mode (default): One Reaper instance actively monitors jobs
  2. Leader Election Mode: Multiple Reaper instances can run for high availability, with only one active at a time

Stall Detection

The Reaper detects stalled jobs by:
  1. Subscribing to job state change events (when a job starts processing)
  2. Starting a timer equal to the visibility timeout for each processing job
  3. When the timer expires, checking if the job is still in the “processing” state
  4. If stalled, recovering the job by requeueing it

Initial Scan

When the Reaper starts (or becomes leader), it performs an initial scan of all workers’ processing queues to catch any jobs that were stalled before the Reaper started.

Constructor

new Reaper<TPayload>(config: ReaperConfig<TPayload>)
Creates a new Reaper instance with the specified configuration.

Configuration

storage
Storage
required
The storage backend instance (must be the same instance used by the Queue).
payloadSerde
Serde<TPayload>
Custom serializer/deserializer for job payloads. Must match the Queue’s serializer. Defaults to JSON serialization.
visibilityTimeout
number
default:"30000"
Maximum processing time in milliseconds before a job is considered stalled. Should match the Queue’s visibilityTimeout setting.
leaderElection
LeaderElectionConfig
Leader election configuration for high availability deployments.
leaderElection.enabled
boolean
required
Whether to enable leader election. Set to true for multi-instance deployments.
leaderElection.lockTTL
number
default:"30000"
Time-to-live for the leader lock in milliseconds. The leader must renew the lock before this expires.
leaderElection.renewalInterval
number
default:"10000"
How often the leader renews its lock in milliseconds. Should be significantly less than lockTTL.
leaderElection.acquireRetryInterval
number
default:"5000"
How often followers attempt to acquire leadership in milliseconds.
logger
Logger
Pino logger instance. Defaults to an abstract no-op logger.

Example

import { Reaper } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  logger: pino()
})

Example with Leader Election

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger: pino()
})

Properties

reaperId

The unique identifier for this Reaper instance.
get reaperId(): string

Example

console.log('Reaper ID:', reaper.reaperId)

isLeader

Indicates whether this Reaper instance is currently the active leader (only relevant when leader election is enabled).
get isLeader(): boolean

Example

if (reaper.isLeader) {
  console.log('This reaper is the active leader')
}

Methods

start()

Starts the Reaper. If leader election is disabled, begins monitoring immediately. If leader election is enabled, attempts to acquire leadership.
start(): Promise<void>
void
Promise<void>
Resolves when the Reaper has started (but not necessarily acquired leadership).

Example

await reaper.start()
console.log('Reaper started')

stop()

Stops the Reaper gracefully. Clears all timers, unsubscribes from events, and releases leadership if held.
stop(): Promise<void>
void
Promise<void>
Resolves when the Reaper has stopped completely.

Example

await reaper.stop()
console.log('Reaper stopped')

Events

The Reaper class emits various events during its operation.

stalled

Emitted when a stalled job is detected and recovered.
reaper.on('stalled', (id: string) => {
  console.log(`Recovered stalled job: ${id}`)
})

leadershipAcquired

Emitted when this Reaper instance acquires leadership (only emitted when leader election is enabled).
reaper.on('leadershipAcquired', () => {
  console.log(`Reaper ${reaper.reaperId} became leader`)
})

leadershipLost

Emitted when this Reaper instance loses leadership (only emitted when leader election is enabled).
reaper.on('leadershipLost', () => {
  console.warn(`Reaper ${reaper.reaperId} lost leadership`)
})

error

Emitted when an error occurs in the Reaper’s internal operations.
reaper.on('error', (error: Error) => {
  console.error('Reaper error:', error)
})

Usage Patterns

Single Instance Deployment

For development or simple deployments with a single worker:
import { Queue, Reaper } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })

const queue = new Queue({
  storage,
  visibilityTimeout: 30000
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000 // Must match queue's setting
})

// Register handler
queue.execute(async (job) => {
  // Process job
})

// Start both
await queue.start()
await reaper.start()

// Monitor for stalled jobs
reaper.on('stalled', (id) => {
  console.log(`Job ${id} was stalled and recovered`)
})

High Availability Deployment

For production deployments with multiple workers and Reaper instances:
import { Reaper } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })
const logger = pino()

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,        // Leader lock expires after 30s
    renewalInterval: 10000, // Leader renews every 10s
    acquireRetryInterval: 5000 // Followers check every 5s
  },
  logger
})

// Monitor leadership changes
reaper.on('leadershipAcquired', () => {
  logger.info({ reaperId: reaper.reaperId }, 'Became leader')
})

reaper.on('leadershipLost', () => {
  logger.warn({ reaperId: reaper.reaperId }, 'Lost leadership')
})

reaper.on('stalled', (id) => {
  logger.info({ jobId: id }, 'Recovered stalled job')
})

reaper.on('error', (error) => {
  logger.error({ err: error }, 'Reaper error')
})

await reaper.start()

// In this configuration:
// - Multiple Reaper instances can run safely
// - Only one will be active (the leader)
// - If the leader crashes, another will take over within ~5-10s
// - The leader renews its lock every 10s
// - The lock expires after 30s of no renewal

Separate Reaper Process

You can run the Reaper in a separate process from your workers:
// reaper-service.ts
import { Reaper } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()
const storage = new RedisStorage({ 
  url: process.env.REDIS_URL || 'redis://localhost:6379' 
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger
})

reaper.on('stalled', (id) => {
  logger.info({ jobId: id }, 'Recovered stalled job')
})

reaper.on('error', (error) => {
  logger.error({ err: error }, 'Reaper error')
})

async function main() {
  await storage.connect()
  await reaper.start()
  
  logger.info('Reaper service started')
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  logger.info('Shutting down reaper service')
  await reaper.stop()
  await storage.disconnect()
  process.exit(0)
})

main().catch((error) => {
  logger.error({ err: error }, 'Failed to start reaper service')
  process.exit(1)
})

Best Practices

Visibility Timeout

Set the visibilityTimeout to a value slightly longer than your longest expected job duration:
// If jobs typically take 1-2 minutes
const visibilityTimeout = 180000 // 3 minutes

const queue = new Queue({ storage, visibilityTimeout })
const reaper = new Reaper({ storage, visibilityTimeout })

Leader Election Timing

For leader election, ensure proper timing relationships:
// Good configuration:
const leaderElection = {
  enabled: true,
  lockTTL: 30000,           // 30 seconds
  renewalInterval: 10000,    // Renew every 10s (1/3 of TTL)
  acquireRetryInterval: 5000 // Check every 5s
}

// renewalInterval should be < lockTTL / 2 for safety margin
// acquireRetryInterval determines failover speed

Error Handling

Always listen for errors:
reaper.on('error', (error) => {
  // Log to monitoring system
  logger.error({ err: error }, 'Reaper encountered an error')
  
  // Optionally alert on critical errors
  if (error.message.includes('connection')) {
    alertOps('Reaper lost connection to storage')
  }
})

Storage Connection

Ensure storage is connected before starting the Reaper:
const storage = new RedisStorage({ url: 'redis://localhost:6379' })

await storage.connect()

const reaper = new Reaper({ storage })
await reaper.start()

Graceful Shutdown

Always stop the Reaper gracefully:
process.on('SIGTERM', async () => {
  await reaper.stop()
  await storage.disconnect()
  process.exit(0)
})

Monitoring

Track Reaper health and effectiveness:
const metrics = {
  stalledJobsRecovered: 0,
  leadershipChanges: 0,
  errors: 0
}

reaper.on('stalled', (id) => {
  metrics.stalledJobsRecovered++
  logger.info({ 
    jobId: id, 
    total: metrics.stalledJobsRecovered 
  }, 'Stalled job recovered')
})

reaper.on('leadershipAcquired', () => {
  metrics.leadershipChanges++
})

reaper.on('leadershipLost', () => {
  metrics.leadershipChanges++
})

reaper.on('error', (error) => {
  metrics.errors++
  logger.error({ err: error, total: metrics.errors }, 'Reaper error')
})

// Periodically log metrics
setInterval(() => {
  logger.info({ 
    metrics,
    isLeader: reaper.isLeader,
    reaperId: reaper.reaperId
  }, 'Reaper metrics')
}, 60000)