The Reaper is a monitoring component that detects and recovers stalled jobs. A job becomes “stalled” when a worker crashes or hangs while processing it. The Reaper automatically requeues stalled jobs so they can be processed by healthy workers.

What is a Stalled Job?

A stalled job is one that has been in the “processing” state longer than the visibilityTimeout:
// Worker starts processing job at 12:00:00
// visibilityTimeout = 30000 (30 seconds)

// Worker crashes at 12:00:15 (still processing)

// At 12:00:30, the Reaper detects the job is stalled
// Job is requeued and picked up by another worker
Why jobs stall:
  • Worker process crashes (OOM, unhandled exception)
  • Worker server dies (hardware failure, power loss)
  • Worker hangs (infinite loop, deadlock)
  • Network partition (worker loses connection to storage)
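
The definition above reduces to a single comparison. The following is a minimal sketch, assuming timestamps are epoch milliseconds; `isStalled` is a hypothetical helper used only for illustration and is not part of the library:

```typescript
// Hypothetical helper (not part of the library): a job counts as stalled once
// it has been in the "processing" state for at least visibilityTimeout ms.
function isStalled(processingSince: number, now: number, visibilityTimeout: number): boolean {
  return now - processingSince >= visibilityTimeout
}

const startedAt = Date.UTC(2024, 0, 1, 12, 0, 0) // 12:00:00, worker picks up the job
const crashAt = startedAt + 15000                // 12:00:15, worker crashes
const timeoutAt = startedAt + 30000              // 12:00:30, visibility timeout elapses

const stalledAtCrash = isStalled(startedAt, crashAt, 30000)     // false: still inside the window
const stalledAtTimeout = isStalled(startedAt, timeoutAt, 30000) // true: eligible for recovery
```

Note that the crash itself is not observed; the job only becomes recoverable once the timeout has elapsed.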

How the Reaper Works

The Reaper uses an event-based monitoring approach with per-job timers:

1. Subscribe to Job Events

On startup, the Reaper subscribes to job state change events:
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })
await reaper.start()

// Internally, start() subscribes to all job events (simplified):
this.#unsubscribe = await this.#storage.subscribeToEvents((id, event) => {
  if (event === 'processing') {
    this.#startTimer(id)  // Start visibility timeout timer
  } else if (event === 'completed' || event === 'failed') {
    this.#cancelTimer(id)  // Cancel timer - job finished
  }
})

2. Set Per-Job Timers

When a job transitions to “processing”, the Reaper starts a timer.
Implementation (from src/reaper.ts:295):
#startTimer(id: string): void {
  this.#cancelTimer(id)  // Cancel any existing timer

  const timer = setTimeout(() => {
    this.#processingTimers.delete(id)
    this.#checkJob(id).catch(err => {
      this.#emitError(err, 'Failed checking job after visibility timer.')
    })
  }, this.#visibilityTimeout)

  this.#processingTimers.set(id, timer)
}
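
The timer bookkeeping can be tried in isolation. The sketch below is a standalone approximation of the pattern, not the library's code: `TimerRegistry` and its method names are hypothetical, and the expiry callback stands in for the stalled-job check.

```typescript
// Standalone sketch of a per-job timer registry (illustrative names only).
class TimerRegistry {
  private timers = new Map<string, ReturnType<typeof setTimeout>>()
  private delayMs: number
  private onExpire: (id: string) => void

  constructor(delayMs: number, onExpire: (id: string) => void) {
    this.delayMs = delayMs
    this.onExpire = onExpire
  }

  // Start (or restart) the timer for a job; at most one timer exists per id.
  start(id: string): void {
    this.cancel(id)
    const timer = setTimeout(() => {
      this.timers.delete(id)
      this.onExpire(id)
    }, this.delayMs)
    this.timers.set(id, timer)
  }

  // Cancel the timer for a job that completed or failed.
  cancel(id: string): void {
    const timer = this.timers.get(id)
    if (timer !== undefined) {
      clearTimeout(timer)
      this.timers.delete(id)
    }
  }

  // Cancel every pending timer, for example during shutdown.
  clearAll(): void {
    this.timers.forEach((timer) => clearTimeout(timer))
    this.timers.clear()
  }

  get size(): number {
    return this.timers.size
  }
}

const expired: string[] = []
const registry = new TimerRegistry(30000, (id) => { expired.push(id) })

registry.start('job-1')
registry.start('job-1')  // restart: still a single timer for job-1
registry.start('job-2')
registry.cancel('job-2') // job-2 finished in time
const pendingAfterCancel = registry.size
registry.clearAll()
const pendingAfterClear = registry.size
```

Cancelling before restarting is what keeps the map at one timer per job, so a job that re-enters “processing” never leaves an orphaned timer behind.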

3. Check and Recover Stalled Jobs

When the timer fires, the Reaper checks if the job is truly stalled.
Implementation (from src/reaper.ts:323):
async #checkJob(id: string): Promise<void> {
  const state = await this.#storage.getJobState(id)
  if (!state) return

  const { status, timestamp, workerId } = parseState(state)

  if (status !== 'processing') {
    // Job already finished, nothing to do
    return
  }

  // Check if visibility timeout has elapsed
  const elapsed = Date.now() - timestamp
  if (elapsed < this.#visibilityTimeout) {
    // Not yet stalled, restart timer for remaining time
    const remaining = this.#visibilityTimeout - elapsed
    setTimeout(() => this.#checkJob(id), remaining)
    return
  }

  // Job is stalled - recover it
  await this.#recoverStalledJob(id, workerId)
}
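
The decision made by the check can be expressed as a pure function, which makes the three outcomes easy to test. This is a sketch under a loud assumption: the state string is taken to look like `<status>:<timestamp>:<extra>`, with the worker id in the third field for “processing” states. The real `parseState` is not shown in this page and may differ; `parseStateSketch` and `decide` are hypothetical names.

```typescript
// ASSUMPTION: state strings look like "<status>:<timestamp>:<extra>", where
// <extra> is the worker id for "processing" states. The library's real
// parseState may use a different layout.
interface ParsedState {
  status: string
  timestamp: number
  workerId: string | undefined
}

function parseStateSketch(state: string): ParsedState {
  const parts = state.split(':')
  const status = parts[0]
  return {
    status,
    timestamp: Number(parts[1]),
    workerId: status === 'processing' ? parts[2] : undefined
  }
}

type Decision =
  | { action: 'ignore' }
  | { action: 'recheck'; inMs: number }
  | { action: 'recover'; workerId: string | undefined }

// Mirrors the three branches of the check: finished, not yet stalled, stalled.
function decide(state: string | null, now: number, visibilityTimeout: number): Decision {
  if (!state) return { action: 'ignore' }
  const parsed = parseStateSketch(state)
  if (parsed.status !== 'processing') return { action: 'ignore' }
  const elapsed = now - parsed.timestamp
  if (elapsed < visibilityTimeout) {
    return { action: 'recheck', inMs: visibilityTimeout - elapsed }
  }
  return { action: 'recover', workerId: parsed.workerId }
}

const early = decide('processing:1000:worker-1', 11000, 30000) // 10s elapsed: recheck in 20s
const late = decide('processing:1000:worker-1', 31000, 30000)  // 30s elapsed: recover
const done = decide('completed:1000', 99999, 30000)            // already finished: ignore
```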

4. Requeue Stalled Jobs

Recovery involves finding the job in the worker’s processing queue and requeueing it.
Implementation (from src/reaper.ts:359):
async #recoverStalledJob(id: string, workerId?: string): Promise<void> {
  // Get the job from the worker's processing queue
  const processingJobs = await this.#storage.getProcessingJobs(workerId)

  // Find the message for this job and remember its attempt count
  let jobMessage: Buffer | null = null
  let attempts = 0
  for (const message of processingJobs) {
    const queueMessage = this.#payloadSerde.deserialize(message)
    if (queueMessage.id === id) {
      jobMessage = message
      attempts = queueMessage.attempts
      break
    }
  }

  if (!jobMessage) return  // Already processed

  // Requeue the job
  await this.#storage.requeue(id, jobMessage, workerId)

  // Update state to reflect retry
  const newState = `failing:${Date.now()}:${attempts + 1}`
  await this.#storage.setJobState(id, newState)

  this.emit('stalled', id)
}
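
The lookup step can be sketched on its own. The example below assumes messages are JSON strings carrying at least `id` and `attempts` (JSON is described as the default serializer in the configuration section); `findMessage` and `QueueMessageSketch` are hypothetical names, and strings stand in for the buffers used by the real storage.

```typescript
// ASSUMPTION: each message is a JSON string with at least `id` and `attempts`.
interface QueueMessageSketch {
  id: string
  attempts: number
}

// Returns the raw message (needed for requeueing) plus its attempt count,
// or null when the job is no longer in the processing queue.
function findMessage(messages: string[], id: string): { raw: string; attempts: number } | null {
  for (const raw of messages) {
    const parsed = JSON.parse(raw) as QueueMessageSketch
    if (parsed.id === id) return { raw, attempts: parsed.attempts }
  }
  return null
}

const processing = [
  JSON.stringify({ id: 'job-1', attempts: 0, payload: { to: 'a@example.com' } }),
  JSON.stringify({ id: 'job-2', attempts: 2, payload: { to: 'b@example.com' } })
]

const found = findMessage(processing, 'job-2')
const nextAttempt = found ? found.attempts + 1 : null // attempt number recorded in the new state
const missing = findMessage(processing, 'job-9')      // null: nothing to requeue
```

Returning the raw message alongside the parsed fields matters because the requeue step needs the original serialized form, not a re-encoded copy.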

Visibility Timeout

The visibilityTimeout determines how long a job can be “in-flight” before it’s considered stalled:
const queue = new Queue({
  storage,
  visibilityTimeout: 30000  // 30 seconds
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000  // MUST match the queue's timeout
})
The Reaper’s visibilityTimeout MUST match the Queue’s visibilityTimeout. If they differ, jobs may be recovered prematurely or too late.
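
A small guard can catch a mismatch at startup. This is only a sketch: `compareTimeouts` is a hypothetical helper, and it assumes both values are available to the calling code, for example from shared configuration.

```typescript
// Hypothetical startup check (not part of the library).
type TimeoutComparison = 'match' | 'premature-recovery' | 'late-recovery'

function compareTimeouts(queueTimeout: number, reaperTimeout: number): TimeoutComparison {
  // Reaper fires before the Queue's window ends: healthy jobs may be requeued.
  if (reaperTimeout < queueTimeout) return 'premature-recovery'
  // Reaper fires after the Queue's window ends: stalled jobs wait longer than needed.
  if (reaperTimeout > queueTimeout) return 'late-recovery'
  return 'match'
}

const sameValue = compareTimeouts(30000, 30000)
const reaperTooShort = compareTimeouts(60000, 30000)
const reaperTooLong = compareTimeouts(30000, 60000)
```

In practice, defining the timeout once and passing the same constant to both constructors avoids the problem entirely.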

Choosing a Timeout Value

Consider your typical job duration:
// Fast jobs (API calls, cache lookups): 10-30 seconds
const fastReaper = new Reaper({ storage, visibilityTimeout: 30000 })

// Medium jobs (database queries, file processing): 1-5 minutes
const mediumReaper = new Reaper({ storage, visibilityTimeout: 120000 })

// Slow jobs (report generation, video encoding): 5-30 minutes
const slowReaper = new Reaper({ storage, visibilityTimeout: 600000 })
Set the timeout to ~2x your P95 job duration. This gives jobs enough time to complete while detecting stalls quickly.
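
As a worked example of the 2x P95 guideline, the sketch below computes a nearest-rank P95 over a sample of job durations and doubles it. The sample data and the helper names (`percentile`, `suggestVisibilityTimeout`) are made up for illustration; in practice the durations would come from your own metrics.

```typescript
// Nearest-rank percentile over a sample of job durations (milliseconds).
function percentile(values: number[], p: number): number {
  if (values.length === 0) throw new Error('need at least one sample')
  const sorted = values.slice().sort((a, b) => a - b)
  const rank = Math.ceil((p * sorted.length) / 100)
  return sorted[Math.max(rank, 1) - 1]
}

// Guideline from the text: roughly twice the P95 duration.
function suggestVisibilityTimeout(durationsMs: number[]): number {
  return 2 * percentile(durationsMs, 95)
}

// Hypothetical sample of 20 job durations, including one extreme outlier.
const durations = [
  800, 900, 950, 1000, 1100, 1200, 1200, 1300, 1400, 1500,
  1600, 1700, 1800, 2000, 2200, 2500, 3000, 4000, 6000, 20000
]

const p95 = percentile(durations, 95)                  // 19th of 20 sorted values: 6000
const suggested = suggestVisibilityTimeout(durations)  // 12000
```

Using P95 rather than the maximum keeps a single outlier (the 20-second job here) from inflating the timeout, at the cost of that outlier being treated as stalled.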

When to Run the Reaper

Single Worker (No Reaper Needed)

If you have a single worker, you don’t need the Reaper:
const queue = new Queue({ storage })
queue.execute(async (job) => { /* ... */ })
await queue.start()

// No reaper needed - if this process crashes,
// jobs will be reprocessed on restart

Multiple Workers

With multiple workers, run a Reaper to handle worker crashes:
// worker-1.ts
const queue1 = new Queue({ storage, workerId: 'worker-1' })
queue1.execute(handler)
await queue1.start()

// worker-2.ts
const queue2 = new Queue({ storage, workerId: 'worker-2' })
queue2.execute(handler)
await queue2.start()

// reaper.ts (separate process)
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })
await reaper.start()

Embedded Reaper

You can also run the Reaper in the same process as a worker:
const queue = new Queue({ storage, workerId: `worker-${process.pid}` })
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })

queue.execute(handler)

await Promise.all([
  queue.start(),
  reaper.start()
])

process.on('SIGTERM', async () => {
  await Promise.all([
    queue.stop(),
    reaper.stop()
  ])
  process.exit(0)
})

Leader Election for High Availability

Run multiple Reaper instances with leader election enabled. Only one Reaper is active at a time:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,              // Lock expires after 30s
    renewalInterval: 10000,      // Leader renews every 10s
    acquireRetryInterval: 5000   // Followers retry every 5s
  }
})

reaper.on('leadershipAcquired', () => {
  console.log(`Reaper ${reaper.reaperId} is now the leader`)
})

reaper.on('leadershipLost', () => {
  console.log(`Reaper ${reaper.reaperId} lost leadership`)
})

await reaper.start()

Leader Election Configuration

enabled (boolean, default: false): Enable leader election. When disabled, all Reaper instances are active (use for single Reaper deployments).
lockTTL (number, default: 30000): Lock expiry time in milliseconds. If the leader crashes, the lock expires and a follower takes over.
renewalInterval (number, default: 10000): How often the leader renews the lock (should be ~1/3 of lockTTL).
acquireRetryInterval (number, default: 5000): How often followers attempt to acquire leadership.
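
The relationship between these three values can be sanity-checked with a little arithmetic. The sketch below is not part of the library: `checkTimings` and `worstCaseFailoverMs` are hypothetical helpers, and the failover figure is a rough upper bound that ignores network and storage latency.

```typescript
interface LeaderElectionTimings {
  lockTTL: number
  renewalInterval: number
  acquireRetryInterval: number
}

// Flags interval combinations that make leadership fragile.
function checkTimings(t: LeaderElectionTimings): string[] {
  const warnings: string[] = []
  if (t.renewalInterval >= t.lockTTL) {
    warnings.push('renewalInterval >= lockTTL: the lock can expire before it is renewed')
  } else if (t.renewalInterval > t.lockTTL / 2) {
    warnings.push('renewalInterval > lockTTL / 2: a single missed renewal can lose leadership')
  }
  return warnings
}

// If the leader dies right after a renewal, the lock stays held for lockTTL,
// and a follower may need up to one more retry interval to notice the expiry.
function worstCaseFailoverMs(t: LeaderElectionTimings): number {
  return t.lockTTL + t.acquireRetryInterval
}

const defaults: LeaderElectionTimings = {
  lockTTL: 30000,
  renewalInterval: 10000,
  acquireRetryInterval: 5000
}

const defaultWarnings = checkTimings(defaults)     // no warnings
const defaultFailover = worstCaseFailoverMs(defaults) // 35000 ms
const riskyWarnings = checkTimings({ lockTTL: 30000, renewalInterval: 20000, acquireRetryInterval: 5000 })
```

With the defaults, stalled-job recovery could pause for roughly 35 seconds after a leader crash, which is worth keeping in mind when choosing lockTTL.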

Leader Election Flow

Leader lifecycle:
1. Reaper starts → Try to acquire lock
2. If acquired → Become leader
   - Subscribe to job events
   - Start monitoring timers
   - Renew lock every renewalInterval
3. If renewal fails → Lose leadership
   - Unsubscribe from events
   - Clear all timers
   - Switch to follower mode
4. Follower → Retry acquisition every acquireRetryInterval
Implementation (from src/reaper.ts:194):
if (this.#isLeader) {
  // Renew the lock
  const renewed = await this.#tryRenewLock(lockTTL)
  if (!renewed) {
    // Lost leadership - transition to follower
    await this.#transitionToFollower()
  }
} else {
  // Try to acquire lock
  const acquired = await this.#tryAcquireLock(lockTTL)
  if (acquired) {
    // Became leader - transition to leader
    await this.#transitionToLeader()
  }
}
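
To see the acquire and renew cycle play out, the following sketch simulates two Reapers against an in-memory lock with an explicit clock. `FakeLock` and `FakeReaper` are stand-ins invented for this example; they mimic the behaviour described above and are not the library's storage API.

```typescript
// In-memory lock with a TTL. The clock is passed in so the run is deterministic.
class FakeLock {
  private owner: string | null = null
  private expiresAt = 0

  tryAcquire(id: string, ttl: number, now: number): boolean {
    if (this.owner !== null && now < this.expiresAt) return false // still held
    this.owner = id
    this.expiresAt = now + ttl
    return true
  }

  tryRenew(id: string, ttl: number, now: number): boolean {
    if (this.owner !== id || now >= this.expiresAt) return false // not ours, or expired
    this.expiresAt = now + ttl
    return true
  }
}

class FakeReaper {
  id: string
  isLeader = false

  constructor(id: string) {
    this.id = id
  }

  // One iteration of the election loop: leaders renew, followers try to acquire.
  tick(lock: FakeLock, ttl: number, now: number): void {
    if (this.isLeader) {
      if (!lock.tryRenew(this.id, ttl, now)) this.isLeader = false
    } else if (lock.tryAcquire(this.id, ttl, now)) {
      this.isLeader = true
    }
  }
}

const lock = new FakeLock()
const reaperA = new FakeReaper('reaper-a')
const reaperB = new FakeReaper('reaper-b')
const ttl = 30000

reaperA.tick(lock, ttl, 0)      // A acquires; lock held until 30000
reaperB.tick(lock, ttl, 5000)   // B fails to acquire and stays a follower
reaperA.tick(lock, ttl, 10000)  // A renews; lock held until 40000
// A crashes here and stops ticking.
reaperB.tick(lock, ttl, 35000)  // lock has not expired yet
const leaderBeforeExpiry = reaperB.isLeader
reaperB.tick(lock, ttl, 40000)  // lock expired; B takes over
const leaderAfterExpiry = reaperB.isLeader
```

The owner check in `tryRenew` is the important part: without it, a former leader that resumes after a long pause could extend a lock that now belongs to another instance.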

Storage Backend Support

RedisStorage - Full support using SET NX PX pattern:
// Acquire: SET key value NX PX ttl
await redis.set('jq:reaper:lock', reaperId, 'NX', 'PX', 30000)

// Renew: Lua script to check owner and extend TTL
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
  redis.call('PEXPIRE', KEYS[1], ARGV[2])
  return 1
end
return 0
FileStorage - Supported using exclusive file creation:
// Acquire: write lock file with wx flag (fails if exists)
await writeFile(lockPath, JSON.stringify({ ownerId, expiresAt }), { flag: 'wx' })
MemoryStorage - Not supported (single process, no distributed locking needed)

Initial Scan for Stalled Jobs

On startup, the Reaper scans all workers’ processing queues to catch jobs that were stalled before the Reaper started.
Implementation (from src/reaper.ts:403):
async #checkStalledJobs(): Promise<void> {
  const workers = await this.#storage.getWorkers()

  for (const workerId of workers) {
    await this.#checkWorkerProcessingQueue(workerId)
  }
}

async #checkWorkerProcessingQueue(workerId: string): Promise<void> {
  const processingJobs = await this.#storage.getProcessingJobs(workerId)

  for (const message of processingJobs) {
    const queueMessage = this.#payloadSerde.deserialize(message)
    const state = await this.#storage.getJobState(queueMessage.id)

    if (!state) continue

    const { status, timestamp } = parseState(state)

    if (status === 'processing') {
      const elapsed = Date.now() - timestamp
      if (elapsed >= this.#visibilityTimeout) {
        // Job is stalled
        await this.#recoverStalledJob(queueMessage.id, workerId)
      } else {
        // Start timer for remaining time
        const remaining = this.#visibilityTimeout - elapsed
        setTimeout(() => this.#checkJob(queueMessage.id), remaining)
      }
    }
  }
}
This ensures no stalled jobs are missed, even if they stalled before the Reaper started.
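
The classification performed during the scan can be sketched as a pure function over a snapshot of processing jobs. `planInitialScan` and its types are hypothetical names for this example; the input is assumed to be each job's id and the time it entered “processing”, in epoch milliseconds.

```typescript
interface ProcessingEntry {
  id: string
  processingSince: number // epoch ms when the job entered "processing"
}

interface ScanPlan {
  recoverNow: string[]
  recheckLater: { id: string; inMs: number }[]
}

// Splits jobs into those already past the timeout and those that need a timer
// for the remainder of their window.
function planInitialScan(entries: ProcessingEntry[], now: number, visibilityTimeout: number): ScanPlan {
  const plan: ScanPlan = { recoverNow: [], recheckLater: [] }
  for (const entry of entries) {
    const elapsed = now - entry.processingSince
    if (elapsed >= visibilityTimeout) {
      plan.recoverNow.push(entry.id)
    } else {
      plan.recheckLater.push({ id: entry.id, inMs: visibilityTimeout - elapsed })
    }
  }
  return plan
}

const scanPlan = planInitialScan(
  [
    { id: 'job-a', processingSince: 0 },     // started 45s before the scan
    { id: 'job-b', processingSince: 35000 }  // started 10s before the scan
  ],
  45000,
  30000
)
```

Here `job-a` is recovered immediately, while `job-b` gets a timer for its remaining 20 seconds rather than a full new timeout.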

Reaper Configuration

const reaper = new Reaper<TPayload>({
  storage,                    // Storage backend (required)
  visibilityTimeout: 30000,   // Milliseconds before job is stalled (default: 30000)
  payloadSerde,               // Custom payload serializer (default: JSON)
  logger,                     // Pino logger (default: no-op)
  leaderElection: {           // Optional leader election config
    enabled: false,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  }
})

Reaper Events

Monitor Reaper activity with events:
reaper.on('stalled', (id: string) => {
  console.log(`Job ${id} was stalled and requeued`)
})

reaper.on('error', (error: Error) => {
  console.error('Reaper error:', error)
})

reaper.on('leadershipAcquired', () => {
  console.log('This reaper is now the leader')
})

reaper.on('leadershipLost', () => {
  console.log('This reaper lost leadership')
})
Event definitions (from src/reaper.ts:26):
interface ReaperEvents {
  error: [error: Error]
  stalled: [id: string]
  leadershipAcquired: []
  leadershipLost: []
}

Resource Cleanup

The Reaper maintains per-job timers, so proper cleanup is important:
await reaper.start()
// Creates timers for processing jobs

await reaper.stop()
// 1. Clears all timers
// 2. Unsubscribes from events
// 3. Releases leader lock (if leader)
From src/reaper.ts:141:
async #becomeInactive(): Promise<void> {
  // Clear all processing timers
  for (const timer of this.#processingTimers.values()) {
    clearTimeout(timer)
  }
  this.#processingTimers.clear()

  // Unsubscribe from events
  if (this.#unsubscribe) {
    await this.#unsubscribe()
    this.#unsubscribe = null
  }
}
Always call reaper.stop() during graceful shutdown to clean up timers and release locks.

Example: Production Deployment

Deploy 3 workers and 2 Reapers with leader election:
// worker.ts (run 3 instances)
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({
  storage,
  workerId: `worker-${process.env.HOSTNAME}-${process.pid}`,
  concurrency: 10,
  visibilityTimeout: 60000  // 1 minute
})

queue.execute(async (job) => {
  // Process job
  return result
})

await queue.start()

process.on('SIGTERM', async () => {
  await queue.stop()
  process.exit(0)
})
// reaper.ts (run 2 instances for HA)
import { Reaper, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
  storage,
  visibilityTimeout: 60000,  // MUST match worker timeout
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  }
})

reaper.on('leadershipAcquired', () => {
  console.log('Became leader')
})

reaper.on('stalled', (id) => {
  console.warn(`Recovered stalled job: ${id}`)
})

await reaper.start()

process.on('SIGTERM', async () => {
  await reaper.stop()
  process.exit(0)
})
Deployment:
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine

  worker:
    build: .
    command: node worker.js
    environment:
      REDIS_URL: redis://redis:6379
    deploy:
      replicas: 3

  reaper:
    build: .
    command: node reaper.js
    environment:
      REDIS_URL: redis://redis:6379
    deploy:
      replicas: 2  # High availability

Best Practices

Match visibility timeouts: Always set the Reaper’s visibilityTimeout to match your Queue’s timeout.
Run multiple Reapers with leader election: For production, deploy 2-3 Reaper instances with leader election for high availability.
Monitor stalled events: Track the stalled event rate. High rates indicate worker instability or timeout misconfiguration.
Don’t set the timeout too short: If jobs regularly exceed the timeout, they’ll be incorrectly marked as stalled and requeued, possibly several times, while the original worker is still processing them.
Don’t run Reaper with MemoryStorage in multi-process setups: MemoryStorage is single-process only. Use RedisStorage or FileStorage for distributed setups.
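
To make the stalled event rate from the practices above measurable, one option is a sliding-window counter. This is a sketch, not a library feature: `StalledRateWindow` is a hypothetical class, and the clock is passed in explicitly so the behaviour is easy to verify.

```typescript
// Counts events within a trailing time window.
class StalledRateWindow {
  private timestamps: number[] = []
  private windowMs: number

  constructor(windowMs: number) {
    this.windowMs = windowMs
  }

  record(now: number): void {
    this.timestamps.push(now)
  }

  // Number of events seen in the last windowMs milliseconds.
  count(now: number): number {
    const cutoff = now - this.windowMs
    this.timestamps = this.timestamps.filter((t) => t > cutoff)
    return this.timestamps.length
  }
}

const stalledWindow = new StalledRateWindow(60000)
stalledWindow.record(1000)
stalledWindow.record(20000)
stalledWindow.record(50000)

const countAt55s = stalledWindow.count(55000) // all three events fall inside the last 60s
const countAt75s = stalledWindow.count(75000) // the event at 1000 has aged out
```

In a real deployment the counter would be fed from the event handler, for example `reaper.on('stalled', () => stalledWindow.record(Date.now()))`, with an alert when the count crosses a threshold you choose.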