Overview
Graceful shutdown ensures that workers complete their current jobs before terminating. This prevents:
Lost work from interrupted jobs
Inconsistent state in external systems
Wasted resources from partial processing
Unnecessary retries of almost-complete work
How It Works
When queue.stop() is called, the following steps run in order:
1. Stop accepting new jobs. The consumer stops dequeuing new jobs from the queue.
2. Wait for in-flight jobs. The queue waits for all currently processing jobs to complete, up to the visibilityTimeout duration.
3. Requeue incomplete jobs. Any jobs that don’t complete within the timeout are returned to the queue for retry.
4. Disconnect from storage. Finally, the storage connection is closed cleanly.
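The wait-then-requeue part of this sequence can be sketched in isolation. The code below is not the library's implementation: `InFlightJob`, `DrainResult`, and `drain` are hypothetical names used only to illustrate racing "all in-flight jobs settled" against a timeout and treating whatever is still running as requeue candidates.

```typescript
// Hypothetical model of an in-flight job: an id plus a promise that settles when it finishes.
type InFlightJob = { id: string; done: Promise<unknown> }
type DrainResult = { settled: string[]; requeued: string[] }

// Wait for in-flight jobs up to timeoutMs, then report which ones would be requeued.
async function drain(inFlight: InFlightJob[], timeoutMs: number): Promise<DrainResult> {
  const settled = new Set<string>()

  // Mark a job as settled whether it succeeds or fails.
  const watchers = inFlight.map((job) => {
    const mark = () => { settled.add(job.id) }
    return job.done.then(mark, mark)
  })

  // Race "everything settled" against the timeout.
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<void>((resolve) => {
    timer = setTimeout(resolve, timeoutMs)
  })
  await Promise.race([Promise.all(watchers), timeout])
  clearTimeout(timer)

  // Jobs still running at this point are the ones a real consumer would return to the queue.
  const ids = inFlight.map((job) => job.id)
  return {
    settled: ids.filter((id) => settled.has(id)),
    requeued: ids.filter((id) => !settled.has(id))
  }
}
```

In this model the timeout plays the role of visibilityTimeout: a job that needs longer than the timeout ends up in `requeued` rather than `settled`.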
Basic Example
The stop() method handles graceful shutdown automatically:
import { Queue, MemoryStorage } from '@platformatic/job-queue'
const storage = new MemoryStorage()
const queue = new Queue({
  storage,
  visibilityTimeout: 30000 // Jobs have 30s to complete
})
queue.execute(async (job) => {
  // Long-running work; doWork() stands in for your own logic
  const result = await doWork()
  return result
})
await queue.start()
// Handle shutdown signals
process.on('SIGTERM', async () => {
  console.log('Shutting down...')
  await queue.stop() // Waits for in-flight jobs
  process.exit(0)
})
From the source code (src/consumer.ts:132-169), the consumer waits up to visibilityTimeout milliseconds for active jobs to complete before forcing shutdown.
Cancellation Detection
Jobs can detect when a shutdown is in progress using job.signal:
queue.execute(async (job) => {
  // Check for cancellation
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }
  // Long-running work...
  const result = await doWork()
  // Check again before committing
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }
  return result
})
The job.signal is an AbortSignal that gets triggered in two scenarios:
During graceful shutdown - When stop() is called and jobs need to be requeued
When visibility timeout expires - When a job runs longer than expected
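Because job.signal is a standard AbortSignal, it can also interrupt a wait instead of only being polled between steps. The helper below is a minimal sketch using only the standard AbortSignal API; `abortableDelay` is a hypothetical name and is not provided by the library.

```typescript
// Resolves after `ms`, or rejects as soon as `signal` aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Already cancelled: fail immediately without starting a timer.
    if (signal.aborted) {
      reject(new Error('Job cancelled'))
      return
    }
    const onAbort = () => {
      clearTimeout(timer)
      reject(new Error('Job cancelled'))
    }
    const timer = setTimeout(() => {
      // Normal completion: remove the listener so it does not leak.
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, { once: true })
  })
}
```

Inside a job handler this would be called as `await abortableDelay(1000, job.signal)`, so a backoff or polling pause ends as soon as shutdown begins rather than running to completion.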
Complete Shutdown Example
A production-ready shutdown handler that handles multiple signals:
import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'
import pino from 'pino'
const logger = pino()
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10,
  visibilityTimeout: 30000,
  logger
})
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  logger
})
queue.execute(async (job) => {
  // Check for cancellation
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }
  // Long-running work...
  const result = await doWork()
  return result
})
await queue.start()
await reaper.start()
logger.info('Worker started')
// Graceful shutdown function
let isShuttingDown = false
async function shutdown(signal: string, exitCode = 0) {
  if (isShuttingDown) {
    logger.warn('Already shutting down, please wait...')
    return
  }
  isShuttingDown = true
  logger.info({ signal }, 'Received shutdown signal')
  try {
    // Stop accepting new jobs and wait for in-flight ones
    await queue.stop()
    logger.info('Queue stopped')
    // Stop the reaper
    await reaper.stop()
    logger.info('Reaper stopped')
    logger.info('Shutdown complete')
    process.exit(exitCode)
  } catch (error) {
    // pino applies its error serializer to the `err` key
    logger.error({ err: error }, 'Error during shutdown')
    process.exit(1)
  }
}
// Handle shutdown signals
process.on('SIGTERM', () => shutdown('SIGTERM'))
process.on('SIGINT', () => shutdown('SIGINT'))
// Handle uncaught errors: still drain in-flight jobs, but exit non-zero
process.on('uncaughtException', (error) => {
  logger.error({ err: error }, 'Uncaught exception')
  shutdown('uncaughtException', 1)
})
process.on('unhandledRejection', (reason) => {
  logger.error({ reason }, 'Unhandled rejection')
  shutdown('unhandledRejection', 1)
})
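If any step of the shutdown never resolves (for example, a storage disconnect that hangs), a handler like the one above would wait indefinitely. One possible safeguard is to bound the whole sequence with a deadline. The sketch below assumes nothing about the library; `withDeadline` is a hypothetical helper name.

```typescript
// Rejects if `work` has not settled within `ms`; otherwise passes its result through.
function withDeadline<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error(`Shutdown exceeded ${ms}ms`))
    }, ms)
    work.then(
      (value) => {
        clearTimeout(timer) // Do not keep the process alive once work is done
        resolve(value)
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      }
    )
  })
}
```

In the shutdown function this could wrap the stop call, for example `await withDeadline(queue.stop(), 40000)`, so the existing catch branch logs the error and exits with a non-zero code. The 40000 value is an assumption: it should sit above visibilityTimeout and below the infrastructure's kill timeout.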
Visibility Timeout Relationship
The visibilityTimeout determines how long stop() will wait:
const queue = new Queue({
  storage,
  visibilityTimeout: 30000 // Wait up to 30s during shutdown
})
When jobs are short, a lower timeout keeps the shutdown wait short as well:
// Jobs typically complete in < 5s
const queue = new Queue({
  storage,
  concurrency: 20,
  visibilityTimeout: 10000 // 10s timeout
})
queue.execute(async (job) => {
  // Quick work
  return await processQuickly(job.payload)
})
Job Requeuing
Jobs that don’t complete during shutdown are automatically requeued:
// From src/consumer.ts:186-194
if (abortSignal.aborted) {
  // Put message back
  const queueMessage = this.#deserializeMessage(message)
  this.#logger.warn({ id: queueMessage.id }, 'Consumer aborted while holding job, requeueing.')
  await this.#storage.requeue(queueMessage.id, message, this.#workerId)
  this.emit('requeued', queueMessage.id)
  break
}
Listen for the requeued event to track this:
queue.on('requeued', (id) => {
  console.log(`Job ${id} was returned to queue (e.g., during graceful shutdown)`)
})
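Counting requeues per job id makes it easier to spot a job that never finishes within the timeout. The class below is a sketch only: `RequeueTracker` and its threshold are hypothetical and not part of the library.

```typescript
// Counts how often each job id has been requeued.
class RequeueTracker {
  private readonly counts = new Map<string, number>()
  private readonly threshold: number

  constructor(threshold: number) {
    this.threshold = threshold
  }

  // Records one requeue; returns true once the job has reached the threshold.
  record(id: string): boolean {
    const count = (this.counts.get(id) ?? 0) + 1
    this.counts.set(id, count)
    return count >= this.threshold
  }

  countFor(id: string): number {
    return this.counts.get(id) ?? 0
  }
}
```

It could be wired to the event with `queue.on('requeued', (id) => { if (tracker.record(id)) console.warn(...) })`. A job that keeps crossing the threshold is a hint that visibilityTimeout is too short for it.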
Kubernetes/Docker Integration
Configure your deployment to allow time for graceful shutdown:
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: job-worker
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: worker
          image: my-worker:latest
          lifecycle:
            preStop:
              exec:
                # Give workers time to finish
                command: ["/bin/sh", "-c", "sleep 5"]
      # Must be longer than visibilityTimeout
      terminationGracePeriodSeconds: 45
Ensure terminationGracePeriodSeconds is longer than your visibilityTimeout to avoid forceful termination of in-flight jobs.
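In Kubernetes, the time spent in the preStop hook counts against the termination grace period, so the grace period has to cover the hook as well as the shutdown wait. The arithmetic can be sketched as below. The function name and the 10-second safety margin are assumptions chosen for illustration; they happen to reproduce the 45 seconds used in the manifest above.

```typescript
// Hypothetical helper: derive a grace period from the pieces that consume it.
function gracePeriodSeconds(options: {
  preStopSeconds: number      // Duration of the preStop hook (sleep 5 in the manifest)
  visibilityTimeoutMs: number // Same value passed to the Queue
  marginSeconds: number       // Assumed headroom for disconnects and logging
}): number {
  const shutdownWaitSeconds = Math.ceil(options.visibilityTimeoutMs / 1000)
  return options.preStopSeconds + shutdownWaitSeconds + options.marginSeconds
}

// 5s preStop + 30s visibility timeout + 10s margin = 45
console.log(gracePeriodSeconds({ preStopSeconds: 5, visibilityTimeoutMs: 30000, marginSeconds: 10 }))
```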
Testing Graceful Shutdown
Test your shutdown logic during development:
import { Queue, MemoryStorage } from '@platformatic/job-queue'
import { setTimeout } from 'node:timers/promises'
const storage = new MemoryStorage()
const queue = new Queue({
  storage,
  concurrency: 1,
  visibilityTimeout: 10000
})
let jobsCompleted = 0
let jobsCancelled = 0
queue.execute(async (job) => {
  console.log(`Started job ${job.id}`)
  // Simulate work with cancellation checks
  for (let i = 0; i < 10; i++) {
    if (job.signal.aborted) {
      console.log(`Job ${job.id} was cancelled at step ${i}`)
      jobsCancelled++
      throw new Error('Cancelled')
    }
    await setTimeout(500) // 500ms per step
  }
  console.log(`Completed job ${job.id}`)
  jobsCompleted++
  return { success: true }
})
queue.on('requeued', (id) => {
  console.log(`Job ${id} was requeued`)
})
await queue.start()
// Enqueue some jobs
for (let i = 0; i < 5; i++) {
  await queue.enqueue(`job-${i}`, { data: i })
}
// Wait a bit, then shutdown
await setTimeout(3000)
console.log('Initiating shutdown...')
await queue.stop()
console.log(`Completed: ${jobsCompleted}, Cancelled: ${jobsCancelled}`)
Best Practices
Check job.signal.aborted before expensive operations. This allows jobs to exit quickly during shutdown instead of wasting resources on work that will be discarded.
Set visibilityTimeout to be longer than your longest expected job
Use job.signal.aborted checks at natural boundaries in your job logic
Configure infrastructure timeout longer than visibilityTimeout
Log shutdown events for debugging and monitoring
Test graceful shutdown in development
Handle SIGTERM and SIGINT signals
Avoid long-running synchronous operations that can’t be interrupted
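A long synchronous loop blocks the event loop, so signal handlers cannot run and job.signal cannot be observed until the loop ends. One way to apply the last two points together is to split the work into chunks, check the signal between chunks, and yield to the event loop after each one. This is a sketch under that assumption; `processInChunks` is a hypothetical helper, not a library function.

```typescript
// Runs `handle` over `items` in chunks, checking `signal` between chunks.
// Returns the number of items processed, or throws if cancelled.
async function processInChunks<T>(
  items: T[],
  chunkSize: number,
  signal: AbortSignal,
  handle: (item: T) => void
): Promise<number> {
  let processed = 0
  for (let start = 0; start < items.length; start += chunkSize) {
    // Natural boundary: stop before starting another chunk.
    if (signal.aborted) {
      throw new Error('Job cancelled')
    }
    for (const item of items.slice(start, start + chunkSize)) {
      handle(item)
      processed++
    }
    // Yield so pending timers and signal handlers get a chance to run.
    await new Promise<void>((resolve) => setTimeout(resolve, 0))
  }
  return processed
}
```

Inside a handler this would be called with `job.signal`. The chunk size is a trade-off: smaller chunks react to shutdown faster but add more scheduling overhead.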
Common Issues
Jobs Keep Getting Requeued
Problem: Jobs are repeatedly requeued during shutdown.
Solution: Increase visibilityTimeout to give jobs more time to complete:
const queue = new Queue({
  storage,
  visibilityTimeout: 60000 // Increase from 30s to 60s
})
Process Won’t Exit
Problem: Process hangs after calling stop().
Solution: Check for lingering resources (see src/CLAUDE.md guidelines):
// Ensure all timers and connections are cleaned up
await queue.stop()
await storage.disconnect()
// If using custom resources, clean them up
clearInterval(myInterval)
myConnection.close()
Jobs Lost During Shutdown
Problem: Jobs disappear when worker shuts down.
Solution: Ensure you’re running a Reaper to recover stalled jobs:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000 // Match queue timeout
})
await reaper.start()
// Shutdown both
process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop() // Don't forget this!
})
See Also