Observability
Table of Contents
Job Logs
Append log lines from inside a processor using job.log(), then fetch them from any Queue instance.
import { Queue, Worker } from 'glide-mq';
// Inside the processor
const worker = new Worker(
'tasks',
async (job) => {
await job.log('Starting step 1');
await doStep1();
await job.log('Step 1 done, starting step 2');
await doStep2();
return { done: true };
},
{ connection },
);
// Fetching logs externally
const queue = new Queue('tasks', { connection });
const { logs, count } = await queue.getJobLogs(jobId);
// logs: ['Starting step 1', 'Step 1 done, starting step 2']
// count: 2
// Paginate logs for long-running jobs
const { logs: page1 } = await queue.getJobLogs(jobId, 0, 49); // first 50
const { logs: page2 } = await queue.getJobLogs(jobId, 50, 99); // next 50Job Counts and Metrics
getJobCounts()
Returns counts for every job state:
const counts = await queue.getJobCounts();
// {
// waiting: 12,
// active: 3,
// delayed: 5,
// completed: 842,
// failed: 7,
// }getMetrics(type, opts?)
Returns aggregate count plus per-minute time-series data for completed or failed jobs:
const metrics = await queue.getMetrics('completed');
// {
// count: 15234,
// data: [
// { timestamp: 1709654400000, count: 142, avgDuration: 234 },
// { timestamp: 1709654460000, count: 156, avgDuration: 218 },
// ],
// meta: { resolution: 'minute' }
// }
// Slice data points (e.g. last 10 data points):
const recent = await queue.getMetrics('completed', { start: -10 });Data points are recorded server-side with zero extra RTTs. Minute-resolution buckets are retained for 24 hours and trimmed automatically.
Disabling server-side telemetry
For maximum throughput, disable both events and metrics:
const worker = new Worker('tasks', handler, {
connection,
events: false, // skip XADD event emission per job
metrics: false, // skip HINCRBY metrics recording per job
});
const queue = new Queue('tasks', { connection, events: false });TS-side EventEmitter and OpenTelemetry spans are unaffected by these options.
count()
Returns the number of waiting jobs (stream length):
const waitingCount = await queue.count();AI Usage Telemetry
glide-mq tracks AI-specific usage metadata per job - model, tokens, cost, and latency. This data powers cost dashboards, budget enforcement, and rate limiting.
Per-job usage
Every AI job can report usage via job.reportUsage(). The data is stored in the job hash with usage:* prefixed fields and emitted as a usage event on the events stream.
const worker = new Worker('inference', async (job) => {
const result = await callLLM(job.data.prompt);
await job.reportUsage({
model: 'gpt-5.4',
provider: 'openai',
tokens: {
input: result.usage.prompt_tokens,
output: result.usage.completion_tokens,
},
costs: { total: 0.003 },
latencyMs: 1100,
cached: false,
});
return result;
}, { connection });Reading usage from a completed job
const job = await queue.getJob('42');
if (job?.usage) {
console.log(`Model: ${job.usage.model}`);
console.log(`Input: ${job.usage.tokens?.input}, Output: ${job.usage.tokens?.output}`);
console.log(`Cost: $${job.usage.totalCost}, Latency: ${job.usage.latencyMs}ms`);
}Flow-level aggregation
Aggregate token counts and cost across all jobs in a parent-child flow:
const usage = await queue.getFlowUsage(parentJobId);
console.log(`Total tokens: ${usage.totalTokens}`);
console.log(`Total cost: $${usage.totalCost}`);
console.log(`Jobs: ${usage.jobCount}`);
console.log(`Models used: ${JSON.stringify(usage.models)}`);
// models: { 'gpt-5.4': 3, 'gpt-5.4-nano': 1 }Listening for usage events
const events = new QueueEvents('inference', { connection });
events.on('usage', ({ jobId, data }) => {
const usage = JSON.parse(data);
metrics.recordTokens(usage.model, usage.totalTokens);
metrics.recordCost(usage.totalCost);
});TPM tracking
For token-per-minute rate limiting, report tokens separately via job.reportTokens():
await job.reportTokens(result.totalTokens);This value is read by the worker after job completion to increment the TPM counter when a tokenLimiter is configured.
OpenTelemetry Integration
glide-mq emits OpenTelemetry spans automatically when @opentelemetry/api is installed. No code changes are required.
Setup
npm install @opentelemetry/apiInitialise your tracer provider before creating any Queue or Worker instances (standard OTel setup), then use glide-mq normally:
import { Queue } from 'glide-mq';
const queue = new Queue('tasks', { connection });
await queue.add('my-job', data);
// → creates span: glide-mq.queue.add
// attributes: glide-mq.queue, glide-mq.job.name, glide-mq.job.id,
// glide-mq.job.delay, glide-mq.job.priorityCustom tracer
If you need to use a specific tracer instance instead of the global one:
import { setTracer, isTracingEnabled } from 'glide-mq';
import { trace } from '@opentelemetry/api';
const myTracer = trace.getTracer('my-service', '1.0.0');
setTracer(myTracer);
console.log('Tracing active:', isTracingEnabled());Instrumented operations
| Operation | Span name | Key attributes |
|---|---|---|
queue.add() | glide-mq.queue.add | glide-mq.queue, glide-mq.job.name, glide-mq.job.id, glide-mq.job.delay, glide-mq.job.priority |
flowProducer.add() | glide-mq.flow.add | glide-mq.queue, glide-mq.flow.name, glide-mq.flow.childCount |
@glidemq/dashboard
The @glidemq/dashboard package provides a web UI for inspecting queues in real time.
Quick start - built-in demo server
cd demo
npm install
npm run dashboard # http://localhost:3000REST API
| Method | Path | Description |
|---|---|---|
GET | /api/queues | List all queues with counts and metrics |
GET | /api/queues/:name | Queue details + recent jobs |
GET | /api/queues/:name/jobs/:id | Single job details, state, and logs |
POST | /api/queues/:name/jobs | Add a new job |
POST | /api/queues/:name/pause | Pause a queue |
POST | /api/queues/:name/resume | Resume a queue |
POST | /api/queues/:name/jobs/:id/retry | Retry a failed job |
DELETE | /api/queues/:name/jobs/:id | Remove a job |
POST | /api/queues/:name/drain | Drain all waiting jobs |
POST | /api/queues/:name/obliterate | Obliterate queue and all data |
GET | /api/events | SSE stream for real-time job events |
Real-time events via SSE
const es = new EventSource('http://localhost:3000/api/events');
es.onmessage = ({ data }) => {
const { queue, event, jobId } = JSON.parse(data);
// event: 'added' | 'completed' | 'failed' | 'progress' | 'stalled' | 'heartbeat'
console.log(`[${queue}] ${event} — job ${jobId}`);
};Embedding in an existing Express app
import express from 'express';
import { Queue } from 'glide-mq';
const app = express();
app.use(express.json());
const queues: Record<string, Queue> = {
orders: new Queue('orders', { connection }),
payments: new Queue('payments', { connection }),
};
// Expose queue counts to the dashboard
app.get('/api/queues', async (_req, res) => {
const data = await Promise.all(
Object.entries(queues).map(async ([name, q]) => ({
name,
counts: await q.getJobCounts(),
isPaused: await q.isPaused(),
})),
);
res.json(data);
});