Skip to main content
The request/response pattern allows you to enqueue a job and wait for its result, turning asynchronous job processing into a synchronous operation. This is useful for scenarios where you need an immediate response, such as API requests or user-facing operations.

enqueueAndWait()

The enqueueAndWait() method enqueues a job and blocks until it completes or times out:
import { Queue, RedisStorage } from '@platformatic/job-queue'

const queue = new Queue<{ url: string }, { status: number; body: string }>({
  storage: new RedisStorage({ url: 'redis://localhost:6379' })
})

try {
  const result = await queue.enqueueAndWait('fetch-1', { url: 'https://api.example.com' }, {
    timeout: 30000,     // 30 seconds
    maxAttempts: 3,
    resultTTL: 300000   // Cache result for 5 minutes
  })
  
  console.log('Response:', result.status, result.body)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.error('Request timed out after 30s')
  } else if (error instanceof JobFailedError) {
    console.error('Job failed:', error.originalError)
  }
}

How It Works

Subscription Before Enqueue

To avoid race conditions, enqueueAndWait() subscribes to job notifications before enqueueing the job: Producer implementation (from src/producer.ts:93):
async enqueueAndWait(id: string, payload: TPayload, options?: EnqueueAndWaitOptions): Promise<TResult> {
  const timeout = options?.timeout ?? 30000

  // 1. Subscribe BEFORE enqueue to avoid race conditions
  const { promise: resultPromise, resolve: resolveResult, reject: rejectResult } = Promise.withResolvers<TResult>()

  const unsubscribe = await this.#storage.subscribeToJob(id, async status => {
    if (status === 'completed') {
      const result = await this.getResult(id)
      if (result !== null) {
        resolveResult(result)
      }
    } else if (status === 'failed') {
      const error = await this.#storage.getError(id)
      const errorMessage = error ? error.toString() : 'Job failed'
      rejectResult(new JobFailedError(id, errorMessage))
    }
  })

  // 2. Now enqueue the job
  const enqueueResult = await this.enqueue(id, payload, options)

  // 3. Handle immediate completion (cached result)
  if (enqueueResult.status === 'completed') {
    await unsubscribe()
    return enqueueResult.result
  }

  // 4. Wait for result with timeout
  const { promise: timeoutPromise, reject: rejectTimeout } = Promise.withResolvers<never>()
  const timeoutId = setTimeout(() => {
    rejectTimeout(new TimeoutError(id, timeout))
  }, timeout)

  try {
    return await Promise.race([resultPromise, timeoutPromise])
  } finally {
    clearTimeout(timeoutId)
    await unsubscribe()
  }
}

Notification Mechanism

Each storage backend implements job notifications differently: RedisStorage uses Redis Pub/Sub:
// Subscribe to job-specific channel
await subscriber.subscribe(`jq:notify:${jobId}`)

// Worker publishes completion
await redis.publish(`jq:notify:${jobId}`, 'completed')
MemoryStorage uses EventEmitter:
this.#notifyEmitter.on(`notify:${id}`, handler)
this.#notifyEmitter.emit(`notify:${id}`, status)
FileStorage uses filesystem watchers:
// Write notification file
await writeFile(`notify/${id}-${timestamp}.notify`, `${id}:completed`)

// Watcher picks it up
for await (const event of watch(notifyPath)) {
  // Read and emit notification
}

Timeout Handling

The timeout option specifies how long to wait for the job to complete:
const result = await queue.enqueueAndWait('job-123', payload, {
  timeout: 10000  // 10 seconds
})
If the job doesn’t complete within the timeout, a TimeoutError is thrown:
import { TimeoutError } from '@platformatic/job-queue'

try {
  await queue.enqueueAndWait('slow-job', payload, { timeout: 5000 })
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log(`Job ${error.jobId} timed out after ${error.timeoutMs}ms`)
    // Job continues processing in the background
  }
}
Timeout does not cancel the job. The job continues processing in the background. Only the caller stops waiting.

