
Why job queues exist, BullMQ with Redis, workers and concurrency, retries and exponential backoff, dead letter queues, cron jobs, and real-world use cases.

Module P-12 — Background Jobs and Task Queues with BullMQ

What this module covers: Some work is too slow, too unreliable, or too risky to do inside a request. Sending email, resizing images, calling third-party APIs, generating reports, sending push notifications — all of these should happen off the request path. This module covers why job queues exist, BullMQ's architecture with Redis, defining and dispatching jobs, writing workers with concurrency control, retries with exponential backoff, dead letter queues for failed jobs, repeatable cron jobs, and the patterns that make queues reliable in production.


Why Job Queues

Consider sending a transactional email when an order is placed. Three approaches:

```typescript
// Three alternative versions of the same handler (pick one)

// Option 1: Inline. The user waits ~300ms for Mailgun.
export const createOrder = asyncHandler(async (req, res) => {
  const order = await ordersService.createOrder(req.body);
  await emailService.sendConfirmation(order); // blocks the response
  res.status(201).json(order);
});

// Option 2: Fire-and-forget. Fast but unreliable.
export const createOrder = asyncHandler(async (req, res) => {
  const order = await ordersService.createOrder(req.body);
  emailService.sendConfirmation(order).catch(console.error); // if this fails, no retry
  res.status(201).json(order);
});

// Option 3: Job queue. Fast AND reliable.
export const createOrder = asyncHandler(async (req, res) => {
  const order = await ordersService.createOrder(req.body);
  // One write to Redis, not a call to the email provider
  await emailQueue.add('send-confirmation', {
    type: 'order-confirmation',
    data: { orderId: order.id },
  });
  res.status(201).json(order); // response goes out immediately
  // A worker picks up the job, retries on failure, and logs results
});
```

A job queue gives you:

  • Decoupled execution: the response returns before the work is done
  • Automatic retries: if the email provider is down, the job retries with backoff
  • Visibility: you can see queued, active, completed, and failed jobs
  • Rate limiting: process at most N jobs per second, regardless of how many are enqueued
  • Concurrency control: process up to M jobs in parallel without overloading downstream systems
  • Persistence: jobs are stored in Redis, so they survive restarts of your Node.js processes (as long as Redis persistence is enabled)

BullMQ Architecture

BullMQ uses Redis as its store. Three components:

```text
Producer (your API)     Queue (Redis)        Worker (separate process or same process)
       │                     │                           │
       │── queue.add() ─────►│                           │
       │                     │◄──── fetch next job ──────│
       │                     │                           │
       │                     │──── job data ────────────►│
       │                     │                           │ (processes job)
       │                     │◄──── completed/failed ────│
```

The queue lives in Redis; producers and workers are ordinary Node.js processes that connect to it. You can run multiple producers (for example, several API instances) and multiple workers, and scale each group independently.
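To make the split between the three roles concrete, here is a toy, in-memory model of it. This is a sketch, not how BullMQ works internally: `ToyQueue`, `drain`, and the job shape are hypothetical, and a real queue keeps this state in Redis so that separate processes can share it. The point is that `add` only records the job and returns, while a worker pulls jobs and reports each outcome.

```typescript
// Toy, in-memory stand-in for the producer / queue / worker split.
// All names are hypothetical; BullMQ keeps this state in Redis instead.
interface ToyJob<T> {
  id: number;
  name: string;
  data: T;
}

class ToyQueue<T> {
  private nextId = 1;
  private waiting: ToyJob<T>[] = [];
  readonly completed: number[] = [];
  readonly failed: number[] = [];

  // Producer side: record the job and return immediately.
  add(name: string, data: T): ToyJob<T> {
    const job: ToyJob<T> = { id: this.nextId++, name, data };
    this.waiting.push(job);
    return job;
  }

  // Worker side: take the oldest waiting job (FIFO), if any.
  take(): ToyJob<T> | undefined {
    return this.waiting.shift();
  }

  get waitingCount(): number {
    return this.waiting.length;
  }
}

// Worker loop: pull jobs until none are waiting, recording each outcome.
async function drain<T>(
  queue: ToyQueue<T>,
  processor: (job: ToyJob<T>) => Promise<void>,
): Promise<void> {
  let job = queue.take();
  while (job !== undefined) {
    const current = job;
    try {
      await processor(current);
      queue.completed.push(current.id);
    } catch {
      queue.failed.push(current.id);
    }
    job = queue.take();
  }
}
```

The producer never waits on the processor: after two `add` calls the jobs simply sit in `waiting` until some worker drains them.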

```bash
npm install bullmq ioredis
```

Defining Queues and Adding Jobs

```typescript
// src/queues/email.queue.ts
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { env } from '../config/env.js';

// Job type definitions: TypeScript safety across producer and worker
export interface SendOrderConfirmationJob {
  orderId: number;
}

export interface SendPasswordResetJob {
  userId: number;
  resetToken: string;
}

export type EmailJobData =
  | { type: 'order-confirmation'; data: SendOrderConfirmationJob }
  | { type: 'password-reset'; data: SendPasswordResetJob };

// BullMQ hands connection options to ioredis, which has no `url` option,
// so create the ioredis client from the URL instead.
const connection = new Redis(env.REDIS_URL);

// Create the queue on that connection
export const emailQueue = new Queue<EmailJobData>('email', {
  connection,
  defaultJobOptions: {
    attempts: 3, // 3 attempts in total: the first try plus up to 2 retries
    backoff: {
      type: 'exponential',
      delay: 1000, // retries wait 1s, then 2s
    },
    removeOnComplete: { count: 1000 }, // keep last 1000 completed jobs
    removeOnFail: { count: 5000 }, // keep last 5000 failed jobs for debugging
  },
});
```
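The discriminated union pays off again on the worker side: if every `switch` over `type` ends in an exhaustiveness check, adding a third job type without handling it becomes a compile error instead of a runtime surprise. A minimal sketch, assuming you are happy to replace an `as any` fallback with a `never` check; `assertNever` and `describeEmailJob` are hypothetical names, and the handlers return strings so the example runs without email or Redis.

```typescript
// Redeclared so the sketch is self-contained (mirrors email.queue.ts).
interface SendOrderConfirmationJob { orderId: number; }
interface SendPasswordResetJob { userId: number; resetToken: string; }
type EmailJobData =
  | { type: 'order-confirmation'; data: SendOrderConfirmationJob }
  | { type: 'password-reset'; data: SendPasswordResetJob };

// Only callable with `never`: if a union member is left unhandled, the
// argument below is not `never` and TypeScript reports an error.
function assertNever(value: never): never {
  throw new Error(`Unhandled email job: ${JSON.stringify(value)}`);
}

// Each case sees the correctly narrowed `data` payload.
function describeEmailJob(job: EmailJobData): string {
  switch (job.type) {
    case 'order-confirmation':
      return `confirmation for order ${job.data.orderId}`;
    case 'password-reset':
      return `reset email for user ${job.data.userId}`;
    default:
      // Still throws at runtime if malformed data slips past the types.
      return assertNever(job);
  }
}
```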

Adding jobs from your service:

```typescript
// src/services/orders.service.ts
import { emailQueue } from '../queues/email.queue.js';

export async function createOrder(input: CreateOrderInput) {
  const order = await ordersRepo.create(input);

  // Enqueue the email: one Redis write, no call to the email provider
  await emailQueue.add('send-confirmation', {
    type: 'order-confirmation',
    data: { orderId: order.id },
  });

  return order;
}
```

Per-job options

```typescript
// Priority: 1 is the highest (lower numbers run first). BullMQ processes jobs
// added without any priority before all prioritized jobs, so give routine jobs
// a larger number (e.g. 10) if priority 1 should jump ahead of them.
await notificationQueue.add('push', payload, { priority: 1 });

// Delay: run 10 minutes from now (e.g. "your session expires soon")
await emailQueue.add('expiry-warning', payload, { delay: 10 * 60 * 1000 });

// Unique job: ignored while a job with this ID is still stored in Redis
const today = new Date().toISOString().slice(0, 10); // e.g. "2024-03-05" (UTC)
await reportQueue.add('daily-report', payload, {
  jobId: `daily-report-${today}`, // deterministic ID prevents duplicates
});

// LIFO: process newest first (useful for real-time notifications)
await notificationQueue.add('push', payload, { lifo: true });
```
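The deterministic `jobId` works because BullMQ does not add a second job whose ID matches one it still has stored. The sketch below models that rule in memory (the `DedupingQueue` class and `dailyReportJobId` helper are hypothetical) and shows one caveat: once the job has been removed, for example by `removeOnComplete`, the same ID can be added again.

```typescript
// In-memory model of "at most one stored job per deterministic ID".
class DedupingQueue<T> {
  private jobs = new Map<string, T>();

  // Returns true if the job was stored, false if the ID is already taken.
  add(jobId: string, data: T): boolean {
    if (this.jobs.has(jobId)) return false;
    this.jobs.set(jobId, data);
    return true;
  }

  // Models cleanup of a finished job (e.g. by removeOnComplete).
  remove(jobId: string): void {
    this.jobs.delete(jobId);
  }

  get size(): number {
    return this.jobs.size;
  }
}

// One ID per calendar day (UTC): every add on the same day collides.
function dailyReportJobId(now: Date): string {
  return `daily-report-${now.toISOString().slice(0, 10)}`;
}
```

If you need the guarantee to outlast job cleanup, keep completed jobs long enough (or track the key elsewhere).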

Writing Workers

A worker subscribes to a queue and processes jobs:

```typescript
// src/workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import type { EmailJobData } from '../queues/email.queue.js';
import * as emailService from '../services/email.service.js';
import * as ordersRepo from '../repositories/orders.repository.js';
import * as usersRepo from '../repositories/users.repository.js';
import logger from '../utils/logger.js';
import { env } from '../config/env.js';

// Workers use blocking Redis commands; BullMQ requires maxRetriesPerRequest: null
const connection = new Redis(env.REDIS_URL, { maxRetriesPerRequest: null });

const worker = new Worker<EmailJobData>(
  'email',
  async (job: Job<EmailJobData>) => {
    logger.info({ jobId: job.id, type: job.data.type }, 'Processing email job');

    switch (job.data.type) {
      case 'order-confirmation': {
        const { orderId } = job.data.data;
        const order = await ordersRepo.findById(orderId);
        if (!order) throw new Error(`Order ${orderId} not found`);
        const user = await usersRepo.findById(order.userId);
        if (!user) throw new Error(`User ${order.userId} not found`);
        await emailService.sendOrderConfirmation({
          to: user.email,
          orderNumber: order.id.toString(),
          items: order.items,
          total: order.total,
        });
        break;
      }
      case 'password-reset': {
        const { userId, resetToken } = job.data.data;
        const user = await usersRepo.findById(userId);
        if (!user) throw new Error(`User ${userId} not found`);
        await emailService.sendPasswordReset(user.email, resetToken);
        break;
      }
      default:
        throw new Error(`Unknown job type: ${(job.data as any).type}`);
    }

    logger.info({ jobId: job.id }, 'Email job completed');
  },
  {
    connection,
    concurrency: 5, // up to 5 jobs in flight at once in this process
    limiter: {
      max: 10, // at most 10 jobs...
      duration: 1000, // ...per 1000 ms, shared by all workers on this queue
    },
  },
);

// Event listeners for observability
worker.on('completed', (job) => {
  logger.info({ jobId: job.id, type: job.data.type }, 'Job completed');
});
worker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, type: job?.data.type, err }, 'Job failed');
});
worker.on('error', (err) => {
  logger.error({ err }, 'Worker error');
});

export default worker;
```
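`concurrency: 5` means one worker process keeps up to five jobs in flight and starts the next one as soon as a slot frees up. The sketch models that with a plain promise pool; `runWithConcurrency` is a hypothetical helper, not a BullMQ API, and BullMQ's implementation differs (it fetches jobs from Redis rather than iterating an array).

```typescript
// Process `items` with at most `limit` handlers running at once.
// Each runner pulls the next index from a shared cursor, so a new item
// starts as soon as any runner finishes its current one.
async function runWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  handler: (item: T) => Promise<R>,
): Promise<R[]> {
  if (limit < 1) throw new Error('limit must be >= 1');
  const results = new Array<R>(items.length);
  let cursor = 0;

  async function runner(): Promise<void> {
    while (cursor < items.length) {
      // Safe without locks: this read-and-increment runs synchronously.
      const index = cursor++;
      results[index] = await handler(items[index] as T);
    }
  }

  const runnerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results; // same order as `items`
}
```

Raising the limit only helps while the downstream service keeps up, which is why `concurrency` is usually paired with a rate limiter.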

Running workers

Workers can run in the same process as your API (simple) or a separate process (recommended for production — independent scaling and crash isolation):

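```typescript
// src/workers/index.ts: separate entry point
import 'dotenv/config';
// Assumes each worker module default-exports its Worker, like email.worker.ts
import emailWorker from './email.worker.js';
import imageWorker from './image.worker.js';
import reportWorker from './report.worker.js';
import logger from '../utils/logger.js';

logger.info('Workers started');

// Graceful shutdown: close() stops fetching new jobs and waits for active ones
async function shutdown(signal: string) {
  logger.info({ signal }, 'Shutting down workers...');
  await Promise.all([emailWorker.close(), imageWorker.close(), reportWorker.close()]);
  process.exit(0);
}

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));
```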
```json
// package.json
{
  "scripts": {
    "start": "node dist/index.js",
    "start:workers": "node dist/workers/index.js"
  }
}
```
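```yaml
# docker-compose.yml: separate container for workers
services:
  api:
    build: . # assumes a Dockerfile in the project root
    command: node dist/index.js
  worker:
    build: .
    command: node dist/workers/index.js
    stop_grace_period: 30s # time for active jobs to finish before SIGKILL

# Scale workers independently:
#   docker-compose up --scale worker=3
```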
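Draining is the key step in the shutdown handler: stop taking new jobs, then wait for the ones already running. For real workers BullMQ provides this as `worker.close()`; the sketch below models the same behaviour with a hypothetical `DrainableWorker` class so the sequence can be tested without Redis.

```typescript
// Toy worker with graceful shutdown: close() refuses new jobs and resolves
// only after every in-flight job has settled (succeeded or failed).
class DrainableWorker {
  completed = 0;
  failed = 0;
  private closing = false;
  private readonly inFlight = new Set<Promise<void>>();

  // Returns false (and runs nothing) once close() has been called.
  start(task: () => Promise<unknown>): boolean {
    if (this.closing) return false;
    const p: Promise<void> = task()
      .then(
        () => {
          this.completed++;
        },
        () => {
          this.failed++; // a real worker would record the error and maybe retry
        },
      )
      .finally(() => {
        this.inFlight.delete(p);
      });
    this.inFlight.add(p);
    return true;
  }

  async close(): Promise<void> {
    this.closing = true; // set synchronously, before any await
    await Promise.all(Array.from(this.inFlight));
  }

  get activeCount(): number {
    return this.inFlight.size;
  }
}
```

The `stop_grace_period` in the compose file matters for the same reason: the container must stay alive long enough for the drain to finish.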

Retries and Exponential Backoff

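When a job fails (its processor throws or rejects), BullMQ retries it according to the `attempts` and `backoff` settings. `attempts` counts the first try, so `attempts: 5` means one initial attempt plus up to four retries: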

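```typescript
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { env } from '../config/env.js';

export const emailQueue = new Queue('email', {
  connection: new Redis(env.REDIS_URL),
  defaultJobOptions: {
    attempts: 5, // first try + up to 4 retries
    backoff: {
      type: 'exponential',
      delay: 2000, // initial delay in ms
      // Retry schedule: 2s, 4s, 8s, 16s
    },
  },
});
```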

Override per-job for different retry strategies:

```typescript
// Critical payment job: more attempts, longer waits
await paymentQueue.add('process-payment', payload, {
  attempts: 10,
  backoff: { type: 'exponential', delay: 5000 },
  // 9 retries: 5s, 10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s
});

// Non-critical notification: fewer attempts
await notificationQueue.add('push', payload, {
  attempts: 2, // one retry
  backoff: { type: 'fixed', delay: 3000 },
});
```
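Retry schedules like these are easy to get wrong by one. BullMQ's retry guide gives the exponential wait as `2^(attemptsMade − 1) × delay`, and `attempts` includes the first try, so `attempts: N` yields N − 1 waits. The helper below is a hypothetical sketch of that arithmetic for checking configurations; it does not call BullMQ.

```typescript
type Backoff =
  | { type: 'fixed'; delay: number }
  | { type: 'exponential'; delay: number };

// Wait (ms) before each retry. attemptsMade runs 1..attempts-1 because
// the first try is included in `attempts`.
function backoffSchedule(attempts: number, backoff: Backoff): number[] {
  const waits: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    waits.push(
      backoff.type === 'fixed'
        ? backoff.delay
        : Math.round(2 ** (attemptsMade - 1) * backoff.delay),
    );
  }
  return waits;
}

// Total waiting time across all retries (processing time not included).
function totalWait(attempts: number, backoff: Backoff): number {
  return backoffSchedule(attempts, backoff).reduce((sum, ms) => sum + ms, 0);
}
```

For the payment example, `totalWait(10, { type: 'exponential', delay: 5000 })` is 2,555,000 ms: about 43 minutes of waiting before the tenth and final attempt.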

Dead letter queue pattern

After all attempts are exhausted, the job stays in BullMQ's failed set. BullMQ has no separate dead letter queue built in, so for critical jobs a common pattern is to copy the final failure into a dedicated queue for manual review:

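```typescript
worker.on('failed', async (job, err) => {
  // 'failed' fires on every failed attempt; act only on the final one
  if (!job || job.attemptsMade < (job.opts.attempts ?? 1)) return;

  try {
    // Final failure: copy to the dead letter queue
    await deadLetterQueue.add('failed-job', {
      originalQueue: 'email',
      jobId: job.id,
      jobName: job.name,
      jobData: job.data,
      error: err.message,
      failedAt: new Date().toISOString(),
    });
    logger.error({ jobId: job.id, jobData: job.data }, 'Job copied to dead letter queue');
  } catch (dlqErr) {
    // Event listeners are not awaited, so handle errors here
    logger.error({ jobId: job.id, err: dlqErr }, 'Failed to write to dead letter queue');
  }
});
```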
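The attempt check in that handler is what keeps the dead letter queue clean: the `failed` event fires for every failed attempt, so copying on every failure would fill it with jobs that later succeed on retry. The sketch isolates that decision as a pure function that can be unit-tested; `isFinalFailure`, `toDeadLetter`, and both interfaces are hypothetical and simply mirror the fields used above.

```typescript
// Just the job fields the decision needs (shape mirrors a BullMQ Job).
interface FailedJobInfo {
  id: string | undefined;
  name: string;
  data: unknown;
  attemptsMade: number;
  opts: { attempts?: number };
}

interface DeadLetterRecord {
  originalQueue: string;
  jobId: string | undefined;
  jobName: string;
  jobData: unknown;
  error: string;
  failedAt: string;
}

// Unset `attempts` means a single attempt, matching the `?? 1` above.
function isFinalFailure(job: FailedJobInfo): boolean {
  return job.attemptsMade >= (job.opts.attempts ?? 1);
}

// Record to enqueue on the dead letter queue, or null if a retry is still pending.
function toDeadLetter(
  queueName: string,
  job: FailedJobInfo,
  err: Error,
  now: Date = new Date(),
): DeadLetterRecord | null {
  if (!isFinalFailure(job)) return null;
  return {
    originalQueue: queueName,
    jobId: job.id,
    jobName: job.name,
    jobData: job.data,
    error: err.message,
    failedAt: now.toISOString(),
  };
}
```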

Repeatable Jobs (Cron)

BullMQ supports repeatable jobs using cron syntax:

```typescript
// src/jobs/scheduled.ts
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { env } from '../config/env.js';
import logger from '../utils/logger.js';

const schedulerQueue = new Queue('scheduler', {
  connection: new Redis(env.REDIS_URL),
});

// Add repeatable jobs once at startup. Re-adding identical repeat
// options does not create a second schedule.
await schedulerQueue.add(
  'daily-digest',
  { type: 'daily-digest' },
  {
    repeat: {
      pattern: '0 8 * * *', // every day at 8:00 AM
      tz: 'Asia/Kolkata',
    },
    jobId: 'daily-digest', // stable ID prevents duplicate schedule entries
  },
);

await schedulerQueue.add(
  'cleanup-expired-tokens',
  { type: 'cleanup-tokens' },
  {
    repeat: {
      pattern: '0 * * * *', // every hour, on the hour
    },
    jobId: 'cleanup-expired-tokens',
  },
);

await schedulerQueue.add(
  'weekly-report',
  { type: 'weekly-report' },
  {
    repeat: {
      pattern: '0 9 * * 1', // every Monday at 9:00 AM
      tz: 'Asia/Kolkata',
    },
    jobId: 'weekly-report',
  },
);

logger.info('Scheduled jobs registered');
```

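The worker for the scheduler queue is an ordinary worker; BullMQ handles the timing. One caveat: a schedule is identified by its repeat options, so changing a pattern later registers a new schedule alongside the old one. Remove stale entries explicitly, for example by listing them with `queue.getRepeatableJobs()` and deleting them with `queue.removeRepeatableByKey()`.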
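If the cron strings are unfamiliar: the five fields are minute, hour, day of month, month, and day of week (0 = Sunday). The hypothetical matcher below supports only `*` and single numbers, which is enough to read the three patterns above. It works in UTC and ignores `tz`, whereas BullMQ evaluates patterns in the configured time zone and accepts richer syntax such as ranges, lists, and steps.

```typescript
// Minimal 5-field cron matcher: minute hour day-of-month month day-of-week.
// Supports only "*" and single integers; evaluates in UTC (no time zones).
function cronMatches(pattern: string, date: Date): boolean {
  const fields = pattern.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`);
  const actual = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1, // cron months are 1-12
    date.getUTCDay(), // 0 = Sunday
  ];
  return fields.every((field, i) => {
    if (field === '*') return true;
    if (!/^\d+$/.test(field)) throw new Error(`Unsupported field: ${field}`);
    return Number(field) === actual[i];
  });
}

// Brute-force search for the next matching minute strictly after `from`.
function nextRun(pattern: string, from: Date, maxDays = 366): Date | null {
  const t = new Date(from.getTime());
  t.setUTCSeconds(0, 0);
  for (let i = 0; i < maxDays * 24 * 60; i++) {
    t.setUTCMinutes(t.getUTCMinutes() + 1); // rolls over hours/days automatically
    if (cronMatches(pattern, t)) return new Date(t.getTime());
  }
  return null;
}
```

The `tz` option matters: 8:00 in Asia/Kolkata (UTC+5:30) is 02:30 UTC, so a UTC-only scheduler would need `30 2 * * *` for the same daily digest.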


Multiple Queues by Workload

Separate queues for different workload types:

```typescript
// src/queues/index.ts
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { env } from '../config/env.js';

// One ioredis connection shared by these producer-side queues
const connection = new Redis(env.REDIS_URL);

// Separate queues: each can have independent workers and scaling
export const emailQueue = new Queue('email', { connection });
export const imageQueue = new Queue('image-processing', { connection });
export const reportQueue = new Queue('reports', { connection });
export const notificationQueue = new Queue('notifications', { connection });

// High-throughput queue. Rate limiting is not a queue or job option in
// BullMQ: set `limiter: { max: 100, duration: 1000 }` on the webhooks Worker.
export const webhookQueue = new Queue('webhooks', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
});
```
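A limiter such as `{ max: 100, duration: 1000 }` caps how many jobs may start in each 1-second window. The sketch below is a simple fixed-window counter that captures the idea; `FixedWindowLimiter` is hypothetical, time is passed in explicitly so it can be tested, and BullMQ's own limiter is configured on the Worker, kept in Redis, and shared by all workers of a queue rather than living in one process.

```typescript
// Fixed-window limiter: at most `max` job starts per `durationMs` window.
class FixedWindowLimiter {
  private readonly max: number;
  private readonly durationMs: number;
  private windowStart = 0;
  private used = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Returns 0 if a job may start now, otherwise the ms until the window resets.
  tryAcquire(nowMs: number): number {
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs; // open a new window
      this.used = 0;
    }
    if (this.used < this.max) {
      this.used++;
      return 0;
    }
    return this.windowStart + this.durationMs - nowMs;
  }
}
```

Fixed windows are cheap but coarse: a burst at the end of one window followed by another at the start of the next can briefly admit up to twice `max`.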

BullMQ Dashboard

Inspect queues in a web UI with Bull Board:

```bash
npm install @bull-board/api @bull-board/express
```

```typescript
// src/app.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, imageQueue, reportQueue } from './queues/index.js';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(imageQueue),
    new BullMQAdapter(reportQueue),
  ],
  serverAdapter,
});

// Gate behind auth in production
app.use('/admin/queues', authenticate, authorize('admin'), serverAdapter.getRouter());
```

Visit /admin/queues to see job counts, retry failed jobs, clear queues, and inspect job data.


Real-World Patterns

Pattern 1: Image processing after upload

```typescript
// When a user uploads an avatar
await imageQueue.add('process-avatar', {
  userId: user.id,
  s3Key: uploadedKey,
  sizes: [
    { width: 32, height: 32 },
    { width: 128, height: 128 },
  ],
});
```

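Pattern 2: Chaining jobs

```typescript
// Job objects are not event emitters (no job.on('completed')), so chain from
// inside the payment worker: follow-ups are enqueued only after payment succeeds.
// paymentsService.charge() is a hypothetical service call.
const paymentWorker = new Worker('payments', async (job) => {
  const { orderId } = await paymentsService.charge(job.data);
  // Deterministic IDs stop a retried payment job from enqueuing duplicates
  await fulfilmentQueue.add('ship', { orderId }, { jobId: `ship-${orderId}` });
  await emailQueue.add('send-receipt', { orderId }, { jobId: `receipt-${orderId}` });
  return { orderId }; // stored as job.returnvalue
}, { connection });
```

For dependency trees managed by BullMQ itself, look at `FlowProducer`, where a parent job is processed only after all of its children have completed.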

Pattern 3: Bulk operations off the request path

```typescript
// User requests export: return immediately, email when done
export const requestExport = asyncHandler(async (req, res) => {
  const job = await reportQueue.add('export', {
    userId: req.user!.id,
    format: req.body.format,
    filters: req.body.filters,
  });

  // queue.add() resolves to the Job; return its ID so the client can poll status
  res.status(202).json({
    message: "Export started. You will receive an email when it's ready.",
    jobId: job.id,
  });
});
```

Summary

  • Job queues decouple slow/unreliable work from the request path. The response returns in milliseconds; the worker does the heavy lifting asynchronously.
  • BullMQ keeps jobs in Redis, so they survive restarts of your Node.js processes (as long as Redis persistence is enabled). Workers and producers are just Node.js processes connected to the same Redis instance.
  • Exponential backoff retries transient failures automatically. Failed jobs after all attempts go to the failed set — move them to a dead letter queue for critical workflows.
  • Concurrency and rate limiting prevent workers from overwhelming downstream services. Set concurrency per worker instance; the limiter is set in the worker options but enforced across all workers of a queue.
  • Repeatable jobs replace setInterval for cron-style work — they persist across restarts and won't double-schedule.
  • Separate queues by workload type so a spike in image processing doesn't delay emails.
  • Bull Board provides a web UI for inspecting, retrying, and clearing jobs without writing code.

Next: JSON internals — what JSON.parse and JSON.stringify actually do, the edge cases that bite at scale, streaming JSON for large payloads, and when MessagePack is worth the complexity.
