# AI-Native Features
glide-mq is built for AI workloads. Beyond standard queue operations, it ships eight primitives purpose-built for LLM pipelines, agent loops, RAG systems, and content moderation flows.
## Table of Contents
- Usage Tracking
- Token Streaming
- Suspend / Resume (Human-in-the-Loop)
- Budget Caps
- Fallback Chains
- Dual-Axis Rate Limiting (RPM + TPM)
- Vector Search
- Per-Job Lock Duration
## Usage Tracking
Track model, token counts, cost, and latency for every AI job. Data is persisted to the job hash and emitted as a usage event on the events stream.
### Reporting usage from a processor

```ts
const worker = new Worker('inference', async (job) => {
const result = await openai.chat.completions.create({
model: 'gpt-5.4',
messages: [{ role: 'user', content: job.data.prompt }],
});
await job.reportUsage({
model: 'gpt-5.4',
provider: 'openai',
tokens: { input: result.usage.prompt_tokens, output: result.usage.completion_tokens },
costs: { total: 0.0035 },
costUnit: 'usd',
latencyMs: Date.now() - job.processedOn!,
cached: false,
});
return { content: result.choices[0].message.content };
}, { connection });
```

### Reading usage from a completed job

```ts
const job = await queue.getJob('42');
if (job?.usage) {
console.log(`Model: ${job.usage.model}`);
console.log(`Tokens: ${job.usage.tokens?.input} in, ${job.usage.tokens?.output} out`);
console.log(`Cost: $${job.usage.totalCost}`);
}
```

### Aggregating usage across a flow

```ts
const usage = await queue.getFlowUsage(parentJobId);
// {
// tokens: { input: 1250, output: 340 },
// totalTokens: 1590,
// costs: { total: 0.012 },
// totalCost: 0.012,
// costUnit: 'usd',
// jobCount: 4,
// models: { 'gpt-5.4': 3, 'gpt-5.4-nano': 1 }
// }
```

### The JobUsage interface

```ts
interface JobUsage {
model?: string; // e.g. 'gpt-5.4', 'claude-sonnet-4-20250514'
provider?: string; // e.g. 'openai', 'anthropic'
tokens?: Record<string, number>; // e.g. { input: 100, output: 50, reasoning: 200 }
totalTokens?: number; // auto-computed as sum of all tokens values
costs?: Record<string, number>; // e.g. { total: 0.003 } or { input: 0.001, output: 0.002 }
totalCost?: number; // auto-computed as sum of all costs values
costUnit?: string; // e.g. 'usd', 'credits', 'ils'
latencyMs?: number;
cached?: boolean;
}
```

Well-known token keys: `input`, `output`, `reasoning`, `cachedInput`, `cachedOutput`. Any string key is accepted.
`totalTokens` is automatically computed as the sum of all values in `tokens` when not explicitly provided. `totalCost` works the same way for `costs`.
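As a sketch of the auto-computation described above, a processor can report only per-category figures and read the derived totals back later (the token and cost figures here are illustrative):

```ts
// Inside a processor: report only per-category counts - totals are derived automatically.
await job.reportUsage({
  model: 'gpt-5.4',
  tokens: { input: 1200, output: 300, cachedInput: 400 },
  costs: { input: 0.0012, output: 0.0024 },
  costUnit: 'usd',
});

// Elsewhere, after the job completes:
const done = await queue.getJob(jobId);
console.log(done?.usage?.totalTokens); // 1900 = 1200 + 300 + 400
console.log(done?.usage?.totalCost);   // 0.0036 = 0.0012 + 0.0024
```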
## Token Streaming
Stream LLM output tokens to consumers in real time via per-job Valkey streams. Each job gets its own stream key (`glide:{queueName}:jstream:{jobId}`).
### Producer side (inside the processor)

```ts
const worker = new Worker('chat', async (job) => {
const stream = await openai.chat.completions.create({
model: 'gpt-5.4',
messages: [{ role: 'user', content: job.data.prompt }],
stream: true,
});
for await (const chunk of stream) {
const token = chunk.choices[0]?.delta?.content ?? '';
if (token) {
await job.stream({ t: token });
}
}
// Signal end-of-stream
await job.stream({ t: '', done: '1' });
return { completed: true };
}, { connection });
```

### Consumer side (reading the stream)

```ts
// Non-blocking read (XRANGE)
const entries = await queue.readStream(jobId, { count: 100 });
for (const entry of entries) {
process.stdout.write(entry.fields.t);
}
// Long-polling read (XREAD BLOCK) - for real-time consumers
let lastId: string | undefined;
while (true) {
const entries = await queue.readStream(jobId, {
lastId,
block: 5000, // block up to 5 seconds
count: 50,
});
for (const entry of entries) {
lastId = entry.id;
if (entry.fields.done === '1') return;
process.stdout.write(entry.fields.t);
}
}
```

Each chunk is a flat `Record<string, string>` appended via `XADD`. Common patterns:
- `{ t: 'token text' }` - a text token
- `{ t: '', done: '1' }` - end-of-stream marker
- `{ type: 'tool_call', name: 'search', args: '{"q":"valkey"}' }` - structured events
### Convenience: `job.streamChunk(type, content?)`
For typed LLM chunks (reasoning, content, tool calls), use the shorthand `streamChunk` method instead of building the record manually:

```ts
await job.streamChunk('reasoning', 'Let me think about this...');
await job.streamChunk('content', 'The answer is 42.');
await job.streamChunk('done');
```

This is equivalent to `job.stream({ type, content })` but more readable for common streaming patterns.
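On the consumer side, typed chunks can be dispatched on the `type` field. A sketch reusing the documented `readStream()` long-poll loop; routing `reasoning` chunks to stderr is just one possible choice:

```ts
let lastId: string | undefined;
let streaming = true;
while (streaming) {
  const entries = await queue.readStream(jobId, { lastId, block: 5000, count: 50 });
  for (const entry of entries) {
    lastId = entry.id;
    switch (entry.fields.type) {
      case 'reasoning':
        process.stderr.write(entry.fields.content ?? ''); // keep thinking separate from main output
        break;
      case 'content':
        process.stdout.write(entry.fields.content ?? '');
        break;
      case 'done':
        streaming = false;
        break;
    }
  }
}
```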
## Suspend / Resume
Pause a job to wait for an external signal - human approval, webhook callback, or async third-party response. The job moves to the `suspended` state until `queue.signal()` re-activates it.
### Suspending from a processor

```ts
const worker = new Worker('moderation', async (job) => {
// Check if we were resumed with signals
if (job.signals.length > 0) {
const signal = job.signals[0];
if (signal.data.action === 'approve') {
return { status: 'approved', by: signal.data.reviewer };
}
return { status: 'rejected', reason: signal.data.reason };
}
// First run: generate content and suspend for review
const draft = await generateDraft(job.data);
await job.suspend({
reason: 'awaiting-human-review',
timeout: 3600_000, // auto-fail after 1 hour
});
}, { connection });
```

### Sending a signal to resume

```ts
// From an API endpoint, webhook handler, or admin panel
const resumed = await queue.signal(jobId, 'review', {
action: 'approve',
reviewer: 'alice@example.com',
});
// resumed: true if the job was in suspended state, false otherwise
```

### Checking suspension status

```ts
const info = await queue.getSuspendInfo(jobId);
// {
// reason: 'awaiting-human-review',
// suspendedAt: 1709654400000,
// timeout: 3600000,
// signals: []
// }
```

### Signal interface

```ts
interface SignalEntry {
name: string; // e.g. 'approve', 'reject'
data: any; // arbitrary payload
receivedAt: number; // epoch ms
}
```

### How it works
- `job.suspend()` throws `SuspendError` - the worker moves the job to the `suspended` state.
- The job hash stores `suspendReason`, `suspendedAt`, and `suspendTimeout`.
- `queue.signal()` appends a signal entry and re-queues the job to the stream.
- On re-entry, `job.signals` contains all signals delivered while suspended (see the sketch below).
- If `timeout` is set and no signal arrives within the window, the job is auto-failed.
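Because `job.signals` accumulates every signal delivered while suspended, a processor can collect several of them on re-entry. A minimal sketch of a review flow - the `prepareDraft` helper and the `{ vote }` signal payload shape are illustrative, not part of the API:

```ts
const worker = new Worker('review', async (job) => {
  if (job.signals.length === 0) {
    // First run: prepare the draft, then wait for reviewers.
    await prepareDraft(job.data); // hypothetical helper
    await job.suspend({ reason: 'awaiting-votes', timeout: 3600_000 });
  }

  // Resumed: every signal delivered while suspended is available.
  const votes = job.signals.map((s) => s.data.vote); // e.g. 'approve' | 'reject'
  const approvals = votes.filter((v) => v === 'approve').length;
  return { approved: approvals > votes.length / 2, votes };
}, { connection });
```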
## Budget Caps
Cap total token usage or USD cost across all jobs in a flow. When the budget is exceeded, remaining jobs either fail or pause depending on the `onExceeded` policy.
### Setting a budget on a flow

```ts
import { FlowProducer } from 'glide-mq';
const flow = new FlowProducer({ connection });
const node = await flow.add(
{
name: 'rag-pipeline',
queueName: 'ai',
data: { query: 'How does glide-mq work?' },
children: [
{ name: 'embed', queueName: 'ai', data: { step: 'embed' } },
{ name: 'search', queueName: 'ai', data: { step: 'search' } },
{ name: 'generate', queueName: 'ai', data: { step: 'generate' } },
],
},
{ budget: { maxTotalTokens: 2000, maxTotalCost: 0.05, costUnit: 'usd', onExceeded: 'fail' } },
);
```

### Reading budget state

```ts
const budget = await queue.getFlowBudget(parentJobId);
// {
// maxTotalTokens: 2000,
// maxTotalCost: 0.05,
// costUnit: 'usd',
// usedTokens: 1450,
// usedCost: 0.032,
// exceeded: false,
// onExceeded: 'fail'
// }
```

### The BudgetOptions interface

```ts
interface BudgetOptions {
maxTotalTokens?: number; // hard cap on weighted total tokens
maxTokens?: Record<string, number>; // per-category caps (e.g. { input: 50000, reasoning: 5000 })
tokenWeights?: Record<string, number>;// weight multipliers (e.g. { reasoning: 4, cachedInput: 0.25 })
maxTotalCost?: number; // hard cap on total cost
maxCosts?: Record<string, number>; // per-category cost caps
costUnit?: string; // e.g. 'usd', 'credits', 'ils'
onExceeded?: 'pause' | 'fail'; // default: 'fail'
}
```

Budget enforcement works with `job.reportUsage()` - each job's reported tokens and cost are accumulated against the flow's budget key.
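One way to use this from a processor is to check the remaining budget before an expensive call and degrade gracefully. A sketch using the documented `getFlowBudget()` and `reportUsage()` calls - the `flowParentId` field in job data, the cheap-model threshold, and the `callLLM` helper are assumptions of this sketch:

```ts
const worker = new Worker('ai', async (job) => {
  // Assumes the producer stored the flow parent's job id in each child's data.
  const budget = await queue.getFlowBudget(job.data.flowParentId);

  // Pick a cheaper model when most of the token budget is already spent.
  const remaining = (budget?.maxTotalTokens ?? Infinity) - (budget?.usedTokens ?? 0);
  const model = remaining < 500 ? 'gpt-5.4-nano' : 'gpt-5.4';

  const result = await callLLM(model, job.data.prompt); // hypothetical helper
  await job.reportUsage({
    model,
    tokens: { input: result.inputTokens, output: result.outputTokens },
    costs: { total: result.cost },
    costUnit: 'usd',
  });
  return { content: result.text, model };
}, { connection });
```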
## Fallback Chains
Define an ordered list of model/provider alternatives that are tried automatically on retryable failure. The processor reads `job.currentFallback` to know which model to use on each retry.
### Defining fallbacks on a job

```ts
await queue.add('llm-query', {
prompt: 'Explain message queues.',
primaryModel: 'gpt-5.4',
}, {
attempts: 4, // 1 original + 3 fallbacks
backoff: { type: 'fixed', delay: 1000 },
fallbacks: [
{ model: 'gpt-5.4-nano', provider: 'openai' },
{ model: 'claude-sonnet-4-20250514', provider: 'anthropic' },
{ model: 'gemini-2.5-pro', provider: 'google' },
],
});
```

### Using fallbacks in the processor

```ts
const worker = new Worker('llm-query', async (job) => {
const fallback = job.currentFallback;
// fallbackIndex=0: original attempt (currentFallback is undefined)
// fallbackIndex=1: first retry (currentFallback = fallbacks[0])
// fallbackIndex=2: second retry (currentFallback = fallbacks[1])
const model = fallback ? fallback.model : job.data.primaryModel;
const result = await callLLM(model, job.data.prompt);
await job.reportUsage({
model: result.model,
tokens: { input: result.inputTokens, output: result.outputTokens },
});
return { content: result.text, model };
}, { connection });
```

### How it works
- `fallbackIndex` starts at 0 (original request). On each retry, it increments.
- `job.currentFallback` returns `fallbacks[fallbackIndex - 1]`, or `undefined` for the original attempt.
- Each fallback entry can carry arbitrary `metadata` for custom routing logic (see the sketch after this list).
- Fallback chains compose with standard retry/backoff configuration.
- The chain length determines maximum retries - if `fallbacks` has 3 entries, set `attempts: 4` (1 original + 3 fallbacks).
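As an illustration of the `metadata` field, a fallback entry can carry provider-specific knobs that the processor applies when that entry becomes active. The `baseUrl` and `temperature` keys below are hypothetical routing hints rather than fields glide-mq interprets, and `callLLM` is the same hypothetical helper as above, extended with a routing argument:

```ts
await queue.add('llm-query', { prompt: 'Summarize this document.' }, {
  attempts: 3,
  fallbacks: [
    {
      model: 'claude-sonnet-4-20250514',
      provider: 'anthropic',
      metadata: { baseUrl: 'https://api.anthropic.com', temperature: 0.2 },
    },
    {
      model: 'gemini-2.5-pro',
      provider: 'google',
      metadata: { baseUrl: 'https://generativelanguage.googleapis.com', temperature: 0.0 },
    },
  ],
});

const worker = new Worker('llm-query', async (job) => {
  const fallback = job.currentFallback;
  // Default routing for the original attempt; fallback metadata takes over on retries.
  const routing = fallback?.metadata ?? { baseUrl: 'https://api.openai.com', temperature: 0.7 };
  return callLLM(fallback?.model ?? 'gpt-5.4', job.data.prompt, routing);
}, { connection });
```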
## Dual-Axis Rate Limiting
Enforce both RPM (requests per minute) and TPM (tokens per minute) limits to comply with LLM provider rate limits. The standard `limiter` option controls RPM; the `tokenLimiter` option controls TPM.
### Configuring dual-axis limiting

```ts
const worker = new Worker('inference', processor, {
connection,
concurrency: 10,
// RPM: max 60 requests per minute
limiter: { max: 60, duration: 60_000 },
// TPM: max 100,000 tokens per minute
tokenLimiter: {
maxTokens: 100_000,
duration: 60_000,
scope: 'both', // 'queue' | 'worker' | 'both'
},
});
```

### Reporting tokens for TPM tracking

```ts
const worker = new Worker('inference', async (job) => {
const result = await callLLM(job.data.prompt);
// Report tokens so the limiter can track consumption
await job.reportTokens(result.totalTokens);
return result;
}, { connection, tokenLimiter: { maxTokens: 50_000, duration: 60_000 } });
```

### Scope options
| Scope | Behavior |
|---|---|
| `'queue'` | Shared Valkey counter across all workers. Accurate but adds 1 RTT per job. |
| `'worker'` | In-memory counter per worker instance. Zero overhead, but each worker enforces independently. |
| `'both'` (default) | Local check first, then Valkey. Best of both - fast path when under limit, accurate enforcement when near capacity. |
When either RPM or TPM is exceeded, the worker pauses fetching new jobs until the window resets. Jobs already in progress are not interrupted.
## Vector Search
Search jobs by vector similarity using Valkey Search. Create an index over job hashes, store embeddings via `job.storeVector()`, then query with `queue.vectorSearch()`.
See the dedicated Vector Search guide for full details.
### Quick example

```ts
// Create an index with a vector field
await queue.createJobIndex({
vectorField: {
name: 'embedding',
dimensions: 1536,
algorithm: 'HNSW',
distanceMetric: 'COSINE',
},
});
// Store a vector on a job
const job = await queue.add('document', { title: 'Queue Fundamentals' });
await job.storeVector('embedding', myEmbeddingVector);
// Search by similarity
const results = await queue.vectorSearch(queryVector, {
k: 10,
filter: '@state:{completed}',
});
for (const { job, score } of results) {
console.log(`${job.name}: ${score}`);
}
```

## Per-Job Lock Duration
Override the worker-level lock duration for individual jobs. This is critical for AI workloads where inference latency varies widely - embedding calls finish in 1-2 seconds while large model generation can take 30-60 seconds.

```ts
// Fast embedding job - 5 second lock
await queue.add('embed', { text: 'hello' }, {
lockDuration: 5_000,
});
// Slow generation job - 60 second lock
await queue.add('generate', { prompt: 'Write a novel...' }, {
lockDuration: 60_000,
});
```

The lock duration controls:
- Heartbeat frequency: the worker sends a heartbeat every `lockDuration / 2`.
- Stall detection threshold: jobs without a heartbeat within `lockDuration` are reclaimed.
Without per-job lock durations, you must set the worker's `lockDuration` to accommodate the slowest job, which degrades stall detection for fast jobs.
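For example, a worker can keep a moderate default while individual jobs override it in either direction. A sketch - the worker-level `lockDuration` option is named here the same way as the per-job option, which is an assumption, and the specific durations are illustrative:

```ts
// Worker default: 30s lock (heartbeat roughly every 15s, reclaim after 30s without one).
const worker = new Worker('ai', processor, {
  connection,
  lockDuration: 30_000,
});

// Fast embedding jobs opt into a tighter lock so stalls surface within seconds...
await queue.add('embed', { text: 'hello' }, { lockDuration: 5_000 });

// ...while a long-running generation job extends it instead of forcing a slow default on everyone.
await queue.add('generate', { prompt: 'Write a novel...' }, { lockDuration: 120_000 });
```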
## Combining Primitives
These primitives compose naturally. A single pipeline can use:
- Fallback chains for model resilience
- Usage tracking to monitor costs
- Budget caps to prevent runaway spending
- Token streaming for real-time output
- Suspend/resume for human-in-the-loop checkpoints
- Per-job lock to match timeouts to model latency
- TPM limiting to respect provider rate limits
```ts
// RAG pipeline with all primitives
const node = await flow.add({
name: 'rag',
queueName: 'ai',
data: { query: 'How does it work?' },
children: [
{
name: 'embed',
queueName: 'ai',
data: { step: 'embed' },
opts: { lockDuration: 5_000 },
},
{
name: 'generate',
queueName: 'ai',
data: { step: 'generate' },
opts: {
lockDuration: 60_000,
fallbacks: [
{ model: 'gpt-5.4-nano', provider: 'openai' },
{ model: 'claude-sonnet-4-20250514', provider: 'anthropic' },
],
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
},
},
],
}, { budget: { maxTotalTokens: 5000, onExceeded: 'fail' } });
```

See Examples: AI Pipelines for complete, runnable examples.