@glidemq/fastify
REST API and real-time SSE for glide-mq job queues, as a Fastify v5 plugin. Two registrations give you the full queue API plus Fastify-native lifecycle management.
Package Links
- npm: @glidemq/fastify
- GitHub: github.com/avifenesh/glidemq-fastify
Why @glidemq/fastify
- Use this when you need a queue management API on top of Fastify without writing route handlers yourself.
- Use this when you want real-time SSE events for job completion, failure, progress, and stalls.
- Use this when you need lightweight
Producerendpoints for serverless or edge functions that only enqueue jobs. - Use this when you want to test queue logic with
app.inject()and no running Valkey instance. - Use this when you already have a Fastify app and want to add queue operations behind a route prefix with access control.
Install
npm install @glidemq/fastify glide-mq fastifyOptional -- install zod for request validation (falls back to manual checks otherwise):
npm install zodRequires glide-mq >= 0.15.2.
Quick Start
import Fastify from 'fastify';
import { glideMQPlugin, glideMQRoutes } from '@glidemq/fastify';
const app = Fastify();
await app.register(glideMQPlugin, {
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
queues: {
emails: {
processor: async (job) => {
await sendEmail(job.data.to, job.data.subject);
return { sent: true };
},
concurrency: 5,
},
reports: {},
},
});
await app.register(glideMQRoutes, { prefix: '/api/queues' });
await app.listen({ port: 3000 });What happened: glideMQPlugin created a QueueRegistry, started a worker for emails, and decorated the Fastify instance with app.glidemq. glideMQRoutes mounted the queue HTTP API and SSE routes under /api/queues. The onClose hook will shut down all queues and workers when the process exits.
How It Works
The package exposes two Fastify plugins that you register separately:
glideMQPlugin -- Core plugin, wrapped with fastify-plugin so it shares the encapsulation context. It creates a QueueRegistry that lazily initializes queues and workers on first access, eagerly initializes producers so connection errors surface at startup, and decorates the Fastify instance with app.glidemq. An onClose hook calls registry.closeAll() for graceful shutdown.
glideMQRoutes -- REST API plugin. Not wrapped with fastify-plugin, so it respects Fastify's encapsulation and supports prefix. It reads app.glidemq from the parent context and mounts the full queue HTTP surface. You can register it multiple times with different prefixes.
Endpoints
All queue routes use :name as the queue identifier.
Jobs
| Method | Path | Description |
|---|---|---|
| POST | /:name/jobs | Add a job |
| POST | /:name/jobs/wait | Add a job and wait for its result |
| GET | /:name/jobs | List jobs (?type=waiting&start=0&end=-1&excludeData=false) |
| GET | /:name/jobs/:id | Get a single job by ID |
| POST | /:name/jobs/:id/priority | Change job priority |
| POST | /:name/jobs/:id/delay | Change job delay |
| POST | /:name/jobs/:id/promote | Promote a delayed job to waiting |
Queue Operations
| Method | Path | Description |
|---|---|---|
| GET | /:name/counts | Job counts by state |
| GET | /:name/metrics | Queue metrics (?type=completed&start=0&end=-1) |
| POST | /:name/pause | Pause the queue |
| POST | /:name/resume | Resume the queue |
| POST | /:name/drain | Drain all waiting jobs |
| POST | /:name/retry | Retry failed jobs ({"count": 50} or omit for all) |
| DELETE | /:name/clean | Clean old jobs (?grace=0&limit=100&type=completed) |
| GET | /:name/workers | List active workers |
| GET | /:name/events | SSE event stream |
| POST | /:name/produce | Add a job via Producer (serverless) |
Schedulers
| Method | Path | Description |
|---|---|---|
| GET | /:name/schedulers | List all schedulers |
| GET | /:name/schedulers/:schedulerName | Get one scheduler |
| PUT | /:name/schedulers/:schedulerName | Upsert a scheduler |
| DELETE | /:name/schedulers/:schedulerName | Remove a scheduler |
AI-Native
| Method | Path | Description |
|---|---|---|
| POST | /flows | Create a tree flow or DAG over HTTP with { flow, budget? } or { dag } |
| GET | /flows/:id | Inspect a flow snapshot with nodes, roots, counts, usage, and budget |
| GET | /flows/:id/tree | Inspect the nested tree view for a submitted tree flow or DAG |
| DELETE | /flows/:id | Revoke or flag remaining jobs in a flow and delete the HTTP flow record |
| GET | /:name/flows/:id/usage | Aggregated token/cost usage across a flow |
| GET | /:name/flows/:id/budget | Budget state (limits, spent, exceeded) for a flow |
| GET | /:name/jobs/:id/stream | SSE stream of real-time chunks from a streaming job |
Global and Broadcast
| Method | Path | Description |
|---|---|---|
| GET | /usage/summary | Rolling token/cost summary across one or more queues |
| POST | /broadcast/:name | Publish a broadcast message with a subject and payload |
| GET | /broadcast/:name/events | SSE stream for a durable broadcast subscription (?subscription= required) |
Features
- Two-step registration --
glideMQPluginfor state,glideMQRoutesfor HTTP. Register routes multiple times under different prefixes. - Lazy queue/worker init, eager producer init -- queues and workers start on first request; producers connect at registration so errors fail fast.
- Optional Zod validation -- install
zodand all request bodies and query strings are validated with structured error responses. Without Zod, manual validation still rejects bad input. - Real-time SSE --
/:name/eventsusesreply.hijack()and streamscompleted,failed,progress,active,waiting,stalled,usage,suspended,budget-exceeded, andheartbeatevents directly onreply.raw. - Queue access control -- pass
queuesandproducersarrays toglideMQRoutesto restrict which names are accessible through the API. The queue allowlist also governs broadcast names and/usage/summary?queues=.... - Testing mode --
createTestAppbuilds a Fastify instance with in-memory queues, no Valkey required. Test withapp.inject(). - Graceful shutdown --
onClosehook callsregistry.closeAll()usingPromise.allSettledso one failing close does not block the rest. - Serverless producers -- lightweight
Producerendpoints that return only{ id }, suitable for Lambda/edge functions that only enqueue work. - Broadcast over HTTP -- publish messages and stream them via SSE with durable subscriptions and optional subject filters.
- Flow orchestration over HTTP -- create tree flows or DAGs from any HTTP client, then inspect them as flat snapshots or nested trees.
Configuration
GlideMQPluginOptions
interface GlideMQPluginOptions {
connection?: ConnectionOptions; // Required unless testing: true
queues: Record<string, QueueConfig>;
producers?: Record<string, ProducerConfig>;
prefix?: string; // Valkey key prefix (default: 'glide')
testing?: boolean; // Use TestQueue/TestWorker, no Valkey needed
}
interface QueueConfig {
processor?: (job: Job) => Promise<any>; // Omit for producer-only queues
concurrency?: number; // Default: 1
workerOpts?: Record<string, unknown>;
serializer?: (job: Job) => Record<string, unknown>;
}
interface ProducerConfig {
compression?: 'none' | 'gzip';
serializer?: Serializer;
}GlideMQRoutesOptions
interface GlideMQRoutesOptions {
queues?: string[]; // Allowlist of queue names (omit to allow all)
producers?: string[]; // Allowlist of producer names (omit to allow all)
}Route prefix is set via Fastify's standard prefix option in app.register().
Testing
createTestApp builds a ready-to-use Fastify instance with in-memory queues. No Valkey, no network.
import { createTestApp } from '@glidemq/fastify/testing';
const { app, registry } = await createTestApp({
emails: {
processor: async (job) => ({ sent: true }),
},
});
const res = await app.inject({
method: 'POST',
url: '/emails/jobs',
payload: { name: 'welcome', data: { to: 'user@test.com' } },
});
console.log(res.statusCode); // 201
console.log(res.json().id); // job ID
await app.close();Pass { prefix: '/api' } as the second argument to createTestApp to test prefixed routes.
SSE in testing mode emits
countsevents (polling-based state diffs) rather than job lifecycle events.
Direct Registry Access
Use app.glidemq in your own routes to work with queues directly:
app.post('/send-email', async (request, reply) => {
const { queue } = app.glidemq.get('emails');
const job = await queue.add('send', {
to: (request.body as any).to,
subject: (request.body as any).subject,
});
return reply.send({ jobId: job?.id });
});The registry exposes get(name), getProducer(name), has(name), names(), closeAll(), and more. See the QueueRegistry interface in src/types.ts.
Limitations
- SSE uses
reply.hijack()to bypass Fastify's response pipeline. This means Fastify hooks likeonSenddo not run for SSE connections. - No built-in authentication or rate limiting. Use Fastify hooks or plugins (
@fastify/auth,@fastify/rate-limit) in front ofglideMQRoutes. POST /flowsaccepts tree flow payloads with optional budgets and DAG payloads without budgets. HTTP-submitted budgets are currently supported for tree flows only./flows*,/usage/summary, and/broadcast/*require a live connection and are unavailable in testing mode.- Queue names are validated against
/^[a-zA-Z0-9_-]{1,128}$/. Names outside this pattern are rejected with 400. - Scheduler names allow a wider character set (
/^[a-zA-Z0-9_:.-]{1,256}$/) but are still length-limited.
Ecosystem
| Package | Description |
|---|---|
| glide-mq | Core queue library -- producers, workers, schedulers, workflows |
| @glidemq/fastify | Fastify REST API + SSE plugin (this package) |
| @glidemq/hono | Hono REST API + SSE middleware |
| @glidemq/hapi | Hapi REST API + SSE plugin |
| @glidemq/nestjs | NestJS module -- decorators, DI, lifecycle management |
| @glidemq/dashboard | Express web UI for monitoring and managing queues |
License
Apache-2.0