Error Handling

Two types of errors can be thrown by enqueueAndWait():

TimeoutError

Thrown when the job doesn’t complete within the timeout period.
class TimeoutError extends Error {
  constructor(public jobId: string, public timeoutMs: number) {
    super(`Job ${jobId} timed out after ${timeoutMs}ms`)
  }
}

JobFailedError

Thrown when the job fails after all retry attempts.
class JobFailedError extends Error {
  constructor(public jobId: string, public originalError: string) {
    super(`Job ${jobId} failed: ${originalError}`)
  }
}
Example error handling:
import { TimeoutError, JobFailedError } from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait('api-request', { url }, { timeout: 10000 })
  return { success: true, data: result }
} catch (error) {
  if (error instanceof TimeoutError) {
    return { success: false, error: 'TIMEOUT', message: 'Request took too long' }
  } else if (error instanceof JobFailedError) {
    return { success: false, error: 'FAILED', message: error.originalError }
  } else {
    return { success: false, error: 'UNKNOWN', message: error.message }
  }
}

Cached Results

If a job with the same ID was already completed and its result is still cached, enqueueAndWait() returns immediately:
// First request - job runs
const result1 = await queue.enqueueAndWait('calc-123', { x: 5, y: 10 }, {
  timeout: 30000,
  resultTTL: 3600000  // Cache for 1 hour
})
// Takes 2 seconds to compute

// Second request - cached result returned immediately
const result2 = await queue.enqueueAndWait('calc-123', { x: 5, y: 10 }, {
  timeout: 30000
})
// Returns instantly with cached result
See Deduplication for more details on result caching.

Use Cases

API Gateway Pattern

Use the queue to offload heavy processing from API handlers:
import Fastify from 'fastify'
import { Queue, RedisStorage } from '@platformatic/job-queue'

const app = Fastify()
const queue = new Queue({
  storage: new RedisStorage({ url: process.env.REDIS_URL })
})

app.post('/api/process', async (request, reply) => {
  const { id, data } = request.body
  
  try {
    const result = await queue.enqueueAndWait(id, data, {
      timeout: 30000,
      resultTTL: 300000
    })
    return { success: true, result }
  } catch (error) {
    if (error instanceof TimeoutError) {
      return reply.code(504).send({ error: 'Processing timeout' })
    }
    return reply.code(500).send({ error: 'Processing failed' })
  }
})

Synchronous RPC Over Queue

Implement RPC-style communication between services:
// Service A (caller)
const response = await queue.enqueueAndWait('rpc:getUserProfile', { userId: 123 }, {
  timeout: 5000
})
console.log('User profile:', response)

// Service B (worker)
queue.execute(async (job) => {
  if (job.id.startsWith('rpc:getUserProfile')) {
    const { userId } = job.payload
    return await database.getUser(userId)
  }
})

Background Job with Frontend Waiting

Start a background job from the frontend and poll for results:
// Backend endpoint
app.post('/api/generate-report', async (request, reply) => {
  const reportId = `report-${Date.now()}`
  
  // Start job in background (don't wait)
  queue.enqueue(reportId, request.body, {
    maxAttempts: 3,
    resultTTL: 3600000  // Keep result for 1 hour
  })
  
  return { reportId, status: 'queued' }
})

app.get('/api/generate-report/:id', async (request, reply) => {
  const { id } = request.params
  
  // Wait up to 5 seconds for result
  try {
    const result = await queue.enqueueAndWait(id, {}, { timeout: 5000 })
    return { status: 'completed', result }
  } catch (error) {
    if (error instanceof TimeoutError) {
      return { status: 'pending' }
    }
    return reply.code(500).send({ status: 'failed', error: error.message })
  }
})

Fire-and-Forget vs Request/Response

Choose between enqueue() and enqueueAndWait() based on your needs:

