MemoryStorage is an in-memory implementation of the Storage interface. It suits testing, development, and single-process deployments that do not need persistence.
Constructor
import { MemoryStorage } from '@platformatic/job-queue'
const storage = new MemoryStorage()
The MemoryStorage constructor takes no parameters and creates a fully in-memory storage backend.
When to Use
MemoryStorage is best suited for:
- Testing: Unit and integration tests where you need fast, isolated storage
- Development: Local development environments without external dependencies
- Single-process deployments: Applications running on a single Node.js process
- Prototyping: Quick proof-of-concepts without setting up Redis or file storage
Limitations
Not for production: MemoryStorage has the following limitations, which make it unsuitable for production workloads that need durability or more than one process.
- No persistence: All data is lost when the process exits or crashes
- Single-process only: Cannot be shared across multiple worker processes or machines
- Memory constraints: All jobs, results, and errors are stored in RAM
- No leader election: Does not implement the optional leader election methods
Features
- TTL cleanup: Automatically cleans up expired results and errors every second
- Blocking dequeue: Implements blocking semantics with in-process waiters
- Event notifications: Full pub/sub support using Node.js EventEmitter
- Atomic operations: Simulates atomic operations by running each one synchronously, so no other code on the JavaScript thread can interleave with it
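To illustrate the last point, here is a minimal sketch of a check-then-write that behaves atomically only because it contains no `await`. The `states` map and `setStateIfAbsent` function are hypothetical names used for illustration; they are not part of the MemoryStorage API.

```javascript
// Hypothetical registry: job ID -> state string.
const states = new Map()

// There is no `await` between the check and the write, so the event loop
// cannot run any other callback in between. On a single JavaScript thread
// this gives the same guarantee an atomic "set if absent" would.
function setStateIfAbsent (id, state) {
  if (states.has(id)) return false
  states.set(id, state)
  return true
}
```

If the function awaited anything between `has` and `set`, two concurrent callers could both pass the check, and the guarantee would no longer hold.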
Example
import { Queue, MemoryStorage } from '@platformatic/job-queue'
// Create queue with memory storage
const queue = new Queue({
  storage: new MemoryStorage(),
  async process(job) {
    console.log('Processing:', job.data)
    return { status: 'done' }
  }
})
await queue.start()
// Enqueue a job
const job = await queue.enqueue({ message: 'Hello, world!' })
// Wait for completion
const result = await job.waitForCompletion()
console.log(result) // { status: 'done' }
await queue.stop()
Testing Utilities
MemoryStorage provides a clear() method for resetting state between tests:
import { MemoryStorage } from '@platformatic/job-queue'
const storage = new MemoryStorage()
await storage.connect()
// ... run tests ...
// Clear all data
storage.clear()
Implementation Details
Data Structures
Internally, MemoryStorage uses JavaScript Maps and Arrays:
- Queue: Array of Buffer objects (FIFO)
- Processing queues: Map of workerId to Array of jobs
- Jobs registry: Map of job ID to state string
- Results/Errors: Map with TTL tracking
- Workers: Map with expiration timestamps
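The layout above could be sketched roughly as follows. This is an illustration, not the library's source: the class name, field names, the `claim` method, and the state strings `'queued'` and `'processing'` are all assumptions.

```javascript
// Hypothetical sketch of the in-memory layout described above.
class MemoryLayoutSketch {
  constructor () {
    this.queue = []             // FIFO of serialized jobs (Buffer objects)
    this.processing = new Map() // workerId -> Array of jobs being processed
    this.jobs = new Map()       // job ID -> state string
    this.results = new Map()    // job ID -> { value, expiresAt }
    this.errors = new Map()     // job ID -> { value, expiresAt }
    this.workers = new Map()    // workerId -> expiration timestamp (ms)
  }

  // Serialize the job, append it to the FIFO, and record its state.
  enqueue (id, payload) {
    this.queue.push(Buffer.from(JSON.stringify({ id, payload })))
    this.jobs.set(id, 'queued') // state name is an assumption
  }

  // Move the oldest job from the main queue to a worker's processing list.
  claim (workerId) {
    const message = this.queue.shift()
    if (message === undefined) return null
    const list = this.processing.get(workerId) ?? []
    list.push(message)
    this.processing.set(workerId, list)
    const { id } = JSON.parse(message.toString())
    this.jobs.set(id, 'processing') // state name is an assumption
    return message
  }
}
```

Keeping a separate processing list per worker makes it possible to tell which jobs a given worker was holding, for example when its registration expires.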
Dequeue Blocking
When dequeue() is called and the queue is empty, MemoryStorage creates a Promise that resolves as soon as one of the following happens:
- A job is enqueued (triggers notification to waiters)
- The timeout expires (resolves with null)
- The storage disconnects (resolves with null)
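The three resolution paths above can be sketched with a set of in-process waiters. This is a minimal illustration under assumptions: `BlockingQueueSketch`, its `waiters` set, and the millisecond timeout argument are hypothetical and do not reflect the actual MemoryStorage signatures.

```javascript
// Hypothetical sketch of blocking dequeue with in-process waiters.
class BlockingQueueSketch {
  constructor () {
    this.items = []
    this.waiters = new Set() // insertion-ordered, so the first entry is the oldest
  }

  enqueue (item) {
    // Hand the item straight to the oldest waiter, if there is one.
    const waiter = this.waiters.values().next().value
    if (waiter) {
      waiter.resolve(item)
    } else {
      this.items.push(item)
    }
  }

  dequeue (timeoutMs) {
    if (this.items.length > 0) return Promise.resolve(this.items.shift())

    return new Promise(resolve => {
      const waiter = {
        // Single exit point: clears the timer and unregisters the waiter,
        // whichever of the three paths triggers it.
        resolve: value => {
          clearTimeout(timer)
          this.waiters.delete(waiter)
          resolve(value)
        }
      }
      // Path 2: the timeout expires and the caller receives null.
      const timer = setTimeout(() => waiter.resolve(null), timeoutMs)
      this.waiters.add(waiter)
    })
  }

  // Path 3: disconnecting releases every pending caller with null.
  disconnect () {
    for (const waiter of [...this.waiters]) waiter.resolve(null)
  }
}
```

Routing all three paths through one `resolve` wrapper means a waiter can never be left registered or leave a timer running after its Promise has settled.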
TTL Management
A cleanup interval runs every second to:
- Remove expired results and errors
- Remove expired worker registrations
The interval is started in connect() and stopped in disconnect().
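A periodic sweep of this kind might look like the sketch below. The class and method names (`TtlStoreSketch`, `setResult`, `registerWorker`, `cleanup`) are hypothetical, and calling `unref()` on the timer is a choice made for this sketch rather than something the source states about MemoryStorage.

```javascript
// Hypothetical sketch of interval-based TTL cleanup.
class TtlStoreSketch {
  constructor (intervalMs = 1000) {
    this.intervalMs = intervalMs
    this.results = new Map() // job ID -> { value, expiresAt }
    this.workers = new Map() // workerId -> expiresAt (ms timestamp)
    this.timer = null
  }

  // Start the sweep; calling connect() twice does not create a second timer.
  connect () {
    if (this.timer) return
    this.timer = setInterval(() => this.cleanup(), this.intervalMs)
    this.timer.unref() // sketch choice: do not keep the process alive for cleanup alone
  }

  // Stop the sweep.
  disconnect () {
    clearInterval(this.timer)
    this.timer = null
  }

  setResult (id, value, ttlMs, now = Date.now()) {
    this.results.set(id, { value, expiresAt: now + ttlMs })
  }

  registerWorker (workerId, ttlMs, now = Date.now()) {
    this.workers.set(workerId, now + ttlMs)
  }

  // Remove every entry whose expiration time has passed.
  // Deleting from a Map while iterating over it is safe in JavaScript.
  cleanup (now = Date.now()) {
    for (const [id, entry] of this.results) {
      if (entry.expiresAt <= now) this.results.delete(id)
    }
    for (const [workerId, expiresAt] of this.workers) {
      if (expiresAt <= now) this.workers.delete(workerId)
    }
  }
}
```

Because the sweep runs on an interval, an expired entry can remain in memory for up to one interval after its TTL has passed.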