
Overview

By default, Platformatic Job Queue uses JSON for serializing job payloads and results. For performance-critical applications or when working with binary data, you can implement custom serialization using the Serde interface.

Serde Interface

The serialization interface is simple (from src/serde/index.ts:1-7):
export interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}
Any class implementing these two methods can be used as a custom serializer.
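As a minimal sketch of what implementing the interface looks like, the hypothetical Utf8StringSerde below handles jobs whose payload is a plain string. The interface is redeclared locally only to keep the sketch self-contained; in an application you would import Serde from the package instead.

```typescript
// Local copy of the interface shown above, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

// Hypothetical serializer (not part of the library) for string payloads.
class Utf8StringSerde implements Serde<string> {
  serialize (value: string): Buffer {
    return Buffer.from(value, 'utf8')
  }

  deserialize (buffer: Buffer): string {
    return buffer.toString('utf8')
  }
}

// Round trip: the decoded value should equal the original string.
const stringSerde = new Utf8StringSerde()
const encoded = stringSerde.serialize('résumé.pdf')
const decoded = stringSerde.deserialize(encoded)
```

The only contract is that deserialize reverses serialize; everything else about the byte layout is up to the implementation.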

Default JSON Serializer

The built-in JsonSerde implementation (from src/serde/index.ts:9-20):
export class JsonSerde<T> implements Serde<T> {
  serialize (value: T): Buffer {
    return Buffer.from(JSON.stringify(value))
  }

  deserialize (buffer: Buffer): T {
    return JSON.parse(buffer.toString()) as T
  }
}
This is the default for both payloadSerde and resultSerde when none is specified.
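One consequence of a JSON round trip is worth seeing before choosing a serializer: values that JSON cannot represent natively come back in a different shape. The sketch below uses only JSON.stringify and JSON.parse, mirroring what the JsonSerde code above does; the sample object is made up for illustration.

```typescript
const original = {
  createdAt: new Date('2024-01-01T00:00:00.000Z'),
  file: Buffer.from([1, 2, 3])
}

// Same steps as JsonSerde: stringify into a Buffer, then parse it back.
const bytes = Buffer.from(JSON.stringify(original))
const roundTripped = JSON.parse(bytes.toString())

// The Date is now an ISO string, not a Date instance.
const dateType = typeof roundTripped.createdAt

// The Buffer is now a plain object: { type: 'Buffer', data: [1, 2, 3] }.
const fileIsBuffer = Buffer.isBuffer(roundTripped.file)
```

If a job handler depends on receiving Date or Buffer instances, either convert them back after decoding or use a serializer that preserves them.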

MessagePack Example

MessagePack is a compact binary serialization format that is often faster to encode and decode than JSON. Install msgpackr first:
npm install msgpackr
Then implement the interface (example from the README.md):
import { Queue, Serde } from '@platformatic/job-queue'
import * as msgpack from 'msgpackr'

class MsgPackSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    return msgpack.pack(value)
  }

  deserialize(buffer: Buffer): T {
    return msgpack.unpack(buffer) as T
  }
}

// `storage` is any storage instance you have already configured
const queue = new Queue({
  storage,
  payloadSerde: new MsgPackSerde(),
  resultSerde: new MsgPackSerde()
})

Using Custom Serializers

You can specify the payload and result serializers independently:
import { Queue, RedisStorage } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'

interface JobPayload {
  userId: number
  action: string
  data: Record<string, any>
}

interface JobResult {
  status: 'success' | 'failure'
  output: string
}

const queue = new Queue<JobPayload, JobResult>({
  storage: new RedisStorage({ url: process.env.REDIS_URL }),
  payloadSerde: new MsgPackSerde<JobPayload>(),
  resultSerde: new MsgPackSerde<JobResult>()
})
Both producer and consumer must use the same serializer configuration, otherwise deserialization will fail.

When to Use Custom Serialization

Consider custom serialization when:

1. Performance is Critical

Binary formats like MessagePack or Protocol Buffers are often faster to serialize and deserialize than JSON, although the gain depends on the payload shape and the library used:
import { Queue, Serde } from '@platformatic/job-queue'
import * as msgpack from 'msgpackr'

class MsgPackSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    return msgpack.pack(value)
  }

  deserialize(buffer: Buffer): T {
    return msgpack.unpack(buffer) as T
  }
}

// Often around 2-5x faster than JSON; results vary by payload, so benchmark your own data
const queue = new Queue({
  storage,
  payloadSerde: new MsgPackSerde(),
  resultSerde: new MsgPackSerde()
})

2. Payload Size Matters

Binary formats are usually more compact than JSON, reducing storage and network overhead:
const jsonSize = Buffer.from(JSON.stringify(largeObject)).length
const msgpackSize = msgpack.pack(largeObject).length

console.log(`JSON: ${jsonSize} bytes`)
console.log(`MessagePack: ${msgpackSize} bytes`)
// MessagePack typically 20-50% smaller

3. Working with Binary Data

If your payloads contain binary data (images, files), custom serialization avoids base64 encoding:
import { Queue, Serde } from '@platformatic/job-queue'

interface ImageJob {
  imageBuffer: Buffer
  operations: string[]
}

class ImageSerde implements Serde<ImageJob> {
  serialize(value: ImageJob): Buffer {
    // Custom binary format
    const operationsJson = JSON.stringify(value.operations)
    const operationsBuffer = Buffer.from(operationsJson)
    const operationsLength = Buffer.allocUnsafe(4)
    operationsLength.writeUInt32BE(operationsBuffer.length)
    
    return Buffer.concat([
      operationsLength,
      operationsBuffer,
      value.imageBuffer
    ])
  }

  deserialize(buffer: Buffer): ImageJob {
    const operationsLength = buffer.readUInt32BE(0)
    const operationsBuffer = buffer.subarray(4, 4 + operationsLength)
    const operations = JSON.parse(operationsBuffer.toString())
    const imageBuffer = buffer.subarray(4 + operationsLength)
    
    return { imageBuffer, operations }
  }
}

const queue = new Queue<ImageJob, Buffer>({
  storage,
  payloadSerde: new ImageSerde(),
  resultSerde: new BufferSerde() // Hypothetical Buffer serializer that you define yourself
})
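The BufferSerde used for results above is not defined in the example. One possible sketch, assuming results are already Buffers and can be stored as-is, is a pass-through serializer. The class name and behavior are assumptions, not part of the library.

```typescript
// Local copy of the Serde interface, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

// Hypothetical pass-through serializer: the value is already binary,
// so both directions return the Buffer unchanged (same reference, no copy).
class BufferSerde implements Serde<Buffer> {
  serialize (value: Buffer): Buffer {
    return value
  }

  deserialize (buffer: Buffer): Buffer {
    return buffer
  }
}

const bufferSerde = new BufferSerde()
const input = Buffer.from([0xde, 0xad, 0xbe, 0xef])
const output = bufferSerde.deserialize(bufferSerde.serialize(input))
```

Because no copy is made, callers should avoid mutating the Buffer after handing it to the queue; use Buffer.from(value) in serialize if a defensive copy is preferred.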

4. Schema Validation

Use Protocol Buffers or Avro for schema enforcement:
import { Queue, Serde } from '@platformatic/job-queue'
import protobuf from 'protobufjs'

// Load your .proto schema
const root = await protobuf.load('job.proto')
const JobPayload = root.lookupType('myapp.JobPayload')

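class ProtobufSerde<T extends Record<string, any>> implements Serde<T> {
  constructor(private type: protobuf.Type) {}
  
  serialize(value: T): Buffer {
    // create() does not validate its input, so check the object against the schema first
    const error = this.type.verify(value)
    if (error) throw new Error(`Invalid payload: ${error}`)

    const message = this.type.create(value)
    return Buffer.from(this.type.encode(message).finish())
  }

  deserialize(buffer: Buffer): T {
    // decode() throws if the bytes are not a valid message of this type
    const message = this.type.decode(buffer)
    return this.type.toObject(message) as T
  }
}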

const queue = new Queue({
  storage,
  payloadSerde: new ProtobufSerde(JobPayload)
})

Complete Examples

MessagePack Implementation

import { Serde } from '@platformatic/job-queue'
import { pack, unpack } from 'msgpackr'

export class MsgPackSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    return pack(value)
  }

  deserialize(buffer: Buffer): T {
    return unpack(buffer) as T
  }
}

CBOR Implementation

import { Queue, Serde } from '@platformatic/job-queue'
import cbor from 'cbor'

export class CborSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    return cbor.encode(value)
  }

  deserialize(buffer: Buffer): T {
    return cbor.decode(buffer) as T
  }
}

const queue = new Queue({
  storage,
  payloadSerde: new CborSerde(),
  resultSerde: new CborSerde()
})

Compression Example

Combine serialization with compression:
import { Queue, Serde } from '@platformatic/job-queue'
import { gzipSync, gunzipSync } from 'node:zlib'

export class CompressedJsonSerde<T> implements Serde<T> {
  serialize(value: T): Buffer {
    const json = Buffer.from(JSON.stringify(value))
    return gzipSync(json)
  }

  deserialize(buffer: Buffer): T {
    const json = gunzipSync(buffer)
    return JSON.parse(json.toString()) as T
  }
}

// Good for large payloads
const queue = new Queue({
  storage,
  payloadSerde: new CompressedJsonSerde(),
  concurrency: 5
})
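Compressing every payload adds CPU cost and can make small payloads larger. A possible refinement, sketched below under the assumption that a one-byte prefix is acceptable in the stored format, is to compress only above a size threshold and record the choice in that prefix. The class name, flag values, and default threshold are all hypothetical.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib'

// Local copy of the Serde interface, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

// First byte of every encoded buffer says how the rest is stored.
const FLAG_RAW = 0
const FLAG_GZIP = 1

// Hypothetical serializer: JSON, gzipped only when it reaches the threshold.
class ThresholdGzipJsonSerde<T> implements Serde<T> {
  private readonly thresholdBytes: number

  constructor (thresholdBytes = 1024) {
    this.thresholdBytes = thresholdBytes
  }

  serialize (value: T): Buffer {
    const json = Buffer.from(JSON.stringify(value))
    if (json.length < this.thresholdBytes) {
      return Buffer.concat([Buffer.from([FLAG_RAW]), json])
    }
    return Buffer.concat([Buffer.from([FLAG_GZIP]), gzipSync(json)])
  }

  deserialize (buffer: Buffer): T {
    const flag = buffer[0]
    const body = buffer.subarray(1)
    if (flag === FLAG_RAW) return JSON.parse(body.toString()) as T
    if (flag === FLAG_GZIP) return JSON.parse(gunzipSync(body).toString()) as T
    throw new Error(`Unknown compression flag: ${flag}`)
  }
}

const thresholdSerde = new ThresholdGzipJsonSerde<{ text: string }>()
const smallEncoded = thresholdSerde.serialize({ text: 'hi' })
const largeEncoded = thresholdSerde.serialize({ text: 'x'.repeat(5000) })
```

The prefix byte makes the format incompatible with plain CompressedJsonSerde output, so switching between the two requires draining the queue first.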

Separate Payload and Result Serializers

Use different serializers for payloads and results:
import { Queue, JsonSerde } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'

interface ComplexPayload {
  binary: Buffer
  metadata: Record<string, any>
}

interface SimpleResult {
  status: string
  message: string
}

const queue = new Queue<ComplexPayload, SimpleResult>({
  storage,
  // Use MessagePack for large binary payloads
  payloadSerde: new MsgPackSerde<ComplexPayload>(),
  // Use JSON for simple string results
  resultSerde: new JsonSerde<SimpleResult>()
})

Configuration Consistency

In producer/consumer setups, both sides must use the same serializers:
import { Queue, RedisStorage } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const producer = new Queue({
  storage,
  payloadSerde: new MsgPackSerde(), // Must match worker
  resultSerde: new MsgPackSerde()   // Must match worker
})

await producer.start()
await producer.enqueue('job-1', { data: 'value' })
If producer and consumer use different serializers, deserialization will fail with cryptic errors. Ensure consistency across all queue instances.
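To make the failure mode concrete, the sketch below writes a payload as gzipped JSON and then reads it back as plain JSON, using only Node.js built-ins. It stands in for any producer/worker mismatch; the exact error text for other format pairs will differ.

```typescript
import { gzipSync } from 'node:zlib'

// Producer side: gzipped JSON, as a compressing serializer would write it.
const written = gzipSync(Buffer.from(JSON.stringify({ data: 'value' })))

// Worker side: configured with plain JSON, so it parses the bytes directly.
let mismatchError: unknown
try {
  JSON.parse(written.toString())
} catch (err) {
  // JSON.parse rejects the gzip header bytes with a SyntaxError.
  mismatchError = err
}
```

The error says nothing about serializers, which is why a mismatch can be hard to diagnose from the worker logs alone.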

Testing Custom Serializers

Test your serializer implementation:
import { describe, it } from 'node:test'
import assert from 'node:assert'
import { MsgPackSerde } from './serializers/msgpack'

describe('MsgPackSerde', () => {
  it('should serialize and deserialize objects', () => {
    const serde = new MsgPackSerde<{ foo: string; bar: number }>()
    const original = { foo: 'test', bar: 123 }
    
    const buffer = serde.serialize(original)
    assert.ok(Buffer.isBuffer(buffer))
    
    const deserialized = serde.deserialize(buffer)
    assert.deepStrictEqual(deserialized, original)
  })
  
  it('should handle complex nested objects', () => {
    const serde = new MsgPackSerde<any>()
    const original = {
      nested: { deeply: { value: 42 } },
      array: [1, 2, 3],
      date: new Date().toISOString()
    }
    
    const buffer = serde.serialize(original)
    const deserialized = serde.deserialize(buffer)
    assert.deepStrictEqual(deserialized, original)
  })
  
  it('should be smaller than JSON', () => {
    const serde = new MsgPackSerde<any>()
    const data = {
      userId: 12345,
      action: 'process',
      metadata: { timestamp: Date.now(), region: 'us-east-1' }
    }
    
    const msgpackSize = serde.serialize(data).length
    const jsonSize = Buffer.from(JSON.stringify(data)).length
    
    assert.ok(msgpackSize < jsonSize)
    console.log(`MessagePack: ${msgpackSize} bytes, JSON: ${jsonSize} bytes`)
  })
})

Performance Considerations

Benchmarking Different Formats

import { JsonSerde } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'

const testPayload = {
  userId: 12345,
  timestamp: Date.now(),
  data: { /* large object */ }
}

const iterations = 10000

// Test JSON
const jsonSerde = new JsonSerde()
const jsonStart = Date.now()
for (let i = 0; i < iterations; i++) {
  const buffer = jsonSerde.serialize(testPayload)
  jsonSerde.deserialize(buffer)
}
const jsonTime = Date.now() - jsonStart

// Test MessagePack
const msgpackSerde = new MsgPackSerde()
const msgpackStart = Date.now()
for (let i = 0; i < iterations; i++) {
  const buffer = msgpackSerde.serialize(testPayload)
  msgpackSerde.deserialize(buffer)
}
const msgpackTime = Date.now() - msgpackStart

console.log(`JSON: ${jsonTime}ms`)
console.log(`MessagePack: ${msgpackTime}ms`)
console.log(`Speedup: ${(jsonTime / msgpackTime).toFixed(2)}x`)

Best Practices

Start with JSON, optimize later. The default JSON serializer works well for most use cases. Only implement custom serialization when you have measurable performance issues.
  • Use the same serializer for producer and consumer
  • Test serialization round-trips thoroughly
  • Benchmark performance gains before switching
  • Document which serializer is used in your deployment
  • Consider schema versioning for production systems
  • Handle serialization errors gracefully
  • Use binary formats for large payloads (> 10KB)
  • Profile memory usage with different formats
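For the schema versioning point, one common approach is to prefix each encoded value with a version byte and pick the decoder from it. The sketch below assumes versions fit in one byte. VersionedSerde and JsonCodec are hypothetical names, not library classes.

```typescript
// Local copy of the Serde interface, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

// Plain JSON codec, equivalent in behavior to the default serializer.
class JsonCodec<T> implements Serde<T> {
  serialize (value: T): Buffer {
    return Buffer.from(JSON.stringify(value))
  }

  deserialize (buffer: Buffer): T {
    return JSON.parse(buffer.toString()) as T
  }
}

// Hypothetical wrapper: writes with one version, reads any registered version.
class VersionedSerde<T> implements Serde<T> {
  private readonly writeVersion: number
  private readonly codecs: Map<number, Serde<T>>

  constructor (writeVersion: number, codecs: Map<number, Serde<T>>) {
    if (!Number.isInteger(writeVersion) || writeVersion < 0 || writeVersion > 255) {
      throw new RangeError('writeVersion must fit in one byte (0-255)')
    }
    if (!codecs.has(writeVersion)) {
      throw new Error(`No codec registered for version ${writeVersion}`)
    }
    this.writeVersion = writeVersion
    this.codecs = codecs
  }

  serialize (value: T): Buffer {
    const codec = this.codecs.get(this.writeVersion) as Serde<T>
    return Buffer.concat([Buffer.from([this.writeVersion]), codec.serialize(value)])
  }

  deserialize (buffer: Buffer): T {
    if (buffer.length === 0) throw new Error('Cannot decode an empty buffer')
    const version = buffer[0]
    const codec = this.codecs.get(version)
    if (codec === undefined) throw new Error(`Unsupported payload version: ${version}`)
    return codec.deserialize(buffer.subarray(1))
  }
}

interface Payload { userId: number }

const codecs = new Map<number, Serde<Payload>>([[1, new JsonCodec<Payload>()]])
const versioned = new VersionedSerde<Payload>(1, codecs)
const framed = versioned.serialize({ userId: 7 })
```

During a migration, workers can register both the old and new codecs while producers switch writeVersion, so jobs already in the queue remain readable.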

Common Issues

Deserialization Errors

Problem: Getting errors like “Unexpected token” or “Invalid MessagePack data”.
Solution: Ensure producer and consumer use the same serializer:
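// queue-config.ts: create a shared configuration
// (assumes the Storage type is exported by the package)
import { Queue, Storage } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'

export const createQueue = (storage: Storage) => {
  return new Queue({
    storage,
    payloadSerde: new MsgPackSerde(),
    resultSerde: new MsgPackSerde()
  })
}

// Use in both producer and worker
import { createQueue } from './queue-config'
const queue = createQueue(storage)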
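When a mismatch does slip through, a clearer error helps. The sketch below wraps any serializer and rethrows decode failures with the serializer name and the first bytes of the input. DescriptiveSerde is a hypothetical helper, and the JSON codec inside it only stands in for whichever serializer you use.

```typescript
// Local copy of the Serde interface, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

// Hypothetical wrapper that adds context to deserialization failures.
class DescriptiveSerde<T> implements Serde<T> {
  private readonly name: string
  private readonly inner: Serde<T>

  constructor (name: string, inner: Serde<T>) {
    this.name = name
    this.inner = inner
  }

  serialize (value: T): Buffer {
    return this.inner.serialize(value)
  }

  deserialize (buffer: Buffer): T {
    try {
      return this.inner.deserialize(buffer)
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err)
      // The leading bytes often reveal the real format (e.g. 1f8b for gzip).
      const preview = buffer.subarray(0, 8).toString('hex')
      throw new Error(
        `${this.name} could not decode ${buffer.length} bytes (starts with ${preview}): ${reason}. ` +
        'Check that producer and worker use the same serializer.'
      )
    }
  }
}

// Stand-in inner serializer with the same behavior as plain JSON.
const jsonCodec: Serde<unknown> = {
  serialize: (value) => Buffer.from(JSON.stringify(value)),
  deserialize: (buffer) => JSON.parse(buffer.toString())
}

const payloadSerde = new DescriptiveSerde('payloadSerde(json)', jsonCodec)
```

Passing payloadSerde to the shared createQueue configuration keeps the extra context in one place for both producer and worker.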

Type Safety Issues

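Problem: TypeScript doesn’t catch serialization type mismatches, because the cast to T in deserialize is not checked at runtime.
Solution: Use generic types consistently:
import { Queue, Serde } from '@platformatic/job-queue'
import { MsgPackSerde } from './serializers/msgpack'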

interface MyPayload {
  userId: number
  action: string
}

interface MyResult {
  status: string
}

// Type-safe serializer
const payloadSerde: Serde<MyPayload> = new MsgPackSerde<MyPayload>()
const resultSerde: Serde<MyResult> = new MsgPackSerde<MyResult>()

const queue = new Queue<MyPayload, MyResult>({
  storage,
  payloadSerde,
  resultSerde
})
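Generic types only help at compile time. If decoded data must be checked at runtime as well, one option is a serializer that takes a type guard and rejects values that fail it. The sketch below is a hand-rolled illustration over JSON. ValidatingJsonSerde and isMyPayload are hypothetical names; a schema library could replace the guard.

```typescript
// Local copy of the Serde interface, so the sketch runs on its own.
interface Serde<T> {
  serialize (value: T): Buffer
  deserialize (buffer: Buffer): T
}

type Guard<T> = (value: unknown) => value is T

// Hypothetical serializer: JSON plus a runtime shape check on decode.
class ValidatingJsonSerde<T> implements Serde<T> {
  private readonly guard: Guard<T>

  constructor (guard: Guard<T>) {
    this.guard = guard
  }

  serialize (value: T): Buffer {
    return Buffer.from(JSON.stringify(value))
  }

  deserialize (buffer: Buffer): T {
    const parsed: unknown = JSON.parse(buffer.toString())
    if (!this.guard(parsed)) {
      throw new TypeError('Decoded value does not match the expected shape')
    }
    return parsed
  }
}

interface MyPayload {
  userId: number
  action: string
}

// Hand-written guard for MyPayload; checks only the two declared fields.
const isMyPayload = (value: unknown): value is MyPayload => {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return typeof candidate.userId === 'number' && typeof candidate.action === 'string'
}

const checkedSerde = new ValidatingJsonSerde<MyPayload>(isMyPayload)
const checkedBytes = checkedSerde.serialize({ userId: 1, action: 'resize' })
const checkedValue = checkedSerde.deserialize(checkedBytes)
```

The guard runs on every decode, so keep it cheap for high-throughput queues.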
