@glidemq/fastify

REST API and real-time SSE for glide-mq job queues, as a Fastify v5 plugin. Two registrations give you the full queue API plus Fastify-native lifecycle management.

Why @glidemq/fastify

  • Use this when you need a queue management API on top of Fastify without writing route handlers yourself.
  • Use this when you want real-time SSE events for job completion, failure, progress, and stalls.
  • Use this when you need lightweight Producer endpoints for serverless or edge functions that only enqueue jobs.
  • Use this when you want to test queue logic with app.inject() and no running Valkey instance.
  • Use this when you already have a Fastify app and want to add queue operations behind a route prefix with access control.

Install

bash
npm install @glidemq/fastify glide-mq fastify

Optional -- install zod for request validation (falls back to manual checks otherwise):

bash
npm install zod

Requires glide-mq >= 0.15.2.

Quick Start

ts
import Fastify from 'fastify';
import { glideMQPlugin, glideMQRoutes } from '@glidemq/fastify';

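// Placeholder for your own mail-sending logic (not part of @glidemq/fastify).
async function sendEmail(to: string, subject: string): Promise<void> {
  console.log(`Sending "${subject}" to ${to}`);
}

const app = Fastify();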

await app.register(glideMQPlugin, {
  connection: { addresses: [{ host: 'localhost', port: 6379 }] },
  queues: {
    emails: {
      processor: async (job) => {
        await sendEmail(job.data.to, job.data.subject);
        return { sent: true };
      },
      concurrency: 5,
    },
    reports: {},
  },
});

await app.register(glideMQRoutes, { prefix: '/api/queues' });
await app.listen({ port: 3000 });

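What happened: glideMQPlugin created a QueueRegistry, registered the emails queue with a worker (concurrency 5) and the processor-less reports queue, and decorated the Fastify instance with app.glidemq. glideMQRoutes mounted the queue HTTP API and SSE routes under /api/queues. The onClose hook shuts down all queues and workers when app.close() is called, so call it from your signal handlers if you want a clean shutdown on process exit.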

How It Works

The package exposes two Fastify plugins that you register separately:

glideMQPlugin -- Core plugin, wrapped with fastify-plugin, which skips Fastify's encapsulation so the app.glidemq decorator is visible on the instance you register it on and to plugins registered after it. It creates a QueueRegistry that lazily initializes queues and workers on first access, eagerly initializes producers so connection errors surface at startup, and decorates the Fastify instance with app.glidemq. An onClose hook calls registry.closeAll() for graceful shutdown.

glideMQRoutes -- REST API plugin. Not wrapped with fastify-plugin, so it respects Fastify's encapsulation and supports prefix. It reads app.glidemq from the parent context and mounts the full queue HTTP surface. You can register it multiple times with different prefixes.
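The lazy-versus-eager split described above can be illustrated with a small standalone sketch. This is not the package's implementation: LazyRegistry, Handle, and the factory callbacks are hypothetical names, used only to show why producer errors surface at construction time while queues cost nothing until first use.

```ts
// Hypothetical sketch of lazy queue init and eager producer init.
// None of these names come from @glidemq/fastify.
type Handle = { label: string };

class LazyRegistry {
  private readonly queues = new Map<string, Handle>();
  private readonly producers = new Map<string, Handle>();
  private readonly queueNames: string[];
  private readonly makeQueue: (queueName: string) => Handle;

  constructor(
    queueNames: string[],
    producerNames: string[],
    makeQueue: (queueName: string) => Handle,
    makeProducer: (producerName: string) => Handle,
  ) {
    this.queueNames = queueNames;
    this.makeQueue = makeQueue;
    // Eager: every producer is created now, so a failing factory throws here.
    for (const producerName of producerNames) {
      this.producers.set(producerName, makeProducer(producerName));
    }
  }

  // Lazy: a queue is created the first time it is requested, then cached.
  get(queueName: string): Handle {
    if (!this.queueNames.includes(queueName)) {
      throw new Error(`Unknown queue: ${queueName}`);
    }
    let handle = this.queues.get(queueName);
    if (!handle) {
      handle = this.makeQueue(queueName);
      this.queues.set(queueName, handle);
    }
    return handle;
  }

  getProducer(producerName: string): Handle | undefined {
    return this.producers.get(producerName);
  }
}

const created: string[] = [];
const lazyRegistry = new LazyRegistry(
  ['emails', 'reports'],
  ['ingest'],
  (q) => { created.push(`queue:${q}`); return { label: q }; },
  (p) => { created.push(`producer:${p}`); return { label: p }; },
);
console.log(created); // only 'producer:ingest' exists so far
lazyRegistry.get('emails');
lazyRegistry.get('emails'); // cached, the factory is not called again
console.log(created); // 'producer:ingest', then 'queue:emails'
```

The reports queue is never created in this run because nothing asked for it, which is the point of the lazy path.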

Endpoints

All queue routes use :name as the queue identifier.

Jobs

| Method | Path | Description |
| --- | --- | --- |
| POST | /:name/jobs | Add a job |
| POST | /:name/jobs/wait | Add a job and wait for its result |
| GET | /:name/jobs | List jobs (?type=waiting&start=0&end=-1&excludeData=false) |
| GET | /:name/jobs/:id | Get a single job by ID |
| POST | /:name/jobs/:id/priority | Change job priority |
| POST | /:name/jobs/:id/delay | Change job delay |
| POST | /:name/jobs/:id/promote | Promote a delayed job to waiting |
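As a rough illustration of calling the job routes from a client, the sketch below builds a list URL and posts a job with fetch. The helper names (listJobsUrl, addJob) and the base URL are hypothetical; only the paths, the query parameter names, and the { name, data } payload shape are taken from this document.

```ts
// Hypothetical client helpers for the job routes.
interface ListJobsQuery {
  type?: string; // for example 'waiting'
  start?: number;
  end?: number;
  excludeData?: boolean;
}

// Builds GET /:name/jobs with only the query parameters that were provided.
function listJobsUrl(baseUrl: string, queueName: string, query: ListJobsQuery = {}): string {
  const url = new URL(`${baseUrl}/${encodeURIComponent(queueName)}/jobs`);
  for (const [key, value] of Object.entries(query)) {
    if (value !== undefined) url.searchParams.set(key, String(value));
  }
  return url.toString();
}

// Sends POST /:name/jobs and returns the parsed JSON response.
async function addJob(
  baseUrl: string,
  queueName: string,
  jobName: string,
  data: unknown,
): Promise<unknown> {
  const res = await fetch(`${baseUrl}/${encodeURIComponent(queueName)}/jobs`, {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ name: jobName, data }),
  });
  if (!res.ok) throw new Error(`Add job failed with HTTP ${res.status}`);
  return res.json();
}

console.log(
  listJobsUrl('http://localhost:3000/api/queues', 'emails', { type: 'waiting', start: 0, end: -1 }),
);
// http://localhost:3000/api/queues/emails/jobs?type=waiting&start=0&end=-1
```

addJob is defined but not called here because it needs a running server; with the Quick Start app listening, addJob('http://localhost:3000/api/queues', 'emails', 'welcome', { to: 'user@test.com' }) would be a typical call.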

Queue Operations

| Method | Path | Description |
| --- | --- | --- |
| GET | /:name/counts | Job counts by state |
| GET | /:name/metrics | Queue metrics (?type=completed&start=0&end=-1) |
| POST | /:name/pause | Pause the queue |
| POST | /:name/resume | Resume the queue |
| POST | /:name/drain | Drain all waiting jobs |
| POST | /:name/retry | Retry failed jobs ({"count": 50} or omit for all) |
| DELETE | /:name/clean | Clean old jobs (?grace=0&limit=100&type=completed) |
| GET | /:name/workers | List active workers |
| GET | /:name/events | SSE event stream |
| POST | /:name/produce | Add a job via Producer (serverless) |
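Browsers can consume /:name/events with EventSource directly. For other clients that read the raw response body, a minimal parser for the standard SSE wire format might look like the sketch below. parseSse is a hypothetical helper, and the JSON payloads in the sample are assumptions made for illustration; this document does not specify the payload shape of each event.

```ts
// Minimal parser for the standard text/event-stream framing.
interface SseMessage {
  event: string;
  data: string;
}

function parseSse(text: string): SseMessage[] {
  const messages: SseMessage[] = [];
  // A blank line terminates each message.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let eventName = 'message'; // default event type per the SSE spec
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // skip comment lines
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
      if (field === 'event') eventName = value;
      else if (field === 'data') dataLines.push(value);
    }
    // Messages without any data field are not dispatched.
    if (dataLines.length > 0) {
      messages.push({ event: eventName, data: dataLines.join('\n') });
    }
  }
  return messages;
}

// Sample stream; the JSON bodies are made up for this example.
const sample =
  'event: completed\ndata: {"jobId":"1"}\n\n' +
  ': comment line\n\n' +
  'event: progress\ndata: {"jobId":"2","progress":50}\n\n';

const parsed = parseSse(sample);
console.log(parsed.map((m) => m.event)); // two messages: completed, progress
```

This version expects complete text. A streaming client would need to buffer partial chunks until it sees a blank line before calling the parser.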

Schedulers

| Method | Path | Description |
| --- | --- | --- |
| GET | /:name/schedulers | List all schedulers |
| GET | /:name/schedulers/:schedulerName | Get one scheduler |
| PUT | /:name/schedulers/:schedulerName | Upsert a scheduler |
| DELETE | /:name/schedulers/:schedulerName | Remove a scheduler |

AI-Native

| Method | Path | Description |
| --- | --- | --- |
| POST | /flows | Create a tree flow or DAG over HTTP with { flow, budget? } or { dag } |
| GET | /flows/:id | Inspect a flow snapshot with nodes, roots, counts, usage, and budget |
| GET | /flows/:id/tree | Inspect the nested tree view for a submitted tree flow or DAG |
| DELETE | /flows/:id | Revoke or flag remaining jobs in a flow and delete the HTTP flow record |
| GET | /:name/flows/:id/usage | Aggregated token/cost usage across a flow |
| GET | /:name/flows/:id/budget | Budget state (limits, spent, exceeded) for a flow |
| GET | /:name/jobs/:id/stream | SSE stream of real-time chunks from a streaming job |
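The two accepted body shapes for POST /flows, { flow, budget? } or { dag }, can be checked on the client before sending. The sketch below is a hypothetical pre-flight check, not the server's validation: checkFlowRequest is an invented name, the contents of flow and dag are treated as opaque, and rejecting a body that carries both keys is an assumption.

```ts
// Hypothetical client-side check for POST /flows bodies.
type FlowRequestCheck =
  | { ok: true; kind: 'tree' | 'dag' }
  | { ok: false; error: string };

function checkFlowRequest(body: Record<string, unknown>): FlowRequestCheck {
  const hasFlow = body.flow !== undefined;
  const hasDag = body.dag !== undefined;
  // Assumption: a body must carry exactly one of the two keys.
  if (hasFlow === hasDag) {
    return { ok: false, error: 'Provide exactly one of "flow" or "dag"' };
  }
  // Budgets submitted over HTTP are supported for tree flows only.
  if (hasDag && body.budget !== undefined) {
    return { ok: false, error: 'Budgets are supported for tree flows only' };
  }
  return { ok: true, kind: hasFlow ? 'tree' : 'dag' };
}

console.log(checkFlowRequest({ flow: {}, budget: {} })); // accepted as a tree flow
console.log(checkFlowRequest({ dag: {}, budget: {} })); // rejected: budget with a DAG
```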

Global and Broadcast

| Method | Path | Description |
| --- | --- | --- |
| GET | /usage/summary | Rolling token/cost summary across one or more queues |
| POST | /broadcast/:name | Publish a broadcast message with a subject and payload |
| GET | /broadcast/:name/events | SSE stream for a durable broadcast subscription (?subscription= required) |

Features

  • Two-step registration -- glideMQPlugin for state, glideMQRoutes for HTTP. Register routes multiple times under different prefixes.
  • Lazy queue/worker init, eager producer init -- queues and workers start on first request; producers connect at registration so errors fail fast.
  • Optional Zod validation -- install zod and all request bodies and query strings are validated with structured error responses. Without Zod, manual validation still rejects bad input.
  • Real-time SSE -- /:name/events uses reply.hijack() and streams completed, failed, progress, active, waiting, stalled, usage, suspended, budget-exceeded, and heartbeat events directly on reply.raw.
  • Queue access control -- pass queues and producers arrays to glideMQRoutes to restrict which names are accessible through the API. The queue allowlist also governs broadcast names and /usage/summary?queues=....
  • Testing mode -- createTestApp builds a Fastify instance with in-memory queues, no Valkey required. Test with app.inject().
  • Graceful shutdown -- onClose hook calls registry.closeAll() using Promise.allSettled so one failing close does not block the rest.
  • Serverless producers -- lightweight Producer endpoints that return only { id }, suitable for Lambda/edge functions that only enqueue work.
  • Broadcast over HTTP -- publish messages and stream them via SSE with durable subscriptions and optional subject filters.
  • Flow orchestration over HTTP -- create tree flows or DAGs from any HTTP client, then inspect them as flat snapshots or nested trees.
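The graceful-shutdown behaviour listed above relies on Promise.allSettled, which waits for every close to finish even when some reject. A minimal standalone sketch of that pattern follows; closeAllSettled and Closable are hypothetical names and this is not the registry's actual closeAll() code.

```ts
// Hypothetical sketch: close every resource, report the ones that failed.
interface Closable {
  close(): Promise<void>;
}

async function closeAllSettled(resources: Record<string, Closable>): Promise<string[]> {
  const labels = Object.keys(resources);
  // The async wrapper turns synchronous throws into rejections as well.
  const results = await Promise.allSettled(
    Object.values(resources).map(async (resource) => resource.close()),
  );
  // Return the labels whose close() rejected; the others closed normally.
  return labels.filter((_, i) => results[i]?.status === 'rejected');
}

const closedLog: string[] = [];
const demoResources: Record<string, Closable> = {
  emails: { close: async () => { closedLog.push('emails'); } },
  reports: { close: async () => { throw new Error('connection lost'); } },
  ingest: { close: async () => { closedLog.push('ingest'); } },
};

closeAllSettled(demoResources).then((failed) => {
  console.log('failed:', failed, 'closed:', closedLog);
});
```

With Promise.all the first rejection would settle the combined promise early; allSettled lets the caller see every outcome before deciding how to report failures.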

Configuration

GlideMQPluginOptions

ts
interface GlideMQPluginOptions {
  connection?: ConnectionOptions; // Required unless testing: true
  queues: Record<string, QueueConfig>;
  producers?: Record<string, ProducerConfig>;
  prefix?: string;    // Valkey key prefix (default: 'glide')
  testing?: boolean;  // Use TestQueue/TestWorker, no Valkey needed
}

interface QueueConfig {
  processor?: (job: Job) => Promise<any>; // Omit for producer-only queues
  concurrency?: number;                   // Default: 1
  workerOpts?: Record<string, unknown>;
  serializer?: (job: Job) => Record<string, unknown>;
}

interface ProducerConfig {
  compression?: 'none' | 'gzip';
  serializer?: Serializer;
}

GlideMQRoutesOptions

ts
interface GlideMQRoutesOptions {
  queues?: string[];    // Allowlist of queue names (omit to allow all)
  producers?: string[]; // Allowlist of producer names (omit to allow all)
}

Route prefix is set via Fastify's standard prefix option in app.register().
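The allowlist semantics described in the comments above (omit to allow all) reduce to a small check. The sketch below is illustrative only: isNameAllowed is a hypothetical helper, and treating an empty array as "allow nothing" is an assumption, since this document does not say how the package handles that case.

```ts
// Hypothetical sketch of the documented allowlist rule.
function isNameAllowed(candidate: string, allowlist?: string[]): boolean {
  // No allowlist configured: every registered name is reachable.
  if (allowlist === undefined) return true;
  // Assumption: an empty allowlist exposes nothing.
  return allowlist.includes(candidate);
}

console.log(isNameAllowed('emails')); // true, no allowlist configured
console.log(isNameAllowed('emails', ['emails'])); // true
console.log(isNameAllowed('reports', ['emails'])); // false
```

Registering glideMQRoutes twice, once with { queues: ['emails'] } under a public prefix and once without an allowlist under an internal prefix, would apply this rule separately per registration.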

Testing

createTestApp builds a ready-to-use Fastify instance with in-memory queues. No Valkey, no network.

ts
import { createTestApp } from '@glidemq/fastify/testing';

const { app, registry } = await createTestApp({
  emails: {
    processor: async (job) => ({ sent: true }),
  },
});

const res = await app.inject({
  method: 'POST',
  url: '/emails/jobs',
  payload: { name: 'welcome', data: { to: 'user@test.com' } },
});
console.log(res.statusCode); // 201
console.log(res.json().id);  // job ID

await app.close();

Pass { prefix: '/api' } as the second argument to createTestApp to test prefixed routes.

SSE in testing mode emits counts events (polling-based state diffs) rather than job lifecycle events.

Direct Registry Access

Use app.glidemq in your own routes to work with queues directly:

ts
// The Body generic types request.body; add a route schema if you also need runtime validation.
app.post<{ Body: { to: string; subject: string } }>('/send-email', async (request, reply) => {
  const { queue } = app.glidemq.get('emails');
  const job = await queue.add('send', {
    to: request.body.to,
    subject: request.body.subject,
  });
  return reply.send({ jobId: job?.id });
});

The registry exposes get(name), getProducer(name), has(name), names(), closeAll(), and more. See the QueueRegistry interface in src/types.ts.

Limitations

  • SSE uses reply.hijack() to bypass Fastify's response pipeline. This means Fastify hooks like onSend do not run for SSE connections.
  • No built-in authentication or rate limiting. Use Fastify hooks or plugins (@fastify/auth, @fastify/rate-limit) in front of glideMQRoutes.
  • POST /flows accepts a budget only alongside a tree flow payload. DAG payloads submitted over HTTP cannot carry a budget.
  • /flows*, /usage/summary, and /broadcast/* require a live connection and are unavailable in testing mode.
  • Queue names are validated against /^[a-zA-Z0-9_-]{1,128}$/. Names outside this pattern are rejected with 400.
  • Scheduler names allow a wider character set that adds ':' and '.', and may be up to 256 characters long (/^[a-zA-Z0-9_:.-]{1,256}$/).
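The two name patterns above can be reused on the client to reject bad names before making a request. The regular expressions below are copied from this section; the helper function names are hypothetical.

```ts
// Patterns copied from the limitations above; helper names are illustrative.
const QUEUE_NAME_PATTERN = /^[a-zA-Z0-9_-]{1,128}$/;
const SCHEDULER_NAME_PATTERN = /^[a-zA-Z0-9_:.-]{1,256}$/;

function isValidQueueName(candidate: string): boolean {
  return QUEUE_NAME_PATTERN.test(candidate);
}

function isValidSchedulerName(candidate: string): boolean {
  return SCHEDULER_NAME_PATTERN.test(candidate);
}

console.log(isValidQueueName('emails')); // true
console.log(isValidQueueName('daily:report.v2')); // false, ':' and '.' are not allowed
console.log(isValidSchedulerName('daily:report.v2')); // true
console.log(isValidQueueName('a'.repeat(129))); // false, longer than 128 characters
```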

Ecosystem

| Package | Description |
| --- | --- |
| glide-mq | Core queue library -- producers, workers, schedulers, workflows |
| @glidemq/fastify | Fastify REST API + SSE plugin (this package) |
| @glidemq/hono | Hono REST API + SSE middleware |
| @glidemq/hapi | Hapi REST API + SSE plugin |
| @glidemq/nestjs | NestJS module -- decorators, DI, lifecycle management |
| @glidemq/dashboard | Express web UI for monitoring and managing queues |

License

Apache-2.0
