By default, Platformatic Job Queue uses JSON for serializing job payloads and results. For performance-critical applications or when working with binary data, you can implement custom serialization using the Serde interface.
MessagePack is a compact binary serialization format, often faster than JSON. From the README.md:
import { Serde } from '@platformatic/job-queue'import * as msgpack from 'msgpackr'class MsgPackSerde<T> implements Serde<T> { serialize(value: T): Buffer { return msgpack.pack(value) } deserialize(buffer: Buffer): T { return msgpack.unpack(buffer) as T }}const queue = new Queue({ storage, payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde()})
Binary formats like MessagePack or Protocol Buffers are faster to serialize/deserialize than JSON:
import { Serde } from '@platformatic/job-queue'import * as msgpack from 'msgpackr'class MsgPackSerde<T> implements Serde<T> { serialize(value: T): Buffer { return msgpack.pack(value) } deserialize(buffer: Buffer): T { return msgpack.unpack(buffer) as T }}// Typical performance improvement: 2-5x faster than JSONconst queue = new Queue({ storage, payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde()})
import { Serde } from '@platformatic/job-queue'import cbor from 'cbor'export class CborSerde<T> implements Serde<T> { serialize(value: T): Buffer { return cbor.encode(value) } deserialize(buffer: Buffer): T { return cbor.decode(buffer) as T }}const queue = new Queue({ storage, payloadSerde: new CborSerde(), resultSerde: new CborSerde()})
import { Serde } from '@platformatic/job-queue'import { gzipSync, gunzipSync } from 'node:zlib'export class CompressedJsonSerde<T> implements Serde<T> { serialize(value: T): Buffer { const json = Buffer.from(JSON.stringify(value)) return gzipSync(json) } deserialize(buffer: Buffer): T { const json = gunzipSync(buffer) return JSON.parse(json.toString()) as T }}// Good for large payloadsconst queue = new Queue({ storage, payloadSerde: new CompressedJsonSerde(), concurrency: 5})
Use different serializers for payloads and results:
import { Queue } from '@platformatic/job-queue'import { MsgPackSerde } from './serializers/msgpack'import { JsonSerde } from '@platformatic/job-queue'interface ComplexPayload { binary: Buffer metadata: Record<string, any>}interface SimpleResult { status: string message: string}const queue = new Queue<ComplexPayload, SimpleResult>({ storage, // Use MessagePack for large binary payloads payloadSerde: new MsgPackSerde<ComplexPayload>(), // Use JSON for simple string results resultSerde: new JsonSerde<SimpleResult>()})
In producer/consumer setups, both sides must use the same serializers:
import { Queue, RedisStorage } from '@platformatic/job-queue'import { MsgPackSerde } from './serializers/msgpack'const storage = new RedisStorage({ url: process.env.REDIS_URL })const producer = new Queue({ storage, payloadSerde: new MsgPackSerde(), // Must match worker resultSerde: new MsgPackSerde() // Must match worker})await producer.start()await producer.enqueue('job-1', { data: 'value' })
If producer and consumer use different serializers, deserialization will fail with cryptic errors. Ensure consistency across all queue instances.
Start with JSON, optimize later. The default JSON serializer works well for most use cases. Only implement custom serialization when you have measurable performance issues.
Use the same serializer for producer and consumer
Test serialization round-trips thoroughly
Benchmark performance gains before switching
Document which serializer is used in your deployment
Problem: Getting errors like “Unexpected token” or “Invalid MessagePack data”.Solution: Ensure producer and consumer use the same serializer:
// Create a shared configurationexport const createQueue = (storage: Storage) => { return new Queue({ storage, payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde() })}// Use in both producer and workerimport { createQueue } from './queue-config'const queue = createQueue(storage)