MemoryStorage is an in-memory implementation of the Storage interface, ideal for testing, development, and single-process deployments where persistence is not required.

Constructor

import { MemoryStorage } from '@platformatic/job-queue'

const storage = new MemoryStorage()
The MemoryStorage constructor takes no parameters and creates a fully in-memory storage backend.

When to Use

MemoryStorage is best suited for:
  • Testing: Unit and integration tests where you need fast, isolated storage
  • Development: Local development environments without external dependencies
  • Single-process deployments: Applications running on a single Node.js process
  • Prototyping: Quick proof-of-concepts without setting up Redis or file storage

Limitations

Not for production: MemoryStorage has significant limitations that make it unsuitable for production workloads that need durability or more than one process.
  • No persistence: All data is lost when the process exits or crashes
  • Single-process only: Cannot be shared across multiple worker processes or machines
  • Memory constraints: All jobs, results, and errors are stored in RAM
  • No leader election: Does not implement the optional leader election methods

Features

  • TTL cleanup: Automatically cleans up expired results and errors every second
  • Blocking dequeue: Implements blocking semantics with in-process waiters
  • Event notifications: Full pub/sub support using Node.js EventEmitter
  • Atomic operations: Simulates atomicity by running each operation as synchronous JavaScript, so no other code in the same process can interleave with it

Example

import { Queue, MemoryStorage } from '@platformatic/job-queue'

// Create queue with memory storage
const queue = new Queue({
  storage: new MemoryStorage(),
  async process(job) {
    console.log('Processing:', job.data)
    return { status: 'done' }
  }
})

await queue.start()

// Enqueue a job
const job = await queue.enqueue({ message: 'Hello, world!' })

// Wait for completion
const result = await job.waitForCompletion()
console.log(result) // { status: 'done' }

await queue.stop()

Testing Utilities

MemoryStorage provides a clear() method for resetting state between tests:
import { MemoryStorage } from '@platformatic/job-queue'

const storage = new MemoryStorage()
await storage.connect()

// ... run tests ...

// Clear all data
storage.clear()

Implementation Details

Data Structures

Internally, MemoryStorage uses JavaScript Maps and Arrays:
  • Queue: Array of Buffer objects (FIFO)
  • Processing queues: Map of workerId to Array of jobs
  • Jobs registry: Map of job ID to state string
  • Results/Errors: Map with TTL tracking
  • Workers: Map with expiration timestamps
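
As a rough illustration of the layout listed above, the sketch below models the same structures with plain Maps and Arrays. It is not the library's source: the class name `MemoryStateSketch`, the field names, the `'queued'` state string, and the `{ value, expiresAt }` entry shape are all assumptions made for the example.

```javascript
// Hypothetical model of the in-memory state; all names are illustrative only.
class MemoryStateSketch {
  constructor () {
    this.queue = []              // Array of Buffer: pending jobs, FIFO
    this.processing = new Map()  // workerId -> Array of Buffer held by that worker
    this.jobs = new Map()        // job ID -> state string (values assumed)
    this.results = new Map()     // job ID -> { value, expiresAt } (shape assumed)
    this.errors = new Map()      // job ID -> { value, expiresAt } (shape assumed)
    this.workers = new Map()     // workerId -> expiration timestamp in ms
  }

  // Append the payload to the tail of the queue and record the job's state.
  enqueue (jobId, payload) {
    this.queue.push(payload)
    this.jobs.set(jobId, 'queued')
  }

  // Move the head of the queue into the worker's processing list.
  // The whole method is synchronous, so nothing else can run in between.
  take (workerId) {
    const payload = this.queue.shift()
    if (payload === undefined) return null
    if (!this.processing.has(workerId)) this.processing.set(workerId, [])
    this.processing.get(workerId).push(payload)
    return payload
  }
}
```

Because `take()` never awaits, the move from the main queue to the processing list completes within a single turn of the event loop, which is the sense in which synchronous code can stand in for an atomic operation.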

Dequeue Blocking

When dequeue() is called and the queue is empty, MemoryStorage creates a Promise that resolves as soon as one of the following happens:
  • A job is enqueued (waiters are notified and the Promise resolves with the job)
  • The timeout expires (resolves with null)
  • The storage disconnects (resolves with null)
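
The three outcomes above can be sketched with a set of in-process waiters. This is a minimal illustration under stated assumptions, not the library's implementation: `BlockingQueueSketch` and its method signatures are hypothetical, and the timeout is taken in milliseconds purely for the example.

```javascript
// Hypothetical sketch of a blocking dequeue built on in-process waiters.
class BlockingQueueSketch {
  constructor () {
    this.items = []            // pending jobs, FIFO
    this.waiters = new Set()   // settle callbacks of blocked dequeue() calls
  }

  enqueue (item) {
    // A Set iterates in insertion order, so this picks the oldest waiter.
    const [waiter] = this.waiters
    if (waiter) {
      waiter(item)             // hand the job straight to the blocked caller
    } else {
      this.items.push(item)
    }
  }

  // timeoutMs is an assumption of this sketch (milliseconds).
  dequeue (timeoutMs) {
    if (this.items.length > 0) return Promise.resolve(this.items.shift())

    return new Promise((resolve) => {
      // Settle exactly once: cancel the timer and deregister the waiter.
      const settle = (value) => {
        clearTimeout(timer)
        this.waiters.delete(settle)
        resolve(value)
      }
      // Timeout expires: resolve with null.
      const timer = setTimeout(() => settle(null), timeoutMs)
      this.waiters.add(settle)
    })
  }

  disconnect () {
    // Release every blocked caller with null.
    for (const waiter of [...this.waiters]) waiter(null)
  }
}
```

Each waiter removes itself and clears its timer when it settles, so a caller released by an enqueue or a disconnect leaves no timer behind.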

TTL Management

A cleanup timer runs once per second to:
  • Remove expired results and errors
  • Remove expired worker registrations
The interval is started in connect() and stopped in disconnect().
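
A periodic sweep of this kind might look like the following sketch. It assumes entries carry an absolute `expiresAt` timestamp in milliseconds; `TtlStoreSketch`, `sweep()`, and the `set()`/`get()` signatures are hypothetical names chosen for the example and do not describe the library's actual methods.

```javascript
// Hypothetical sketch of interval-based TTL cleanup.
class TtlStoreSketch {
  constructor (intervalMs = 1000) {
    this.intervalMs = intervalMs
    this.entries = new Map()   // key -> { value, expiresAt }
    this.timer = null
  }

  // Start the periodic sweep; calling connect() twice keeps a single timer.
  connect () {
    if (this.timer !== null) return
    this.timer = setInterval(() => this.sweep(), this.intervalMs)
  }

  // Stop the sweep so the timer no longer keeps the process alive.
  disconnect () {
    clearInterval(this.timer)
    this.timer = null
  }

  set (key, value, ttlMs) {
    this.entries.set(key, { value, expiresAt: Date.now() + ttlMs })
  }

  // Expired entries are hidden even if the sweep has not removed them yet.
  get (key) {
    const entry = this.entries.get(key)
    return entry && entry.expiresAt > Date.now() ? entry.value : null
  }

  // Delete every entry whose expiration time has passed.
  sweep (now = Date.now()) {
    for (const [key, entry] of this.entries) {
      if (entry.expiresAt <= now) this.entries.delete(key)
    }
  }
}
```

Passing `now` as a parameter lets tests exercise `sweep()` deterministically without waiting for the interval to fire.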