Use enqueue() when:

  • ✅ You don’t need the result immediately
  • ✅ The operation is time-consuming (> 30 seconds)
  • ✅ You want to return quickly to the caller
  • ✅ Failures can be handled asynchronously (via events or retries)
  • ✅ You’re building background processing systems
// Fire-and-forget: return immediately
app.post('/api/send-email', async (request, reply) => {
  await queue.enqueue(`email-${Date.now()}`, request.body)
  return { status: 'queued' }
})

Use enqueueAndWait() when:

  • ✅ You need the result to return to the user
  • ✅ The operation is reasonably fast (< 30 seconds)
  • ✅ You want to reuse existing job deduplication
  • ✅ You’re implementing RPC-style communication
  • ✅ Failures should be reported immediately
// Request/response: wait for result
app.post('/api/calculate', async (request, reply) => {
  const result = await queue.enqueueAndWait(`calc-${Date.now()}`, request.body, {
    timeout: 10000
  })
  return { result }
})

Performance Considerations

Latency Overhead

The request/response pattern adds latency compared to direct execution:
Direct execution:           ~2ms
MemoryStorage:             ~5ms  (+3ms overhead)
RedisStorage (localhost):  ~8ms  (+6ms overhead)
RedisStorage (network):    ~15ms (+13ms overhead)
The overhead comes from:
  • Pub/sub subscription setup
  • Message serialization/deserialization
  • Network round trips (for Redis)

Connection Pooling

For high-throughput scenarios, reuse Queue instances:
// ❌ Creates new connections on every request
app.post('/api/process', async (request, reply) => {
  const queue = new Queue({ storage: new RedisStorage() })
  await queue.start()
  const result = await queue.enqueueAndWait(id, data)
  await queue.stop()
  return result
})

// ✅ Reuses a single Queue instance
const queue = new Queue({ storage: new RedisStorage() })
await queue.start()

app.post('/api/process', async (request, reply) => {
  const result = await queue.enqueueAndWait(id, data)
  return result
})

Timeout Tuning

Set timeouts based on your SLA and worker capacity:
// Fast operations (API calls, cache lookups)
await queue.enqueueAndWait(id, data, { timeout: 5000 })

// Medium operations (database queries, file processing)
await queue.enqueueAndWait(id, data, { timeout: 30000 })

// Slow operations (report generation, video encoding)
await queue.enqueueAndWait(id, data, { timeout: 300000 })  // 5 minutes
If operations regularly exceed the timeout, consider switching to fire-and-forget with polling or webhooks.

Duplicate Handling

When using enqueueAndWait() with duplicate job IDs:
// First call - job runs
const promise1 = queue.enqueueAndWait('job-123', payload, { timeout: 10000 })

// Second call (before first completes) - both wait for same job
const promise2 = queue.enqueueAndWait('job-123', payload, { timeout: 10000 })

// Both resolve with the same result when the job completes
const [result1, result2] = await Promise.all([promise1, promise2])
// result1 === result2
If the job already completed and the result is cached:
// Job completed 5 minutes ago, result still cached
const result = await queue.enqueueAndWait('job-123', payload, { timeout: 10000 })
// Returns immediately with cached result (no subscription needed)
See Deduplication for more details.

Best Practices

Set reasonable timeouts: Don’t make users wait forever. Use 5-30 second timeouts for user-facing requests.
Use content-addressed IDs: For idempotent operations, hash the input to get consistent job IDs:
const jobId = createHash('sha256').update(JSON.stringify(payload)).digest('hex')
await queue.enqueueAndWait(jobId, payload)
Cache aggressively: Set long resultTTL for deterministic operations to reduce load:
await queue.enqueueAndWait(jobId, payload, {
  resultTTL: 24 * 60 * 60 * 1000  // 24 hours for pure functions
})
Don’t use for long-running jobs: If a job takes > 30 seconds, use fire-and-forget with polling or webhooks instead.
Handle timeout errors gracefully: Remember that a timeout doesn’t cancel the job - it continues running in the background.