Overview
Leader election allows you to run multiple Reaper instances for high availability. Only one Reaper (the “leader”) actively monitors and recovers stalled jobs at a time. If the leader fails, another instance automatically takes over.
Why Leader Election?
Without leader election:
Single Reaper = single point of failure
If Reaper crashes, stalled jobs won’t be recovered
No automatic failover
With leader election:
Multiple Reapers provide redundancy
Automatic failover when leader fails
Only one Reaper actively processes at a time (no duplicate recovery)
Zero-downtime Reaper deployments
Architecture
┌─────────────────────────────────────────┐
│              Redis Storage              │
│   ┌─────────────────────────────────┐   │
│   │    reaper:lock (Leader Lock)    │   │
│   └─────────────────────────────────┘   │
└────────┬────────────┬───────────────────┘
         │            │
    ┌────▼─────┐  ┌───▼──────┐
    │ Reaper 1 │  │ Reaper 2 │
    │ (Leader) │  │(Follower)│
    │  Active  │  │ Standby  │
    └──────────┘  └──────────┘
         │
         │ Leader crashes
         ▼
    ┌──────────┐  ┌──────────┐
    │ Reaper 1 │  │ Reaper 2 │
    │   Dead   │  │ (Leader) │
    │          │  │  Active  │
    └──────────┘  └──────────┘
Basic Configuration
Enable leader election when creating a Reaper:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true, // Enable leader election (default: false)
    lockTTL: 30000, // Lock expiry in ms (default: 30s)
    renewalInterval: 10000, // Renew lock every 10s (default: 1/3 of TTL)
    acquireRetryInterval: 5000 // Followers retry every 5s (default: 5s)
  }
})
reaper.on('leadershipAcquired', () => {
  console.log('This reaper is now the leader')
})
reaper.on('leadershipLost', () => {
  console.log('This reaper lost leadership')
})
await reaper.start()
From the source code (src/reaper.ts:11-16), leader election configuration includes enabled, lockTTL, renewalInterval, and acquireRetryInterval.
Configuration Options
enabled
Type: boolean
Default: false
Enables or disables leader election:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true
  }
})
lockTTL
Type: number (milliseconds)
Default: 30000 (30 seconds)
How long the leader lock is valid. If the leader crashes and doesn’t renew the lock, it expires after this duration:
leaderElection: {
  enabled: true,
  lockTTL: 45000 // Lock expires after 45 seconds
}
Set lockTTL longer than renewalInterval to avoid leadership flapping. A good rule of thumb is lockTTL >= 3 × renewalInterval.
renewalInterval
Type: number (milliseconds)
Default: 1/3 of lockTTL (10000, i.e. 10 seconds, with the default lockTTL)
How often the leader renews its lock:
leaderElection: {
  enabled: true,
  lockTTL: 30000,
  renewalInterval: 10000 // Renew every 10 seconds
}
acquireRetryInterval
Type: number (milliseconds)
Default: 5000 (5 seconds)
How often followers try to acquire leadership:
leaderElection: {
  enabled: true,
  acquireRetryInterval: 5000 // Check for leadership every 5s
}
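The defaults and the lockTTL >= 3 × renewalInterval rule of thumb described above can be checked before a Reaper is constructed. The following is a minimal sketch, not library code: `resolveLeaderElection` and its types are hypothetical names, and it assumes the library derives the default renewalInterval as one third of lockTTL, as the option descriptions state.

```typescript
// Hypothetical helper, not part of @platformatic/job-queue.
interface LeaderElectionOptions {
  enabled: boolean
  lockTTL?: number
  renewalInterval?: number
  acquireRetryInterval?: number
}

interface ResolvedLeaderElection {
  enabled: boolean
  lockTTL: number
  renewalInterval: number
  acquireRetryInterval: number
  warnings: string[]
}

function resolveLeaderElection (options: LeaderElectionOptions): ResolvedLeaderElection {
  // Documented defaults: 30s TTL, renewal at 1/3 of the TTL, 5s retry.
  const lockTTL = options.lockTTL ?? 30000
  const renewalInterval = options.renewalInterval ?? Math.floor(lockTTL / 3)
  const acquireRetryInterval = options.acquireRetryInterval ?? 5000

  // A lock that expires before it can be renewed can never be held.
  if (renewalInterval >= lockTTL) {
    throw new RangeError('renewalInterval must be shorter than lockTTL')
  }

  const warnings: string[] = []
  if (lockTTL < 3 * renewalInterval) {
    warnings.push('lockTTL is below 3 x renewalInterval; little margin for a missed renewal')
  }
  return { enabled: options.enabled, lockTTL, renewalInterval, acquireRetryInterval, warnings }
}

console.log(resolveLeaderElection({ enabled: true, lockTTL: 45000 }))
```

Running a check like this at startup surfaces a risky ratio as a warning rather than as leadership flapping in production.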
Redis SET NX PX Pattern
Leader election uses Redis’s atomic SET NX PX command. From src/reaper.ts:216-225:
async #tryAcquireLock (ttlMs: number): Promise<boolean> {
  if (!this.#storage.acquireLeaderLock) {
    // Storage doesn't support leader election
    this.#emitError(new Error('Storage does not support leader election'))
    return false
  }
  return this.#storage.acquireLeaderLock(LOCK_KEY, this.#reaperId, ttlMs)
}
The Redis implementation uses:
SET reaper:lock <reaper-id> NX PX <ttl-ms>
NX - Only set if key doesn’t exist (prevents multiple leaders)
PX - Set expiry in milliseconds (automatic failover)
reaper-id - Unique identifier for this Reaper instance
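To make the NX and PX semantics concrete, here is a small in-memory stand-in for the command, driven by a fake clock. It is illustrative only and is not the RedisStorage implementation; `FakeLockStore` is a hypothetical name, and the owner-checked `renew` method reflects an assumption about how renewal behaves, since the source does not show it.

```typescript
// In-memory stand-in for `SET key value NX PX ttl`; not the RedisStorage code.
class FakeLockStore {
  private readonly entries = new Map<string, { owner: string, expiresAt: number }>()
  private readonly now: () => number

  constructor (now: () => number = Date.now) {
    this.now = now
  }

  // Mirrors SET NX PX: succeeds only if the key is absent or has expired.
  acquire (key: string, owner: string, ttlMs: number): boolean {
    const entry = this.entries.get(key)
    if (entry !== undefined && entry.expiresAt > this.now()) return false
    this.entries.set(key, { owner, expiresAt: this.now() + ttlMs })
    return true
  }

  // Assumed renewal semantics: only the current owner may extend the expiry.
  renew (key: string, owner: string, ttlMs: number): boolean {
    const entry = this.entries.get(key)
    if (entry === undefined || entry.expiresAt <= this.now() || entry.owner !== owner) return false
    entry.expiresAt = this.now() + ttlMs
    return true
  }
}

let clock = 0
const store = new FakeLockStore(() => clock)

console.log(store.acquire('reaper:lock', 'reaper-1', 30000)) // true: key was absent
console.log(store.acquire('reaper:lock', 'reaper-2', 30000)) // false: NX blocks a second leader
clock = 30000 // the leader crashed and the TTL has elapsed
console.log(store.acquire('reaper:lock', 'reaper-2', 30000)) // true: the lock expired
```

In Redis the expiry is enforced by the server, so followers do not need to track the deadline themselves; they simply retry the same command.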
How It Works
The leader election process (from src/reaper.ts:159-212):
Startup - Try to acquire lock
Each Reaper attempts to acquire the leader lock when it starts:
const acquired = await this.#tryAcquireLock(lockTTL)
if (acquired) {
  this.#isLeader = true
  await this.#becomeActive() // Subscribe and scan for stalled jobs
  this.emit('leadershipAcquired')
}
Leader - Renew lock periodically
The leader renews its lock every renewalInterval:
if (this.#isLeader) {
  const renewed = await this.#tryRenewLock(lockTTL)
  if (!renewed) {
    await this.#transitionToFollower() // Lost leadership
  }
}
Follower - Retry acquisition
Followers try to acquire the lock every acquireRetryInterval:
if (!this.#isLeader) {
  const acquired = await this.#tryAcquireLock(lockTTL)
  if (acquired) {
    await this.#transitionToLeader() // Become active
  }
}
Failover - Leader crashes
If the leader crashes without releasing the lock:
Lock expires after lockTTL milliseconds
First follower to retry acquisition becomes new leader
New leader starts monitoring stalled jobs
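The steps above imply a rough window for how long a crash leaves the system without a leader. The sketch below is an estimate under stated assumptions, not a measured guarantee: it assumes the lock is freed only by expiry (no graceful release), that followers poll on a fixed acquireRetryInterval, and it ignores network latency. `failoverWindow` is a hypothetical helper name.

```typescript
interface FailoverWindow {
  bestCaseMs: number
  worstCaseMs: number
}

// Estimates the leaderless gap after a crash, assuming expiry-only release.
function failoverWindow (lockTTL: number, renewalInterval: number, acquireRetryInterval: number): FailoverWindow {
  return {
    // Leader dies just before its next renewal and a follower retries right at expiry.
    bestCaseMs: lockTTL - renewalInterval,
    // Leader dies just after renewing and a follower's retry narrowly misses the expiry.
    worstCaseMs: lockTTL + acquireRetryInterval
  }
}

console.log(failoverWindow(30000, 10000, 5000)) // { bestCaseMs: 20000, worstCaseMs: 35000 }
```

With the default settings, a crashed leader is therefore replaced after roughly 20 to 35 seconds under these assumptions.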
Multiple Reaper Setup
Run multiple Reaper instances for high availability:
reaper.ts
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'
const logger = pino()
const storage = new RedisStorage({
  url: process.env.REDIS_URL,
  logger
})
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger
})
// Track leadership changes
reaper.on('leadershipAcquired', () => {
  logger.info({ reaperId: reaper.reaperId }, 'Leadership acquired')
})
reaper.on('leadershipLost', () => {
  logger.warn({ reaperId: reaper.reaperId }, 'Leadership lost')
})
reaper.on('stalled', (id) => {
  logger.warn({ jobId: id }, 'Recovered stalled job')
})
await reaper.start()
logger.info({
  reaperId: reaper.reaperId,
  isLeader: reaper.isLeader
}, 'Reaper started')
// Graceful shutdown
process.on('SIGTERM', async () => {
  logger.info('Shutting down reaper')
  await reaper.stop() // Releases lock immediately
  process.exit(0)
})
Monitoring Leadership
Track which Reaper is currently the leader:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'
const logger = pino()
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: { enabled: true },
  logger
})
// Check leadership status
setInterval(() => {
  logger.info({
    reaperId: reaper.reaperId,
    isLeader: reaper.isLeader
  }, 'Reaper status')
}, 10000)
// `metrics` below is a placeholder for your own metrics client; it is not defined in this snippet
reaper.on('leadershipAcquired', () => {
  // Emit metric to monitoring system
  metrics.gauge('reaper.is_leader', 1)
  logger.info({ reaperId: reaper.reaperId }, 'Now leading')
})
reaper.on('leadershipLost', () => {
  metrics.gauge('reaper.is_leader', 0)
  logger.warn({ reaperId: reaper.reaperId }, 'Lost leadership')
})
await reaper.start()
Events
Reaper emits two leadership events, leadershipAcquired and leadershipLost, alongside its error and stalled events (from src/reaper.ts:26-31):
interface ReaperEvents {
  error: [error: Error]
  stalled: [id: string]
  leadershipAcquired: []
  leadershipLost: []
}
leadershipAcquired
Emitted when this Reaper becomes the leader:
reaper.on('leadershipAcquired', () => {
  console.log('I am now the leader')
  // Start custom monitoring, update service registry, etc.
})
leadershipLost
Emitted when this Reaper loses leadership:
reaper.on('leadershipLost', () => {
  console.log('I lost leadership')
  // Stop custom monitoring, remove from service registry, etc.
})
Leadership loss is detected when lock renewal fails. This can happen if:
Another Reaper acquired the lock (shouldn’t happen under normal operation)
Redis connection issues
The Reaper fell behind on renewals
Graceful Shutdown
When a leader shuts down gracefully, it releases the lock immediately (from src/reaper.ts:106-125):
async stop (): Promise<void> {
  if (!this.#running) return
  this.#running = false
  // Stop leadership loop
  if (this.#leadershipTimer) {
    clearInterval(this.#leadershipTimer)
    this.#leadershipTimer = null
  }
  // Release lock if we're leader
  if (this.#isLeader && this.#leaderElection.enabled) {
    await this.#releaseLeadership()
    this.#isLeader = false
  }
  // Become inactive (cleanup)
  await this.#becomeInactive()
}
This allows immediate failover without waiting for lock expiry:
process.on('SIGTERM', async () => {
  console.log('Shutting down...')
  await reaper.stop() // Releases lock immediately
  console.log('Lock released, another Reaper can take over')
  process.exit(0)
})
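A shutdown handler that awaits stop() will wait for as long as stop() takes. One possible guard, sketched here under assumptions, is to race the call against a deadline so the process can still exit and let the lock expire through its TTL. `stopWithDeadline` is a hypothetical helper, not a library function, and `fakeStop` merely stands in for `reaper.stop()`.

```typescript
type StopOutcome = 'released' | 'timed-out'

// Hypothetical helper: races a stop() call against a deadline.
async function stopWithDeadline (stop: () => Promise<void>, deadlineMs: number): Promise<StopOutcome> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const deadline = new Promise<StopOutcome>((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), deadlineMs)
  })
  try {
    return await Promise.race([
      stop().then((): StopOutcome => 'released'),
      deadline
    ])
  } finally {
    // Clear the timer so it does not keep the process alive after a clean stop.
    if (timer !== undefined) clearTimeout(timer)
  }
}

// Usage with a stand-in for reaper.stop():
const fakeStop = async (): Promise<void> => {}
stopWithDeadline(fakeStop, 5000).then((outcome) => console.log(outcome)) // released
```

If the outcome is 'timed-out', followers fall back to the crash path and take over once lockTTL elapses.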
Storage Backend Support
Leader election requires RedisStorage. Other storage backends (MemoryStorage, FileStorage) don’t implement the leader lock, so a Reaper configured with one of them emits an error event and cannot acquire leadership.
From src/reaper.ts:216-222:
if (!this.#storage.acquireLeaderLock) {
  // Storage doesn't support leader election
  this.#emitError(new Error('Storage does not support leader election'))
  return false
}
Only use leader election with Redis:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
  storage, // Must be RedisStorage
  leaderElection: { enabled: true }
})
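Because an unsupported backend only surfaces as an error event at runtime, it can be useful to fail fast at startup instead. The sketch below mirrors the capability check quoted above from the caller’s side. `supportsLeaderElection` and `LeaderLockCapable` are hypothetical names, and the method signature is an assumption inferred from the `acquireLeaderLock(LOCK_KEY, reaperId, ttlMs)` call shown earlier.

```typescript
// Assumed shape, inferred from the call shown in src/reaper.ts.
interface LeaderLockCapable {
  acquireLeaderLock: (key: string, ownerId: string, ttlMs: number) => Promise<boolean>
}

// Type guard: true when the storage object exposes an acquireLeaderLock method.
function supportsLeaderElection (storage: unknown): storage is LeaderLockCapable {
  return typeof storage === 'object' && storage !== null &&
    typeof (storage as Partial<LeaderLockCapable>).acquireLeaderLock === 'function'
}

// Stand-ins for real storage instances.
const withLock = { acquireLeaderLock: async (): Promise<boolean> => true }
const withoutLock = {}

console.log(supportsLeaderElection(withLock)) // true
console.log(supportsLeaderElection(withoutLock)) // false
```

A startup script could throw when the guard returns false and leaderElection.enabled is true, which turns a silent standby into an immediate configuration error.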
Best Practices
Run at least 3 Reapers in production. This provides resilience against instance failures; split-brain is prevented by the single Redis lock rather than by the number of instances.
Set lockTTL to 3-5× the renewalInterval
Run Reapers in different availability zones for true HA
Monitor the leadershipAcquired and leadershipLost events
Set acquireRetryInterval low enough for quick failover (5-10s)
Use the same visibilityTimeout for Queue and Reaper
Handle leadership events in monitoring/alerting systems
Test failover scenarios during development
Ensure graceful shutdown releases lock quickly
Common Patterns
Combined Worker and Reaper
Run worker and Reaper in the same process:
import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue'
const storage = new RedisStorage({ url: process.env.REDIS_URL })
// Worker processes jobs
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10,
  visibilityTimeout: 30000
})
// Reaper monitors stalled jobs with HA
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000
  }
})
// `processJob` is a placeholder for your own job handler; it is not defined in this snippet
queue.execute(async (job) => {
  return await processJob(job.payload)
})
await queue.start()
await reaper.start()
// Shutdown both
process.on('SIGTERM', async () => {
  await Promise.all([
    queue.stop(),
    reaper.stop()
  ])
  process.exit(0)
})
Health Check Endpoint
Expose Reaper status via HTTP:
import express from 'express'
import { Reaper, RedisStorage } from '@platformatic/job-queue'
const app = express()
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: { enabled: true }
})
await reaper.start()
// Health check endpoint
app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    reaperId: reaper.reaperId,
    isLeader: reaper.isLeader
  })
})
// Readiness check (only ready if leader)
app.get('/ready', (req, res) => {
  if (reaper.isLeader) {
    res.json({ status: 'ready' })
  } else {
    res.status(503).json({ status: 'standby' })
  }
})
app.listen(3000)
Troubleshooting
Multiple Leaders
Problem: Multiple Reapers think they’re the leader.
Solution: This shouldn’t happen with RedisStorage. Check:
Redis connection is stable
No clock skew between instances
lockTTL > renewalInterval
Leadership Flapping
Problem: Leadership constantly switches between Reapers.
Solution: Increase lockTTL and renewalInterval:
leaderElection: {
  enabled: true,
  lockTTL: 60000, // Increase from 30s to 60s
  renewalInterval: 20000 // Increase from 10s to 20s
}
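Flapping is easier to act on when it is detected automatically rather than read from logs. The following sketch counts leadership transitions in a sliding window. `FlapDetector` and `LeadershipEmitter` are hypothetical names, the thresholds are arbitrary, and a plain `EventEmitter` stands in for a Reaper so the example runs without Redis.

```typescript
import { EventEmitter } from 'node:events'

// Minimal shape the detector needs; an assumption, not the library's Reaper type.
interface LeadershipEmitter {
  on (event: string, listener: () => void): unknown
}

// Hypothetical helper: counts leadership transitions inside a sliding time window.
class FlapDetector {
  private transitions: number[] = []
  private readonly windowMs: number
  private readonly maxTransitions: number
  private readonly now: () => number

  constructor (windowMs: number, maxTransitions: number, now: () => number = Date.now) {
    this.windowMs = windowMs
    this.maxTransitions = maxTransitions
    this.now = now
  }

  // Records a timestamp for every leadershipAcquired / leadershipLost event.
  watch (emitter: LeadershipEmitter): void {
    const record = (): void => { this.transitions.push(this.now()) }
    emitter.on('leadershipAcquired', record)
    emitter.on('leadershipLost', record)
  }

  // True when more than maxTransitions happened within the last windowMs.
  isFlapping (): boolean {
    const cutoff = this.now() - this.windowMs
    this.transitions = this.transitions.filter((t) => t >= cutoff)
    return this.transitions.length > this.maxTransitions
  }
}

let fakeNow = 0
const reaperLike = new EventEmitter() // stand-in for a Reaper instance
const detector = new FlapDetector(60000, 2, () => fakeNow)
detector.watch(reaperLike)

reaperLike.emit('leadershipAcquired')
fakeNow = 10000
reaperLike.emit('leadershipLost')
fakeNow = 20000
reaperLike.emit('leadershipAcquired')
console.log(detector.isFlapping()) // true: 3 transitions within 60s
```

In a real deployment the result would typically feed an alert, which signals that lockTTL or renewalInterval needs adjusting as described above.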
Slow Failover
Problem: Takes too long for a follower to become leader after leader crashes.
Solution: Decrease lockTTL and acquireRetryInterval:
leaderElection: {
  enabled: true,
  lockTTL: 15000, // Faster expiry
  renewalInterval: 5000, // Renew more frequently
  acquireRetryInterval: 2000 // Check more frequently
}
No Leader
Problem: All Reapers report isLeader: false.
Solution: Check Redis connectivity and logs:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'
const logger = pino({ level: 'debug' })
const reaper = new Reaper({
  storage: new RedisStorage({ url: process.env.REDIS_URL, logger }),
  leaderElection: { enabled: true },
  logger
})
reaper.on('error', (error) => {
  console.error('Reaper error:', error)
})
See Also