FileStorage is a file-based storage backend that provides persistence without external dependencies like Redis. It’s ideal for single-node deployments that need to survive process restarts.

Constructor

import { FileStorage } from '@platformatic/job-queue'

const storage = new FileStorage({
  basePath: '/var/lib/myapp/queue'
})

Configuration

basePath
string
required
Base directory for storing all queue data. The directory will be created if it doesn’t exist.

FileStorage creates the following subdirectories:
  • queue/ - Pending jobs
  • processing/ - Jobs being processed by workers
  • jobs/ - Job state files
  • results/ - Job results
  • errors/ - Job errors
  • workers/ - Worker registrations
  • notify/ - Notification files

When to Use

FileStorage is suitable for:
  • Single-node deployments: Applications running on one machine
  • Persistence without Redis: No external dependencies required
  • Development with persistence: Local development that survives restarts
  • Low-throughput queues: Background jobs that don’t require high concurrency
  • Embedded systems: Resource-constrained environments

Limitations

Not for distributed systems: FileStorage only works on a single machine and cannot be shared across multiple nodes.
  • Single machine only: Cannot scale horizontally
  • Filesystem performance: Limited by disk I/O and filesystem characteristics
  • Network storage risks: Using NFS/CIFS may cause race conditions
  • No cross-platform sync: Cannot share between Windows and Unix systems

Features

Atomic Writes

All writes use fast-write-atomic for crash safety:
// Atomic write: creates temp file, then renames
await storage.enqueue(jobId, message, timestamp)
Benefits:
  • Crash resistant: Partial writes never corrupt existing files
  • Race condition safe: Multiple processes can write safely
  • Atomic operations: Rename is atomic on most filesystems
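The write-then-rename pattern described above can be sketched with Node's built-in `fs` APIs. This is only an illustration of the general technique, not the code of fast-write-atomic or FileStorage; the `atomicWrite` helper and its temp-file naming scheme are assumptions made for the example.

```javascript
import { mkdtemp, readFile, rename, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

// Hypothetical helper (not part of the library): write the data to a
// temporary file next to the target, then rename it into place.
async function atomicWrite (filePath, data) {
  const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`
  try {
    await writeFile(tempPath, data)
    // Readers see either the old file or the new one, never a partial write
    await rename(tempPath, filePath)
  } catch (err) {
    await rm(tempPath, { force: true }) // best-effort cleanup of the temp file
    throw err
  }
}

// Usage: overwrite a state file twice and read back the final content
const demoDir = await mkdtemp(join(tmpdir(), 'atomic-write-'))
const statePath = join(demoDir, 'job123.state')
await atomicWrite(statePath, 'queued')
await atomicWrite(statePath, 'completed')
const finalState = await readFile(statePath, 'utf8')
console.log(finalState)
await rm(demoDir, { recursive: true, force: true })
```

The temporary file is created in the same directory as the target because a rename is only atomic within a single filesystem.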

Filesystem Watching

Uses Node.js fs.watch() for efficient blocking dequeue:
// Worker blocks until a job file appears or timeout
const job = await storage.dequeue(workerId, 30) // 30 second timeout
Benefits:
  • No polling: Efficiently waits for filesystem events
  • Low CPU usage: OS-level notifications via inotify/FSEvents
  • Automatic retry: Watcher restarts on errors
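A blocking wait built on `fs.watch()` might look roughly like the sketch below. It is not the FileStorage implementation: the `waitForFile` helper, its millisecond timeout, and the choice to return the first file name in sorted order are assumptions for illustration.

```javascript
import { watch } from 'node:fs'
import { mkdtemp, readdir, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { setTimeout as sleep } from 'node:timers/promises'

// Hypothetical helper: resolve with the first file name (sorted) found in
// `dir`, or null if nothing appears within `timeoutMs`.
function waitForFile (dir, timeoutMs) {
  return new Promise((resolve, reject) => {
    let settled = false
    const settle = (fn, value) => {
      if (settled) return
      settled = true
      clearTimeout(timer)
      watcher.close()
      fn(value)
    }
    // Re-list the directory on every event instead of trusting the
    // event's filename, which is not reported on every platform.
    const check = () => readdir(dir).then(
      (files) => { if (files.length > 0) settle(resolve, files.sort()[0]) },
      (err) => settle(reject, err)
    )
    // Start watching before the first listing so a file created in
    // between is not missed.
    const watcher = watch(dir, check)
    watcher.on('error', (err) => settle(reject, err))
    const timer = setTimeout(() => settle(resolve, null), timeoutMs)
    check()
  })
}

const watchDir = await mkdtemp(join(tmpdir(), 'watch-demo-'))
const timedOut = await waitForFile(watchDir, 100) // empty directory
const writer = sleep(50).then(() =>
  writeFile(join(watchDir, '000000000001-job123.msg'), '{}'))
const found = await waitForFile(watchDir, 5000)
await writer
console.log({ timedOut, found })
await rm(watchDir, { recursive: true, force: true })
```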

Leader Election

Implements file-based leader election for cleanup coordination:
// Try to acquire lock file
const isLeader = await storage.acquireLeaderLock('cleanup-leader', instanceId, 10000)

if (isLeader) {
  // This instance runs TTL cleanup
  await storage.renewLeaderLock('cleanup-leader', instanceId, 10000)
}
Only the leader instance performs TTL cleanup to avoid duplicate work.
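To make the idea of an expiring lock file concrete, here is a minimal sketch. It is not the library's `acquireLeaderLock`: the `tryAcquire` helper and its injectable `now` argument are hypothetical, and only the JSON shape (`ownerId`, `expiresAt`) follows the lock-file format this page describes. Takeover of an expired lock is deliberately simplified and is not race-free.

```javascript
import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

// Hypothetical helper: returns true if `ownerId` holds the lock afterwards.
// `now` is injectable only to keep the demo deterministic.
async function tryAcquire (lockPath, ownerId, ttlMs, now = Date.now()) {
  const record = JSON.stringify({ ownerId, expiresAt: now + ttlMs })
  try {
    // The 'wx' flag fails with EEXIST if the lock file already exists
    await writeFile(lockPath, record, { flag: 'wx' })
    return true
  } catch (err) {
    if (err.code !== 'EEXIST') throw err
  }
  let current
  try {
    current = JSON.parse(await readFile(lockPath, 'utf8'))
  } catch {
    return false // unreadable or half-written lock: retry on the next tick
  }
  if (current.ownerId !== ownerId && current.expiresAt > now) return false
  // Renewal by the owner, or takeover of an expired lock.
  // Simplification: two followers could both reach this line at once.
  await writeFile(lockPath, record)
  return true
}

const lockDir = await mkdtemp(join(tmpdir(), 'lock-demo-'))
const lockPath = join(lockDir, 'cleanup-leader.lock')
const outcomes = [
  await tryAcquire(lockPath, 'A', 10000, 0), // A becomes leader
  await tryAcquire(lockPath, 'B', 10000, 5000), // still held by A
  await tryAcquire(lockPath, 'A', 10000, 5000), // A renews
  await tryAcquire(lockPath, 'B', 10000, 20000) // A's lock expired
]
console.log(outcomes)
await rm(lockDir, { recursive: true, force: true })
```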

TTL Management

Results and errors are stored with separate .ttl files:
results/
  job123.result  # The actual result data
  job123.ttl     # Expiration timestamp
The cleanup leader checks and removes expired files every second.
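A single cleanup pass over such a directory could be sketched as follows. The `sweepExpired` helper is hypothetical, and the sketch assumes each `.ttl` file holds the expiration time as a plain number of milliseconds; the actual on-disk encoding used by FileStorage is not specified here.

```javascript
import { mkdtemp, readdir, readFile, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

// Hypothetical helper: delete every data file whose companion .ttl file
// has expired, and return the affected job IDs.
async function sweepExpired (dir, dataExt, now = Date.now()) {
  const removed = []
  for (const name of await readdir(dir)) {
    if (!name.endsWith('.ttl')) continue
    const jobId = name.slice(0, -'.ttl'.length)
    // Assumption: the .ttl file contains an epoch timestamp in milliseconds
    const expiresAt = Number.parseInt(await readFile(join(dir, name), 'utf8'), 10)
    if (!Number.isFinite(expiresAt) || expiresAt > now) continue
    await rm(join(dir, `${jobId}${dataExt}`), { force: true })
    await rm(join(dir, name), { force: true })
    removed.push(jobId)
  }
  return removed
}

const resultsDir = await mkdtemp(join(tmpdir(), 'ttl-demo-'))
await writeFile(join(resultsDir, 'job123.result'), '{"ok":true}')
await writeFile(join(resultsDir, 'job123.ttl'), '1000')
await writeFile(join(resultsDir, 'job124.result'), '{"ok":true}')
await writeFile(join(resultsDir, 'job124.ttl'), '9000')
const removed = await sweepExpired(resultsDir, '.result', 5000)
const remaining = (await readdir(resultsDir)).sort()
console.log(removed, remaining)
await rm(resultsDir, { recursive: true, force: true })
```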

Example

Basic Usage

import { Queue, FileStorage } from '@platformatic/job-queue'
import { join } from 'node:path'

const queue = new Queue({
  storage: new FileStorage({
    basePath: join(process.cwd(), 'queue-data')
  }),
  async process(job) {
    console.log('Processing:', job.data)
    return { status: 'completed' }
  }
})

await queue.start()

// Enqueue jobs
const job = await queue.enqueue({ task: 'process me' })

// Jobs persist across restarts
await queue.stop()

// Start again - jobs are still there
await queue.start()

Multiple Workers on Same Machine

// worker-1.js
import { Queue, FileStorage } from '@platformatic/job-queue'

const queue1 = new Queue({
  storage: new FileStorage({ basePath: '/var/lib/myapp/queue' }),
  async process(job) { /* ... */ }
})
await queue1.start()

// worker-2.js
import { Queue, FileStorage } from '@platformatic/job-queue'

const queue2 = new Queue({
  storage: new FileStorage({ basePath: '/var/lib/myapp/queue' }),
  async process(job) { /* ... */ }
})
await queue2.start()

// Both workers share the same queue via filesystem
// Atomic rename ensures only one worker claims each job

Temporary Queue for Testing

import { Queue, FileStorage } from '@platformatic/job-queue'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { mkdtemp, rm } from 'node:fs/promises'

const tempDir = await mkdtemp(join(tmpdir(), 'queue-test-'))

const queue = new Queue({
  storage: new FileStorage({ basePath: tempDir }),
  async process(job) { /* ... */ }
})

await queue.start()
// Run tests...
await queue.stop()

// Clean up
await rm(tempDir, { recursive: true })

File Structure

With basePath: '/var/lib/queue', FileStorage creates:
/var/lib/queue/
├── queue/
│   ├── 000000000001-job123.msg
│   ├── 000000000002-job124.msg
│   └── 000000000003-job125.msg
├── processing/
│   ├── worker-1/
│   │   └── job126.msg
│   └── worker-2/
│       └── job127.msg
├── jobs/
│   ├── job123.state
│   ├── job124.state
│   └── job125.state
├── results/
│   ├── job123.result
│   └── job123.ttl
├── errors/
│   ├── job124.error
│   └── job124.ttl
├── workers/
│   ├── worker-1.worker
│   └── worker-2.worker
├── notify/
│   └── job123-1234567890.notify
└── cleanup-leader.lock

File Naming Conventions

  • Queue files: {sequence}-{jobId}.msg (sequence ensures FIFO order)
  • State files: {jobId}.state (contains state string)
  • Result files: {jobId}.result + {jobId}.ttl
  • Error files: {jobId}.error + {jobId}.ttl
  • Worker files: {workerId}.worker (contains expiry timestamp)
  • Notify files: {jobId}-{timestamp}.notify
  • Lock files: {lockKey}.lock (JSON with ownerId and expiresAt)
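The queue-file convention can be illustrated with a pair of small helpers. These are hypothetical and only mirror the `{sequence}-{jobId}.msg` pattern listed above, using the 12-digit zero-padding shown in the example file names; they are not exported by the library.

```javascript
const SEQUENCE_WIDTH = 12 // matches names such as 000000000001-job123.msg

// Hypothetical helper: build a queue file name from a sequence and job ID
function queueFileName (sequence, jobId) {
  return `${String(sequence).padStart(SEQUENCE_WIDTH, '0')}-${jobId}.msg`
}

// Hypothetical helper: split a queue file name back into its parts.
// The job ID is everything after the first hyphen, so it may contain hyphens.
function parseQueueFileName (fileName) {
  const match = /^(\d+)-(.+)\.msg$/.exec(fileName)
  if (!match) return null
  return { sequence: Number(match[1]), jobId: match[2] }
}

const names = [
  queueFileName(10, 'job125'),
  queueFileName(2, 'job124'),
  queueFileName(1, 'job123')
]
// A plain string sort yields FIFO order because every sequence has equal width
const ordered = [...names].sort()
const parsed = parseQueueFileName(ordered[0])
console.log(ordered, parsed)
```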

Implementation Details

Dequeue Race Conditions

Multiple workers can safely dequeue from the same filesystem:
  1. Worker lists queue files sorted by sequence
  2. Worker attempts atomic rename() to claim file
  3. Only one worker succeeds (OS guarantees atomicity)
  4. Failed workers try the next file
import { rename } from 'node:fs/promises'

// Atomic claim - only one succeeds
try {
  await rename(
    '/var/lib/queue/queue/000000000001-job123.msg',
    '/var/lib/queue/processing/worker-1/job123.msg'
  )
  // Success - this worker got the job
} catch (err) {
  // ENOENT means another worker claimed it first, try next file
  if (err.code !== 'ENOENT') throw err
}
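Putting the four steps together, a claim loop might look like the sketch below. The `claimNext` helper is hypothetical and only follows the directory layout and file names documented on this page; the real dequeue logic may differ in detail.

```javascript
import { mkdir, mkdtemp, readdir, rename, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

// Hypothetical helper: claim the oldest queue file for `workerId`, or
// return null when there is nothing left to claim.
async function claimNext (basePath, workerId) {
  const queueDir = join(basePath, 'queue')
  const workerDir = join(basePath, 'processing', workerId)
  await mkdir(workerDir, { recursive: true })
  const files = (await readdir(queueDir)).sort() // zero-padded names sort FIFO
  for (const file of files) {
    const jobId = file.slice(file.indexOf('-') + 1, -'.msg'.length)
    try {
      await rename(join(queueDir, file), join(workerDir, `${jobId}.msg`))
      return jobId
    } catch (err) {
      if (err.code !== 'ENOENT') throw err // ENOENT: another worker won the race
    }
  }
  return null
}

// Demo: two workers race for three queued jobs
const base = await mkdtemp(join(tmpdir(), 'claim-demo-'))
await mkdir(join(base, 'queue'))
for (const [i, id] of ['job123', 'job124', 'job125'].entries()) {
  const name = `${String(i + 1).padStart(12, '0')}-${id}.msg`
  await writeFile(join(base, 'queue', name), '{}')
}
const claims = await Promise.all([
  claimNext(base, 'worker-1'),
  claimNext(base, 'worker-2')
])
const left = await readdir(join(base, 'queue'))
console.log(claims, left)
await rm(base, { recursive: true, force: true })
```

Which worker ends up with which job is not deterministic, but each job is claimed by at most one of them.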

Notification System

FileStorage uses a hybrid notification approach:
  1. In-process: EventEmitter for same-process subscribers
  2. Cross-process: File watcher on notify/ directory
When a job completes:
import { writeFile } from 'node:fs/promises'

// Write notification file
await writeFile(
  '/var/lib/queue/notify/job123-1234567890.notify',
  'job123:completed'
)

// Watcher picks it up and emits event
// File is automatically deleted after processing

Cleanup Leadership

Only one FileStorage instance performs TTL cleanup:
  • Leader: Acquired lock, runs cleanup interval
  • Follower: No lock, tries to acquire every 5 seconds
  • Lock renewal: Leader renews every 3 seconds (TTL is 10 seconds)
  • Automatic failover: If leader crashes, lock expires and follower takes over

Performance Considerations

Filesystem Recommendations

  • Local SSD: Best performance (low latency, high IOPS)
  • Local HDD: Acceptable for low-throughput queues
  • Network storage: Avoid if possible (NFS/CIFS have race condition risks)
  • Tmpfs/RAM disk: Fast but not persistent (use MemoryStorage instead)

Sequence Number Overflow

Sequence numbers are incremented per enqueue. With 12-digit zero-padding:
  • Max sequence: 999,999,999,999 (1 trillion)
  • At 1000 jobs/sec: Overflows after ~31 years
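  • Overflow behavior: A 13-digit sequence no longer sorts after the 12-digit ones in a plain lexicographic comparison ('1000000000000' sorts before '999999999999' as strings), so filename sorting alone does not preserve FIFO order past this point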
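The figures above can be checked with a few lines of arithmetic. The constant names are illustrative, and the year length of 365.25 days is an assumption of this sketch.

```javascript
const WIDTH = 12 // digits of zero-padding in queue file names
const maxSequence = 10 ** WIDTH - 1 // 999,999,999,999
const jobsPerSecond = 1000
const secondsPerYear = 365.25 * 24 * 60 * 60

// Time until the padded width is exhausted at a constant enqueue rate
const yearsToOverflow = maxSequence / jobsPerSecond / secondsPerYear
console.log(maxSequence, yearsToOverflow.toFixed(1)) // 999999999999 '31.7'
```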

Directory Listing Performance

For large queues, consider:
  • Periodic cleanup: Use reaper to remove old job states
  • Partition by date: Implement custom storage with date-based subdirectories
  • Monitor inode usage: Large queues can exhaust filesystem inodes

Testing

FileStorage provides a clear() method for testing:
import { FileStorage } from '@platformatic/job-queue'

const storage = new FileStorage({ basePath: '/tmp/test-queue' })
await storage.connect()

// Run tests...

// Clear all data
await storage.clear()
await storage.disconnect()
The clear() method:
  • Removes all queue files
  • Removes all job states
  • Removes all results and errors
  • Recreates empty directory structure