RabbitMQ Module

The @grupodiariodaregiao/bunstone RabbitMQ module provides first-class support for publishing and consuming messages via RabbitMQ (AMQP 0-9-1).

Installation

bash
bun add amqplib
bun add -d @types/amqplib

Setup

Register RabbitMQModule once in your root AppModule. The module is global, so RabbitMQService is injectable everywhere without importing RabbitMQModule again.

typescript
import { Module } from "@grupodiariodaregiao/bunstone";
import { RabbitMQModule } from "@grupodiariodaregiao/bunstone/lib/rabbitmq/rabbitmq-module";

@Module({
  imports: [
    RabbitMQModule.register({
      uri: "amqp://guest:guest@localhost:5672",

      exchanges: [
        { name: "events", type: "topic", durable: true },
      ],

      queues: [
        {
          name: "orders.created",
          durable: true,
          bindings: { exchange: "events", routingKey: "orders.created.*" },
        },
      ],

      prefetch: 10,
    }),
  ],
})
export class AppModule {}

Connection options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| uri | string | | Full AMQP URI. Takes precedence over individual fields. |
| host | string | "localhost" | Broker hostname |
| port | number | 5672 | Broker port |
| username | string | "guest" | AMQP username |
| password | string | "guest" | AMQP password |
| vhost | string | "/" | Virtual host |
| exchanges | RabbitMQExchangeConfig[] | [] | Exchanges to assert at startup |
| queues | RabbitMQQueueConfig[] | [] | Queues to assert & bind at startup |
| prefetch | number | 10 | Unacked message limit per consumer channel |
| reconnect.enabled | boolean | true | Reconnect on connection loss |
| reconnect.delay | number | 2000 | Milliseconds between reconnection attempts |
| reconnect.maxRetries | number | 10 | Max attempts (0 = unlimited) |
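
To make the precedence and retry rules above concrete, here is a minimal sketch. It is not the library's implementation: the `ConnectionFields` shape and the `resolveAmqpUri` and `shouldRetry` helpers are hypothetical names used only for illustration, and attempts are assumed to be counted from 1.

```typescript
// Hypothetical option shape mirroring the connection fields listed above.
interface ConnectionFields {
  uri?: string;
  host?: string;
  port?: number;
  username?: string;
  password?: string;
  vhost?: string;
}

// An explicit `uri` wins; otherwise the individual fields (with the
// documented defaults) are composed into a single AMQP URI.
function resolveAmqpUri(opts: ConnectionFields): string {
  if (opts.uri) return opts.uri;
  const host = opts.host ?? "localhost";
  const port = opts.port ?? 5672;
  const user = encodeURIComponent(opts.username ?? "guest");
  const pass = encodeURIComponent(opts.password ?? "guest");
  // The vhost is one percent-encoded path segment, so "/" becomes "%2F".
  const vhost = encodeURIComponent(opts.vhost ?? "/");
  return `amqp://${user}:${pass}@${host}:${port}/${vhost}`;
}

// `maxRetries: 0` means "never give up"; otherwise stop once the
// 1-based attempt number exceeds maxRetries.
function shouldRetry(attempt: number, maxRetries: number): boolean {
  return maxRetries === 0 || attempt <= maxRetries;
}
```

With no options at all, `resolveAmqpUri({})` yields `amqp://guest:guest@localhost:5672/%2F`, which addresses the default virtual host `/`.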

Declaring Exchanges

typescript
exchanges: [
  {
    name: "orders",
    type: "topic",      // "direct" | "topic" | "fanout" | "headers"
    durable: true,      // survives broker restart (default: true)
    autoDelete: false,  // delete when no bindings remain (default: false)
  },
]

Declaring Queues

typescript
queues: [
  {
    name: "orders.created",
    durable: true,

    // Bind to one exchange
    bindings: { exchange: "orders", routingKey: "orders.created.*" },

    // Or bind to multiple exchanges
    // bindings: [
    //   { exchange: "orders",  routingKey: "orders.created.*" },
    //   { exchange: "audit",   routingKey: "#" },
    // ],

    // Dead letter exchange for rejected/expired messages
    deadLetterExchange: "orders.dlx",
    deadLetterRoutingKey: "orders.dead",

    messageTtl: 60_000,  // message expiry in ms
    maxLength: 10_000,   // cap queue depth
  },
]
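
The dead-letter, TTL, and length options correspond to RabbitMQ's optional queue arguments. The sketch below shows one plausible translation; `QueueConfigSketch` and `toQueueArguments` are hypothetical names, and the library may perform this mapping differently.

```typescript
// Hypothetical subset of the queue config shown above.
interface QueueConfigSketch {
  deadLetterExchange?: string;
  deadLetterRoutingKey?: string;
  messageTtl?: number; // milliseconds
  maxLength?: number;  // number of messages
}

// Translate the friendly option names into RabbitMQ "x-" queue arguments.
function toQueueArguments(cfg: QueueConfigSketch): Record<string, string | number> {
  const args: Record<string, string | number> = {};
  if (cfg.deadLetterExchange !== undefined) args["x-dead-letter-exchange"] = cfg.deadLetterExchange;
  if (cfg.deadLetterRoutingKey !== undefined) args["x-dead-letter-routing-key"] = cfg.deadLetterRoutingKey;
  if (cfg.messageTtl !== undefined) args["x-message-ttl"] = cfg.messageTtl;
  if (cfg.maxLength !== undefined) args["x-max-length"] = cfg.maxLength;
  return args;
}
```

RabbitMQ rejects re-declaring an existing queue with different arguments (PRECONDITION_FAILED), so changing these values for a queue that already exists usually means deleting or renaming it first.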

Consuming Messages

A consumer is a class decorated with @RabbitConsumer() that contains methods decorated with @RabbitSubscribe().

There are three subscription modes:

| Mode | Options | When to use |
| --- | --- | --- |
| Queue mode | { queue: "..." } | Consume from a pre-declared, persistent queue. The handler receives every message that arrives on that queue, regardless of routing key. |
| Queue + routing key filter | { queue: "...", routingKey: "..." } | Consume from a pre-declared queue but only dispatch messages whose routing key matches the declared pattern. Messages that don't match are silently acked. |
| Exchange / routing key mode | { exchange: "...", routingKey: "..." } | The lib creates an exclusive auto-delete queue per handler and binds it to the exchange. Every handler for the same routing key gets its own copy (broker-level fan-out). |

Queue mode – in-process fan-out

When multiple handlers (in the same or different @RabbitConsumer classes) subscribe to the same queue name, the lib creates a single AMQP consumer and delivers each message to all handlers in turn.

typescript
@RabbitConsumer()
export class Consumer1 {
  @RabbitSubscribe({ queue: "orders" })
  async data(msg: RabbitMessage<{ item: string }>) {
    console.log("RECEIVED 1", msg.data);
    msg.ack(); // only the first ack/nack/reject call takes effect
  }
}

@RabbitConsumer()
export class Consumer2 {
  @RabbitSubscribe({ queue: "orders" })
  async data(msg: RabbitMessage<{ item: string }>) {
    console.log("RECEIVED 2", msg.data);
    msg.ack(); // no-op: message was already settled above
  }
}
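
The behaviour noted in the comments above (only the first settle call counts) can be pictured with a small wrapper. This is a sketch under assumptions, not the library's code: `SettleActions`, `guardSettle`, and `fanOut` are hypothetical names, and the message is reduced to its `ack`/`nack` methods.

```typescript
// Hypothetical minimal message surface; the real RabbitMessage<T> has more members.
interface SettleActions {
  ack(): void;
  nack(requeue?: boolean): void;
}

// Wrap the broker-facing actions so only the first settle call is forwarded.
function guardSettle(actions: SettleActions): SettleActions {
  let settled = false;
  const once = (settle: () => void): void => {
    if (settled) return; // later calls are silently ignored
    settled = true;
    settle();
  };
  return {
    ack: () => once(() => actions.ack()),
    nack: (requeue = true) => once(() => actions.nack(requeue)),
  };
}

type Handler = (msg: SettleActions) => Promise<void> | void;

// Deliver one message to every handler of the queue, one after another.
async function fanOut(handlers: Handler[], actions: SettleActions): Promise<void> {
  const msg = guardSettle(actions);
  for (const handler of handlers) {
    await handler(msg);
  }
}
```

Because every handler receives the same guarded message, two handlers that both call `ack()` result in a single acknowledgement reaching the broker.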

Queue mode with routing key filter

Add routingKey to a queue-mode subscription to make it selective: the handler is only called when the incoming message's routing key matches the declared pattern. Messages that don't match any handler are silently acknowledged so they don't pile up as unacked.

This is useful when a single durable queue receives multiple event types (e.g. article.*) but different handlers should react only to specific ones.

typescript
RabbitMQModule.register({
  exchanges: [{ name: "articles", type: "topic", durable: true }],
  queues: [
    {
      name: "article",
      durable: true,
      bindings: { exchange: "articles", routingKey: "article.*" },
    },
  ],
})

typescript
@RabbitConsumer()
export class ArticleConsumer {

  // ✅ Only called when routingKey === "article.published"
  @RabbitSubscribe({ queue: "article", routingKey: "article.published" })
  async onPublished(msg: RabbitMessage<{ articleId: string }>) {
    console.log("published", msg.data.articleId);
    msg.ack();
  }

  // ✅ Only called when routingKey === "article.deleted"
  @RabbitSubscribe({ queue: "article", routingKey: "article.deleted" })
  async onDeleted(msg: RabbitMessage<{ articleId: string }>) {
    console.log("deleted", msg.data.articleId);
    msg.ack();
  }

  // ✅ No routingKey → called for EVERY message on the queue
  @RabbitSubscribe({ queue: "article" })
  async onAll(msg: RabbitMessage<{ articleId: string }>) {
    console.log("any event", msg.raw.fields.routingKey, msg.data.articleId);
    msg.ack();
  }
}

Wildcard patterns – routingKey supports the same * (one word) and # (zero or more words) wildcards as topic exchanges:

typescript
@RabbitSubscribe({ queue: "article", routingKey: "article.#" })
// matches: article.published, article.deleted, article.updated.title, …
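
For readers who want to see the wildcard semantics spelled out, the following is a minimal matcher for topic-style patterns. It illustrates the rules only; `matchesRoutingKey` is a hypothetical helper and the library's own matching code may differ.

```typescript
// Match an AMQP topic pattern against a routing key.
// Words are dot-separated; "*" stands for exactly one word,
// "#" for zero or more words.
function matchesRoutingKey(pattern: string, key: string): boolean {
  const p = pattern.split(".");
  const k = key.split(".");

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;
    if (p[pi] === "#") {
      // "#" either absorbs nothing (move past it) or one more word (stay on it).
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    return (p[pi] === "*" || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}
```

Under these rules `article.#` accepts `article.published`, `article.updated.title`, and also the bare key `article`, while `article.*` accepts only keys with exactly one word after `article`.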

Unmatched messages – if a message arrives on the queue but no handler's routingKey matches it (and no handler has routingKey omitted), the lib automatically acks it to prevent it from blocking the queue.

Mix freely – you can combine filtered and unfiltered handlers on the same queue. Unfiltered handlers (routingKey omitted) always run.
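
The two rules above (unfiltered handlers always run, unmatched messages are acked automatically) amount to a small selection step per message. The sketch below is illustrative only: `QueueHandler` and `selectHandlers` are hypothetical names, and the pattern matcher is injected, with plain equality as a stand-in default.

```typescript
// Hypothetical registration record for one handler on a queue.
interface QueueHandler {
  name: string;
  routingKey?: string; // omitted: the handler receives every message
}

// Decide which handlers run for a message with the given routing key.
function selectHandlers(
  handlers: QueueHandler[],
  messageKey: string,
  matches: (pattern: string, key: string) => boolean = (p, k) => p === k,
): { run: string[]; autoAck: boolean } {
  const run = handlers
    .filter((h) => h.routingKey === undefined || matches(h.routingKey, messageKey))
    .map((h) => h.name);
  // Nobody wants the message: settle it so it does not stay unacked.
  return { run, autoAck: run.length === 0 };
}
```

With handlers for `article.published` and `article.deleted` only, a message keyed `article.updated` selects no handler and is flagged for automatic acknowledgement; adding an unfiltered handler means `autoAck` is never true.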

Durability – unlike exchange + routing key mode, the queue persists even when no consumers are connected, so messages published in the meantime are retained instead of being dropped. This mode is recommended for production workloads.

Settle guard – ack(), nack(), and reject() are wrapped so only the first call takes effect. Subsequent calls from other handlers are silently ignored, preventing "already acknowledged" errors.

noAck mode – the channel uses noAck: true only when every handler for a queue opts in. If at least one handler uses manual ack (the default), the channel is placed in manual-ack mode.
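
Expressed as code, the noAck rule is an "every handler opts in" check. This is a sketch with hypothetical names (`SubscribeOptionsSketch`, `resolveNoAck`), not the library's internals.

```typescript
// Hypothetical view of the options of each handler bound to one queue.
interface SubscribeOptionsSketch {
  queue: string;
  noAck?: boolean; // default: false (manual ack)
}

// The shared consumer may use noAck only if all handlers asked for it.
function resolveNoAck(handlers: SubscribeOptionsSketch[]): boolean {
  return handlers.length > 0 && handlers.every((h) => h.noAck === true);
}
```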

typescript
import { RabbitConsumer, RabbitSubscribe } from "@grupodiariodaregiao/bunstone";
import type { RabbitMessage } from "@grupodiariodaregiao/bunstone";

@RabbitConsumer()
export class OrderConsumer {

  // Manual acknowledgement (default)
  @RabbitSubscribe({ queue: "orders.created" })
  async handleOrderCreated(msg: RabbitMessage<{ orderId: string }>) {
    console.log("New order:", msg.data.orderId);
    msg.ack();         // acknowledge – removes message from queue
    // msg.nack();     // negative-ack + requeue (default requeue: true)
    // msg.reject();   // reject without requeue
  }

  // Automatic acknowledgement
  @RabbitSubscribe({ queue: "notifications", noAck: true })
  async handleNotification(msg: RabbitMessage<{ text: string }>) {
    console.log(msg.data.text);
    // no need to call msg.ack()
  }
}

Add the consumer class to the providers array of its module:

typescript
@Module({
  imports: [RabbitMQModule.register({ ... })],
  providers: [OrderConsumer],
})
export class AppModule {}

RabbitMessage<T>

| Property | Type | Description |
| --- | --- | --- |
| data | T | Deserialized JSON payload |
| raw | ConsumeMessage | Raw amqplib message |
| ack() | () => void | Acknowledge the message |
| nack(requeue?) | (boolean?) => void | Negative-ack (requeue default: true) |
| reject() | () => void | Reject without requeueing |

Publishing Messages

Inject RabbitMQService anywhere in your application to publish messages.

typescript
import { Injectable } from "@grupodiariodaregiao/bunstone";
import { RabbitMQService } from "@grupodiariodaregiao/bunstone";

@Injectable()
export class OrderService {
  constructor(private readonly rabbit: RabbitMQService) {}

  async placeOrder(order: Order) {
    // Publish to an exchange with a routing key
    await this.rabbit.publish("orders", "orders.created.v1", order);
  }

  async sendDirectToQueue(notification: Notification) {
    // Send directly to a queue, bypassing exchange routing
    await this.rabbit.sendToQueue("notifications", notification);
  }
}

Publish options

Both publish() and sendToQueue() accept an optional RabbitPublishOptions object:

typescript
await this.rabbit.publish("orders", "orders.created", payload, {
  persistent: true,           // survive broker restart (default: true)
  headers: { "x-version": 2 },
  correlationId: "req-123",
  expiration: 30_000,         // message TTL in ms
  priority: 5,                // 0–9
});

Multiple Queues

Because each @RabbitSubscribe gets its own dedicated channel, a single consumer class can listen to multiple independent queues simultaneously:

typescript
@RabbitConsumer()
export class EventConsumer {

  @RabbitSubscribe({ queue: "user.registered" })
  async onUserRegistered(msg: RabbitMessage<User>) { /* … */ msg.ack(); }

  @RabbitSubscribe({ queue: "payment.completed" })
  async onPaymentCompleted(msg: RabbitMessage<Payment>) { /* … */ msg.ack(); }

  @RabbitSubscribe({ queue: "shipment.dispatched" })
  async onShipmentDispatched(msg: RabbitMessage<Shipment>) { /* … */ msg.ack(); }
}

Routing Key Subscriptions (Topic / Direct Fan-out)

Besides consuming a named queue, @RabbitSubscribe also supports routing key mode: declare exchange + routingKey instead of queue.

When this mode is used, the lib:

  1. Creates a server-named exclusive, auto-delete queue per handler at startup.
  2. Binds that queue to the given exchange with the given routing key.
  3. Starts consuming from that private queue.

Because every handler gets its own queue, all handlers subscribed to the same routing key receive an independent copy of each message — this is the natural fan-out behaviour of topic exchanges.

No queue declarations needed. The lib manages the ephemeral queues automatically. You only need to declare the exchange in RabbitMQModule.register({ exchanges: [...] }).
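
The three startup steps listed above can be sketched against a minimal channel interface. `ChannelLike` mirrors the shape of amqplib's `assertQueue`, `bindQueue`, and `consume` calls but is declared here so the example stays self-contained; `subscribeByRoutingKey` is a hypothetical helper, not a function exported by the lib.

```typescript
// Minimal slice of an amqplib-style channel: only the calls used below.
interface ChannelLike {
  assertQueue(
    queue: string,
    options: { exclusive: boolean; autoDelete: boolean },
  ): Promise<{ queue: string }>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
  consume(queue: string, onMessage: (msg: unknown) => void): Promise<{ consumerTag: string }>;
}

// Create a private queue for one handler and start consuming from it.
async function subscribeByRoutingKey(
  channel: ChannelLike,
  exchange: string,
  routingKey: string,
  onMessage: (msg: unknown) => void,
): Promise<string> {
  // 1. An empty name asks the broker to generate one.
  const { queue } = await channel.assertQueue("", { exclusive: true, autoDelete: true });
  // 2. Route matching messages from the exchange into the private queue.
  await channel.bindQueue(queue, exchange, routingKey);
  // 3. Start delivering messages to the handler.
  await channel.consume(queue, onMessage);
  return queue;
}
```

Because the queue is exclusive and auto-delete, it goes away when the consuming connection closes, and messages published while the application is down are not kept for it.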

Basic example

typescript
// 1. Declare only the exchange in the module
RabbitMQModule.register({
  uri: "amqp://...",
  exchanges: [{ name: "articles", type: "topic" }],
})

// 2. Subscribe to specific routing keys
@RabbitConsumer()
export class ArticleConsumer {

  @RabbitSubscribe({ exchange: "articles", routingKey: "article.published" })
  async onPublished(msg: RabbitMessage<{ articleId: string }>) {
    console.log("Published:", msg.data.articleId);
    msg.ack();
  }

  @RabbitSubscribe({ exchange: "articles", routingKey: "article.updated" })
  async onUpdated(msg: RabbitMessage<{ articleId: string }>) {
    console.log("Updated:", msg.data.articleId);
    msg.ack();
  }

  @RabbitSubscribe({ exchange: "articles", routingKey: "article.deleted" })
  async onDeleted(msg: RabbitMessage<{ articleId: string }>) {
    console.log("Deleted:", msg.data.articleId);
    msg.ack();
  }
}

typescript
// 3. Publish with the routing key
await this.rabbit.publish("articles", "article.published", { articleId: "123" });

Multiple handlers for the same routing key

Every handler subscribed to the same routing key is called independently.
You can spread handlers across different classes:

typescript
/** Handler A – invalidate cache */
@RabbitConsumer()
export class ArticleCacheHandler {
  @RabbitSubscribe({ exchange: "articles", routingKey: "article.published" })
  async onPublished(msg: RabbitMessage<{ articleId: string }>) {
    await invalidateCache(msg.data.articleId);
    msg.ack();
  }
}

/** Handler B – send push notification */
@RabbitConsumer()
export class ArticleNotificationHandler {
  @RabbitSubscribe({ exchange: "articles", routingKey: "article.published" })
  async onPublished(msg: RabbitMessage<{ articleId: string }>) {
    await sendPushNotification(msg.data.articleId);
    msg.ack();
  }
}

// Publishing one message → both handlers are triggered simultaneously
await this.rabbit.publish("articles", "article.published", { articleId: "123" });

Wildcard patterns

Topic exchanges support * (one word) and # (zero or more words):

typescript
@RabbitConsumer()
export class ArticleAuditHandler {

  // Matches: article.published, article.updated, article.deleted, …
  @RabbitSubscribe({ exchange: "articles", routingKey: "article.#" })
  async onAnyArticleEvent(msg: RabbitMessage<{ articleId: string }>) {
    console.log(
      "Event:", msg.raw.fields.routingKey,
      "| Article:", msg.data.articleId,
    );
    msg.ack();
  }
}

@RabbitSubscribe options reference

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| queue | string | (modes 1 & 2) | Named queue to consume from. |
| exchange | string | (mode 3) | Exchange to bind to. Must be used together with routingKey and without queue. |
| routingKey | string | | Routing key pattern. Supports * and # wildcards. With queue (mode 2): filters which messages dispatch to this handler. With exchange (mode 3): binds an ephemeral queue to the exchange. |
| noAck | boolean | | Auto-acknowledge on delivery. Default: false. |

Mode summary

| queue | exchange | routingKey | Behaviour |
| --- | --- | --- | --- |
| ✅ | – | – | Receives every message from the named queue |
| ✅ | – | ✅ | Receives only messages whose routing key matches the pattern |
| – | ✅ | ✅ | Creates an ephemeral exclusive queue bound to the exchange |

Dead Letter Exchanges & DLQ Reprocessing

When a message is rejected, expires (TTL), or is dropped because the queue has reached maxLength, RabbitMQ routes it to the configured Dead Letter Exchange (DLX), from which it lands in a Dead Letter Queue (DLQ). The lib gives you two tools to work with DLQs:

  1. Auto-topology – declare the DLX exchange + DLQ queue with a single config option
  2. RabbitMQDeadLetterService – inspect, requeue, or discard dead-lettered messages

1. Auto-topology with deadLetterQueue

Set deadLetterExchange and deadLetterQueue together. The lib will automatically assert the DLX exchange, the DLQ queue, and their binding on startup — no need to list them in the exchanges or queues arrays separately.

typescript
RabbitMQModule.register({
  exchanges: [
    { name: "events", type: "topic" },
    // ↑ you only need to declare your main exchange
    // The DLX "orders.cancelled.dlx" is auto-asserted below
  ],
  queues: [
    {
      name: "orders.cancelled",
      bindings: { exchange: "events", routingKey: "orders.cancelled" },

      // ─── Dead Letter config ─────────────────────────────────────────────
      deadLetterExchange:    "orders.cancelled.dlx",  // DLX name (auto-asserted)
      deadLetterRoutingKey:  "orders.cancelled.dead", // routing key to DLQ
      deadLetterQueue:       "orders.cancelled.dlq",  // DLQ name (auto-asserted + bound)
      deadLetterExchangeType: "direct",               // optional, default: "direct"

      messageTtl: 30_000, // messages expire → go to DLQ after 30 s
    },
  ],
})

What happens at startup

| Step | Action |
| --- | --- |
| 1 | Assert orders.cancelled queue with x-dead-letter-exchange arg |
| 2 | Assert exchange orders.cancelled.dlx (direct, durable) |
| 3 | Assert queue orders.cancelled.dlq (durable) |
| 4 | Bind orders.cancelled.dlq → orders.cancelled.dlx with key orders.cancelled.dead |

2. Consuming DLQ messages with @RabbitSubscribe

Since the DLQ is a normal queue, you can attach a @RabbitConsumer to it. Messages arrive as DeadLetterMessage<T> (import the type from the lib) which adds a deathInfo field and a republish() helper.

typescript
import { RabbitConsumer, RabbitSubscribe } from "@grupodiariodaregiao/bunstone";
import type { DeadLetterMessage } from "@grupodiariodaregiao/bunstone";

@RabbitConsumer()
export class OrderDLQConsumer {

  @RabbitSubscribe({ queue: "orders.cancelled.dlq" })
  async handle(msg: DeadLetterMessage<{ orderId: string }>) {
    const { orderId } = msg.data;
    const info = msg.deathInfo; // structured x-death metadata

    console.warn(`Dead letter: ${orderId} | reason=${info?.reason} | attempts=${info?.count}`);

    if ((info?.count ?? 0) < 3) {
      // Retry: republish to the original exchange
      await msg.republish("events", "orders.cancelled");
      msg.ack(); // remove from DLQ after successful republish
    } else {
      // Too many failures → discard
      console.error(`Giving up on order ${orderId}`);
      msg.ack();
    }
  }
}

DeadLetterMessage<T>

| Property | Type | Description |
| --- | --- | --- |
| data | T | Deserialized JSON payload |
| raw | ConsumeMessage | Raw amqplib message |
| deathInfo | DeadLetterDeathInfo \| null | Structured x-death metadata |
| ack() | () => void | Remove permanently from DLQ |
| nack(requeue?) | (boolean?) => void | Return to DLQ (requeue default: false) |
| republish(exchange, key, opts?) | Promise<void> | Re-publish to an exchange for reprocessing |

DeadLetterDeathInfo

| Property | Type | Description |
| --- | --- | --- |
| queue | string | Original queue where the message died |
| exchange | string | Exchange where it was published |
| routingKeys | string[] | Routing keys used |
| count | number | How many times this message has died |
| reason | "rejected" \| "expired" \| "maxlen" \| "delivery-limit" | Why it was dead-lettered |
| time | Date | When it was dead-lettered |
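
These fields come from the `x-death` header that RabbitMQ attaches to dead-lettered messages, where the most recent death is the first array entry. The sketch below shows one way such a header could be read into the shape above. `parseDeathInfo` and `DeathInfoSketch` are hypothetical names, and the handling of `time` (epoch seconds, either as a bare number or wrapped in an object with a `value` field) is an assumption about how the AMQP client decodes timestamps.

```typescript
type DeathReason = "rejected" | "expired" | "maxlen" | "delivery-limit";

// Hypothetical stand-in for DeadLetterDeathInfo.
interface DeathInfoSketch {
  queue: string;
  exchange: string;
  routingKeys: string[];
  count: number;
  reason: DeathReason;
  time: Date;
}

// Read the first (most recent) x-death entry, or null if there is none.
function parseDeathInfo(headers: Record<string, unknown> | undefined): DeathInfoSketch | null {
  const deaths = headers?.["x-death"];
  if (!Array.isArray(deaths) || deaths.length === 0) return null;
  const d = deaths[0] as Record<string, unknown>;

  // Assumption: epoch seconds, bare or wrapped as { value: seconds }.
  const rawTime = d["time"];
  const seconds =
    typeof rawTime === "number"
      ? rawTime
      : Number((rawTime as { value?: unknown } | undefined)?.value ?? 0);

  const keys = d["routing-keys"];
  return {
    queue: String(d["queue"] ?? ""),
    exchange: String(d["exchange"] ?? ""),
    routingKeys: Array.isArray(keys) ? keys.map(String) : [],
    count: Number(d["count"] ?? 0),
    reason: d["reason"] as DeathReason,
    time: new Date(seconds * 1000),
  };
}
```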

3. Manual reprocessing with RabbitMQDeadLetterService

RabbitMQDeadLetterService is registered globally by RabbitMQModule and can be injected anywhere in your application. Useful for admin REST endpoints, scheduled requeue jobs, or CLI scripts.

typescript
import { Injectable } from "@grupodiariodaregiao/bunstone";
import { RabbitMQDeadLetterService } from "@grupodiariodaregiao/bunstone";

@Injectable()
export class DLQAdminService {
  constructor(private readonly dlq: RabbitMQDeadLetterService) {}

  // How many messages are stuck
  async countFailed() {
    return this.dlq.messageCount("orders.cancelled.dlq");
  }

  // Peek at messages without consuming them
  async preview(limit = 10) {
    return this.dlq.inspect("orders.cancelled.dlq", limit);
  }

  // Move all messages back to the original exchange
  async retryAll() {
    return this.dlq.requeueMessages({
      fromQueue:  "orders.cancelled.dlq",
      toExchange: "events",
      routingKey: "orders.cancelled",
    });
  }

  // Move only the first 50
  async retryBatch() {
    return this.dlq.requeueMessages({
      fromQueue:  "orders.cancelled.dlq",
      toExchange: "events",
      routingKey: "orders.cancelled",
      count: 50,
    });
  }

  // Permanently delete all dead letters
  async purge() {
    return this.dlq.discardMessages("orders.cancelled.dlq");
  }
}

RabbitMQDeadLetterService API

| Method | Returns | Description |
| --- | --- | --- |
| inspect<T>(queue, count?) | Promise<DeadLetterMessage<T>[]> | Peek at messages (put back after reading) |
| requeueMessages(options) | Promise<number> | Move messages → exchange. Returns count requeued. |
| discardMessages(queue, count?) | Promise<number> | Permanently delete messages. Returns count discarded. |
| messageCount(queue) | Promise<number> | Current message count in a queue |

RequeueOptions

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| fromQueue | string | ✅ | Dead letter queue to consume from |
| toExchange | string | ✅ | Exchange to republish to |
| routingKey | string | ✅ | Routing key for republished messages |
| count | number | – | Max messages to requeue. Omit for all. |
| publishOptions | RabbitPublishOptions | – | Additional publish options |

Every republished message gets an x-dlq-requeued header incremented on each manual requeue, so you can track how many times a message has been manually retried if needed.
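
As an illustration of how such a counter header can be maintained, the helper below returns a copy of a message's headers with `x-dlq-requeued` incremented. `withRequeueCount` is a hypothetical name; the library's own bookkeeping may differ in detail.

```typescript
// Return new headers with the manual-requeue counter bumped by one.
// A missing or non-numeric counter is treated as zero.
function withRequeueCount(headers: Record<string, unknown> = {}): Record<string, unknown> {
  const previous = Number(headers["x-dlq-requeued"] ?? 0);
  const count = Number.isFinite(previous) ? previous + 1 : 1;
  return { ...headers, "x-dlq-requeued": count };
}
```

A consumer could then read `msg.raw.properties.headers?.["x-dlq-requeued"]` to stop retrying after a chosen number of manual requeues.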


4. Admin HTTP endpoints example

A common pattern is exposing DLQ management via protected REST endpoints:

typescript
import { Controller, Get, Query, RabbitMQDeadLetterService } from "@grupodiariodaregiao/bunstone";

@Controller("/admin/dlq")
export class DLQController {
  constructor(private readonly dlq: RabbitMQDeadLetterService) {}

  @Get("/count")
  count() {
    return this.dlq.messageCount("orders.cancelled.dlq");
  }

  @Get("/inspect")
  inspect(@Query("limit") limit: string) {
    return this.dlq.inspect("orders.cancelled.dlq", Number(limit ?? 10));
  }

  @Get("/requeue")
  requeue(@Query("limit") limit: string) {
    return this.dlq.requeueMessages({
      fromQueue:  "orders.cancelled.dlq",
      toExchange: "events",
      routingKey: "orders.cancelled",
      count: limit ? Number(limit) : undefined,
    });
  }

  @Get("/discard")
  discard(@Query("limit") limit: string) {
    return this.dlq.discardMessages(
      "orders.cancelled.dlq",
      limit ? Number(limit) : undefined,
    );
  }
}
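
Query parameters arrive as strings, and `Number("abc")` evaluates to `NaN`, so endpoints like these may want to validate `limit` before passing it on. A minimal sketch follows; `parseLimit` and its `max` cap are hypothetical and not part of the lib.

```typescript
// Parse a positive integer query parameter.
// Returns `fallback` for missing, empty, or invalid input and caps the result at `max`.
function parseLimit(
  raw: string | undefined,
  fallback?: number,
  max = 1000,
): number | undefined {
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n) || n <= 0) return fallback;
  return Math.min(n, max);
}
```

For example, `parseLimit(limit, 10)` could replace `Number(limit ?? 10)` in the inspect endpoint, and `parseLimit(limit)` returns `undefined` for bad input, which the requeue and discard calls above treat as "all messages".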

Practical Example

ts
import {
	AppStartup,
	Controller,
	Get,
	Injectable,
	Module,
	Query,
	RabbitConsumer,
	RabbitMQDeadLetterService,
	RabbitMQModule,
	RabbitMQService,
	RabbitSubscribe,
} from "../../index";
import type { DeadLetterMessage, RabbitMessage } from "../../index";

// ─── 1. Types ───────────────────────────────────────────────────────────────

interface OrderPayload {
	orderId: string;
	product: string;
	quantity: number;
}

interface NotificationPayload {
	userId: string;
	message: string;
}

// ─── 2. Consumers ────────────────────────────────────────────────────────────

/**
 * Handles messages from the "orders.created" queue.
 * Messages require manual acknowledgement (default).
 */
@RabbitConsumer()
export class OrderConsumer {
	@RabbitSubscribe({ queue: "orders.created" })
	async handleOrderCreated(msg: RabbitMessage<OrderPayload>) {
		const { orderId, product, quantity } = msg.data;
		console.log(
			`[OrderConsumer] New order: #${orderId} – ${quantity}x ${product}`,
		);

		// Simulate async processing
		await new Promise((resolve) => setTimeout(resolve, 200));

		// Acknowledge the message so it's removed from the queue
		msg.ack();
	}

	@RabbitSubscribe({ queue: "orders.cancelled" })
	async handleOrderCancelled(msg: RabbitMessage<{ orderId: string }>) {
		console.log(`[OrderConsumer] Order cancelled: #${msg.data.orderId}`);
		msg.ack();
	}
}

/**
 * Handles messages from the "notifications" queue.
 * Uses noAck mode – no manual acknowledgement needed.
 */
@RabbitConsumer()
export class NotificationConsumer {
	@RabbitSubscribe({ queue: "notifications", noAck: true })
	async handleNotification(msg: RabbitMessage<NotificationPayload>) {
		console.log(
			`[NotificationConsumer] Notify user ${msg.data.userId}: ${msg.data.message}`,
		);
	}
}

// ─── 3. Dead Letter Consumer ─────────────────────────────────────────────────

/**
 * Consumes messages that landed in the Dead Letter Queue for "orders.cancelled".
 *
 * The `deathInfo` property on the message contains structured metadata from the
 * RabbitMQ `x-death` header (original queue, exchange, reason, timestamp, etc.).
 *
 * Options:
 *   msg.ack()                             → remove permanently from DLQ
 *   msg.nack(true)                        → put back in DLQ
 *   msg.republish('events', 'orders.cancelled') → retry via original exchange
 */
@RabbitConsumer()
export class OrderDLQConsumer {
	@RabbitSubscribe({ queue: "orders.cancelled.dlq" })
	async handleFailedCancelledOrder(msg: DeadLetterMessage<{ orderId: string }>) {
		const { orderId } = msg.data;
		const info = msg.deathInfo;

		console.warn(
			`[DLQ] Dead letter received: orderId=${orderId}` +
				(info ? ` | reason=${info.reason} | from=${info.queue} | count=${info.count}` : ""),
		);

		// Decide what to do based on death count
		if ((info?.count ?? 0) < 3) {
			// Retry: republish back to the original exchange
			console.log(`[DLQ] Retrying order #${orderId}…`);
			await msg.republish("events", "orders.cancelled");
			msg.ack(); // remove from DLQ after successful republish
		} else {
			// Too many failures – log and discard
			console.error(`[DLQ] Giving up on order #${orderId} after ${info?.count} attempts`);
			msg.ack();
		}
	}
}

// ─── 4. Service (publisher) ─────────────────────────────────────────────────

@Injectable()
export class OrderService {
	constructor(private readonly rabbit: RabbitMQService) {}

	async createOrder(product: string, quantity: number) {
		const payload: OrderPayload = {
			orderId: `ORD-${Date.now()}`,
			product,
			quantity,
		};

		// Publish to the "events" exchange; routing key routes to "orders.created"
		await this.rabbit.publish("events", "orders.created", payload);
		return payload;
	}

	async cancelOrder(orderId: string) {
		await this.rabbit.publish("events", "orders.cancelled", { orderId });
		return { orderId, status: "cancelled" };
	}

	async sendNotification(userId: string, message: string) {
		// Send directly to a queue, bypassing the exchange
		await this.rabbit.sendToQueue("notifications", { userId, message });
		return { sent: true };
	}
}

// ─── 5. Controller ───────────────────────────────────────────────────────────

@Controller("/orders")
export class OrderController {
	constructor(
		private readonly orderService: OrderService,
		private readonly dlq: RabbitMQDeadLetterService,
	) {}

	@Get("/create")
	async create(
		@Query("product") product: string,
		@Query("qty") qty: string,
	) {
		const order = await this.orderService.createOrder(
			product ?? "Widget",
			Number(qty ?? 1),
		);
		return { message: "Order published", order };
	}

	@Get("/cancel")
	async cancel(@Query("id") id: string) {
		return this.orderService.cancelOrder(id ?? "ORD-UNKNOWN");
	}

	@Get("/notify")
	async notify(
		@Query("userId") userId: string,
		@Query("msg") message: string,
	) {
		return this.orderService.sendNotification(
			userId ?? "user-1",
			message ?? "Hello!",
		);
	}

	// ── DLQ admin endpoints ────────────────────────────────────────────────

	/** GET /orders/dlq/count – how many messages in the DLQ */
	@Get("/dlq/count")
	async dlqCount() {
		const count = await this.dlq.messageCount("orders.cancelled.dlq");
		return { queue: "orders.cancelled.dlq", count };
	}

	/** GET /orders/dlq/inspect – peek at the first N messages */
	@Get("/dlq/inspect")
	async dlqInspect(@Query("limit") limit: string) {
		const messages = await this.dlq.inspect("orders.cancelled.dlq", Number(limit ?? 10));
		return {
			count: messages.length,
			messages: messages.map((m) => ({
				data: m.data,
				deathInfo: m.deathInfo,
			})),
		};
	}

	/** GET /orders/dlq/requeue – move messages back to the original exchange */
	@Get("/dlq/requeue")
	async dlqRequeue(@Query("limit") limit: string) {
		const requeued = await this.dlq.requeueMessages({
			fromQueue: "orders.cancelled.dlq",
			toExchange: "events",
			routingKey: "orders.cancelled",
			count: limit ? Number(limit) : undefined,
		});
		return { requeued };
	}

	/** GET /orders/dlq/discard – permanently remove messages from the DLQ */
	@Get("/dlq/discard")
	async dlqDiscard(@Query("limit") limit: string) {
		const discarded = await this.dlq.discardMessages(
			"orders.cancelled.dlq",
			limit ? Number(limit) : undefined,
		);
		return { discarded };
	}
}

// ─── 6. Routing-key consumers (topic exchange fan-out) ───────────────────────
//
// Instead of naming a pre-declared queue, these handlers use
//   exchange + routingKey
// The lib creates an exclusive auto-delete queue per handler and binds it to
// the exchange. Because every handler gets its OWN queue, publishing a single
// message to "article.published" triggers ALL handlers subscribed to that key.
//
// Publish with:
//   await this.rabbit.publish("articles", "article.published", { articleId: "1" });

interface ArticlePayload {
	articleId: string;
}

/** First handler for article.published – e.g. invalidate cache */
@RabbitConsumer()
export class ArticleCacheHandler {
	@RabbitSubscribe({ exchange: "articles", routingKey: "article.published" })
	async onPublished(msg: RabbitMessage<ArticlePayload>) {
		console.log("[ArticleCacheHandler] Invalidate cache for", msg.data.articleId);
		msg.ack();
	}

	@RabbitSubscribe({ exchange: "articles", routingKey: "article.updated" })
	async onUpdated(msg: RabbitMessage<ArticlePayload>) {
		console.log("[ArticleCacheHandler] Refresh cache for", msg.data.articleId);
		msg.ack();
	}

	@RabbitSubscribe({ exchange: "articles", routingKey: "article.deleted" })
	async onDeleted(msg: RabbitMessage<ArticlePayload>) {
		console.log("[ArticleCacheHandler] Evict cache for", msg.data.articleId);
		msg.ack();
	}
}

/** Second handler for article.published – e.g. send notification */
@RabbitConsumer()
export class ArticleNotificationHandler {
	@RabbitSubscribe({ exchange: "articles", routingKey: "article.published" })
	async onPublished(msg: RabbitMessage<ArticlePayload>) {
		console.log(
			"[ArticleNotificationHandler] Send push notification for",
			msg.data.articleId,
		);
		msg.ack();
	}
}

/** Wildcard: subscribe to ALL article events with article.# */
@RabbitConsumer()
export class ArticleAuditHandler {
	@RabbitSubscribe({ exchange: "articles", routingKey: "article.#" })
	async onAnyArticleEvent(msg: RabbitMessage<ArticlePayload>) {
		console.log(
			"[ArticleAuditHandler] Audit event for",
			msg.data.articleId,
			"| routingKey:",
			msg.raw.fields.routingKey,
		);
		msg.ack();
	}
}

// ─── 7. App Module ───────────────────────────────────────────────────────────

@Module({
	imports: [
		RabbitMQModule.register({
			// Provide either `uri` or individual fields
			uri: process.env.RABBITMQ_URI ?? "amqp://guest:guest@localhost:5672",

			// Declare exchanges (asserted at startup)
			exchanges: [
				{
					name: "events",
					type: "topic",
					durable: true,
				},
				// Topic exchange for article events – used by routing-key consumers above
				{
					name: "articles",
					type: "topic",
					durable: true,
				},
			],

			// Declare queues and bind them to the exchange
			queues: [
				{
					name: "orders.created",
					durable: true,
					bindings: { exchange: "events", routingKey: "orders.created" },
				},
				{
					name: "orders.cancelled",
					durable: true,
					bindings: { exchange: "events", routingKey: "orders.cancelled" },
					//
					// Dead Letter configuration ─────────────────────────────────
					// Messages rejected or expired here land in "orders.cancelled.dlq".
					//
					// `deadLetterQueue` triggers auto-topology:
					//   - asserts exchange "orders.cancelled.dlx" (direct)
					//   - asserts queue    "orders.cancelled.dlq"
					//   - binds DLQ → DLX with the deadLetterRoutingKey
					//
					deadLetterExchange: "orders.cancelled.dlx",
					deadLetterRoutingKey: "orders.cancelled.dead",
					deadLetterQueue: "orders.cancelled.dlq",
					messageTtl: 30_000, // messages expire after 30 s → go to DLQ
				},
				{
					name: "notifications",
					durable: true,
				},
				// Note: NO queue declarations needed for the routing-key consumers above.
				// The lib creates exclusive auto-delete queues automatically at runtime.
			],

			// How many unacked messages each consumer channel may hold
			prefetch: 5,

			reconnect: {
				enabled: true,
				delay: 3000,
				maxRetries: 10,
			},
		}),
	],
	controllers: [OrderController],
	providers: [
		OrderService,
		OrderConsumer,
		NotificationConsumer,
		OrderDLQConsumer,
		// Routing-key consumers
		ArticleCacheHandler,
		ArticleNotificationHandler,
		ArticleAuditHandler,
	],
})
class AppModule {}

// ─── 8. Start ────────────────────────────────────────────────────────────────

console.log("Starting RabbitMQ example…");
console.log(
	"Ensure RabbitMQ is running: docker run -p 5672:5672 rabbitmq:4-management",
);
AppStartup.create(AppModule).then(({ listen }) => listen(3000));

Released under the MIT License.