Event System - Subscriber
Overview
Subscribing to events involves using Redis Consumer Groups to read events from Redis Streams and route them to BullMQ queues for reliable, asynchronous processing. This architecture supports horizontal scaling natively.
Add a Subscriber
Step 1: Create a BullMQ Consumer
Create a consumer to process the events forwarded by the bridge.
Consumer Implementation
import { Processor } from '@nestjs/bullmq';
import { Inject } from '@nestjs/common';
import {
  BaseDomainConsumerErrorHandler,
  BullMqDomainAwareConsumer,
  BullMqJobs,
  BullMqQueues,
  BullMqQueueService,
  CustomLoggerService,
  DOMAIN_CONSUMER_ERROR_HANDLER,
  getQueueOptions,
} from '@package/nest';
import { Job } from 'bullmq';

/**
 * Dumb Subscriber Consumer
 *
 * A simple consumer that logs events received from Redis Stream via BullMQ.
 * This is used to test the Redis Stream integration.
 */
@Processor(BullMqQueues.DUMB_SUBSCRIBER_QUEUE, {
  ...getQueueOptions[BullMqQueues.DUMB_SUBSCRIBER_QUEUE].workerOptions,
})
export class DumbSubscriberConsumer extends BullMqDomainAwareConsumer {
  constructor(
    bullMqQueueService: BullMqQueueService,
    @Inject(DOMAIN_CONSUMER_ERROR_HANDLER)
    domainErrorHandler: BaseDomainConsumerErrorHandler,
    private readonly loggerService: CustomLoggerService,
  ) {
    super(bullMqQueueService, domainErrorHandler);
  }

  async processJob(job: Job): Promise<void> {
    switch (job.name) {
      case BullMqJobs.DUMB_SUBSCRIBER_JOB: {
        const data = job.data;
        this.loggerService.log('🎉 DumbSubscriberConsumer received event!', {
          jobId: job.id,
          jobName: job.name,
          eventData: data,
          eventType: data._redisEvent?.name,
          timestamp: data._redisEvent?.timestamp,
          correlationId: data.correlationId,
        });
        // Just log the event for now
        this.loggerService.log('📦 Event payload:', {
          payload: JSON.stringify(data, null, 2),
        });
        break;
      }
      default:
        throw new Error(`No job processor found for job ${job.name}`);
    }
  }
}
Consumer Best Practices
- Extend BullMqDomainAwareConsumer for automatic error handling
- Log correlation IDs for distributed tracing
- Keep consumers focused on a single responsibility
- Handle errors gracefully (failed jobs will be retried)
- Use _redisEvent metadata for debugging and observability
- Inject use cases or services; do not put business logic in consumers
Step 2: Configure Subscriptions
Define your subscriptions in the framework's RedisBullMqStreamSubscriber class by editing the getSubscriptions() method.
Location
packages/nest/src/framework/redis-stream/redis-bullmq-subscriber/redis-bullmq-stream-subscriber.ts
Subscription Configuration
The getSubscriptions() method maps Redis channels (streams) to BullMQ jobs:
private getSubscriptions(): EventHandlerMap {
  return this.redisToBullMq.createMultipleHandlers({
    [EventPublisherChannel.DOMAIN_CODING_OPTIMIZATION_EVALUATED]: [
      {
        jobName: BullMqJobs.DUMB_SUBSCRIBER_JOB,
        priority: BullMqJobPriority.NORMAL,
      },
    ],
    [EventPublisherChannel.DOMAIN_CODING_ORCHESTRATOR_RE_EVALUATED_FROM_IGNORED]: [
      {
        jobName: BullMqJobs.DUMB_SUBSCRIBER_JOB,
        priority: BullMqJobPriority.NORMAL,
      },
    ],
  });
}
Subscription Structure
Each subscription consists of:
- Channel - The Redis stream to subscribe to (from the EventPublisherChannel enum)
- Handlers Array - One or more job configurations for that channel
- Job Configuration:
- jobName: The BullMQ job to create when an event is received
- priority: Optional priority for the job (HIGH, NORMAL, LOW)
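The structure above can be pictured as a plain map from channel to job configurations. The following is a minimal sketch, not the framework's code: the `JobConfig`, `SubscriptionMap`, and `planJobs` names are hypothetical, the channel key is written as a plain string, and the `NORMAL` default for a missing priority is an assumption.

```typescript
// Hypothetical shapes for illustration; the real types live in the framework.
type JobPriority = 'HIGH' | 'NORMAL' | 'LOW';

interface JobConfig {
  jobName: string; // BullMQ job to create when an event is received
  priority?: JobPriority; // optional, as described above
}

// One channel (stream) maps to one or more job configurations.
type SubscriptionMap = Record<string, JobConfig[]>;

interface PlannedJob {
  channel: string;
  jobName: string;
  priority: JobPriority;
}

// List the jobs that a single event on `channel` would create.
function planJobs(subscriptions: SubscriptionMap, channel: string): PlannedJob[] {
  const configs = subscriptions[channel] ?? [];
  return configs.map((config) => ({
    channel,
    jobName: config.jobName,
    priority: config.priority ?? 'NORMAL', // assumed default, not confirmed by the framework
  }));
}

const subscriptions: SubscriptionMap = {
  domain_coding_optimization_evaluated: [
    { jobName: 'DUMB_SUBSCRIBER_JOB', priority: 'NORMAL' },
    { jobName: 'SEND_NOTIFICATION_JOB' },
  ],
};

// Two configurations on the channel, so one event plans two jobs.
const planned = planJobs(subscriptions, 'domain_coding_optimization_evaluated');
```

A channel with no entry in the map simply plans no jobs, which mirrors the idea that a service only reacts to channels it has subscribed to.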
Multiple Handlers Per Channel
A single event can create multiple BullMQ jobs, one per handler configured for its channel:
[EventPublisherChannel.DOMAIN_CODING_OPTIMIZATION_EVALUATED]: [
  {
    jobName: BullMqJobs.UPDATE_MEDICAL_STAY_JOB,
    priority: BullMqJobPriority.HIGH, // Process first
  },
  {
    jobName: BullMqJobs.SEND_NOTIFICATION_JOB,
    priority: BullMqJobPriority.NORMAL, // Process after
  },
  {
    jobName: BullMqJobs.UPDATE_ANALYTICS_JOB,
    priority: BullMqJobPriority.LOW, // Process last
  },
],
Understanding the Bridge between Redis Stream and BullMQ
The Redis-to-BullMQ Bridge is provided by the framework and works automatically.
How It Works
- Receives a Redis event from the stream subscriber
- Extracts the event payload
- Adds metadata (event name, timestamp, correlationId)
- Creates a BullMQ job with the specified priority
- Adds the job to the appropriate queue
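The steps above amount to a small transformation from a stream event to a job request. Here is a rough sketch under stated assumptions: `StreamEvent`, `JobRequest`, and `toJobRequest` are hypothetical names, the event shape is a guess at what the framework's `IEvent` carries, and the actual enqueue call is left out.

```typescript
// Hypothetical shapes for illustration; the framework's IEvent and job types may differ.
interface StreamEvent {
  name: string;
  timestamp: number;
  correlationId: string;
  payload: Record<string, unknown>;
}

interface JobRequest {
  name: string;
  data: Record<string, unknown>;
  priority: number | undefined;
}

// Extract the payload, attach the event metadata, and describe the job to create.
function toJobRequest(streamEvent: StreamEvent, jobName: string, priority?: number): JobRequest {
  return {
    name: jobName,
    data: {
      ...streamEvent.payload, // original event payload, spread at the top level
      jobName,
      _redisEvent: {
        name: streamEvent.name,
        timestamp: streamEvent.timestamp,
        correlationId: streamEvent.correlationId,
      },
    },
    priority,
  };
}

const jobRequest = toJobRequest(
  {
    name: 'domain_coding_optimization_evaluated',
    timestamp: 1234567890,
    correlationId: '123',
    payload: { optimizationId: '123', tenantId: 'tenant-1' },
  },
  'PROCESS_OPTIMIZATION_JOB',
);
```

Keeping the transformation a pure function makes it easy to unit test separately from Redis and BullMQ.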
Job Data Structure
The bridge enriches the job data with Redis event metadata:
{
  // Original event payload (spread)
  optimizationId: "123",
  tenantId: "tenant-1",
  medicalStayId: "stay-1",
  // ... other payload properties
  // Added by bridge
  jobName: 'PROCESS_OPTIMIZATION_JOB',
  _redisEvent: {
    name: 'domain_coding_optimization_evaluated',
    timestamp: 1234567890,
    correlationId: '123',
  }
}
Subscription Flow
CRITICAL: A service only receives events if it explicitly imports the RedisBullMqStreamSubscriberModule in its app.module.ts.
Module Registration in app.module.ts: The subscribing service imports RedisBullMqStreamSubscriberModule in its root module.
// services/coding/src/app.module.ts
@Module({
  imports: [
    // ... other imports
    RedisBullMqStreamSubscriberModule.registerAsync({}),
  ],
})
export class AppModule {}
- This is required for the service to receive events
- Without this import, the service won't subscribe to any channels
- The service must also configure REDIS_STREAM_CONSUMER_GROUP.
Subscriber Initialization: When the application starts, RedisBullMqStreamSubscriber initializes (via OnModuleInit)
- Creates a Redis client connection dedicated to subscribing
- Loads all the subscriptions for each channel (stream)
- Ensures Consumer Groups exist for each stream
Event Reception: The subscriber client polls the streams using XREADGROUP:
- Reads messages assigned to its consumer group
- Deserializes the JSON message into an IEvent
- Invokes each handler
- Acknowledges (XACK) the message upon successful handling
Bridge to BullMQ: Each handler (created by RedisEventToBullMqBridge) transforms the event and queues it in BullMQ.
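The event reception step can be sketched as a read, handle, acknowledge loop. This is an illustrative model only: `GroupReader`, `DomainEvent`, and `processBatch` are hypothetical names, the handlers are synchronous for brevity, and an in-memory fake stands in for the Redis client. Leaving a failed entry unacknowledged is an assumption about the failure path, not confirmed framework behavior.

```typescript
// Hypothetical minimal client surface modelled on XREADGROUP and XACK.
interface StreamEntry {
  id: string;
  body: string; // JSON-serialized event
}

interface GroupReader {
  readGroup(): StreamEntry[];
  ack(id: string): void;
}

interface DomainEvent {
  name: string;
  payload: unknown;
}

type EventHandlerFn = (domainEvent: DomainEvent) => void;

// Process one batch: deserialize, run every handler, acknowledge on success.
function processBatch(reader: GroupReader, eventHandlers: EventHandlerFn[]): number {
  let acknowledged = 0;
  for (const entry of reader.readGroup()) {
    try {
      const domainEvent = JSON.parse(entry.body) as DomainEvent;
      for (const handle of eventHandlers) {
        handle(domainEvent);
      }
      reader.ack(entry.id);
      acknowledged += 1;
    } catch {
      // Assumption: a failed entry is not acknowledged, so Redis keeps it
      // in the group's pending list instead of treating it as done.
    }
  }
  return acknowledged;
}

// In-memory fake so the sketch runs without Redis.
const ackedIds: string[] = [];
const handledNames: string[] = [];
const fakeReader: GroupReader = {
  readGroup: () => [
    { id: '1-0', body: JSON.stringify({ name: 'domain_coding_optimization_evaluated', payload: {} }) },
    { id: '2-0', body: 'not valid json' }, // fails to deserialize
  ],
  ack: (id) => {
    ackedIds.push(id);
  },
};

const ackedCount = processBatch(fakeReader, [
  (domainEvent) => {
    handledNames.push(domainEvent.name);
  },
]);
```

Only the first entry is acknowledged here; the malformed one is skipped without an acknowledgement.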
Horizontal Scaling with Consumer Groups (Mental Model)
To understand how scaling works, it's important to distinguish between the Service Pods (physical consumers) and the Internal Handlers.
The Scenario
Imagine you have a service coding-service that subscribes to an event. You scale this service to 2 Pods (Pod 1 and Pod 2) for high availability.
- Stream (The Source): A Redis Stream is an append-only log. Each published event is a new entry with an ID that, by default, Redis derives from the insertion time (a millisecond timestamp plus a sequence number).
- Consumer Group: Both pods belong to the same group (e.g., service-coding).
- Consumers (The Workers):
- Pod 1 has 1 Consumer (The Redis Client connection).
- Pod 2 has 1 Consumer (The Redis Client connection).
- Note: Even if your code defines 3 different handlers (A, B, C) for the event, they are all executed by the single Consumer inside that Pod.
The "Race" (Load Balancing)
When an event arrives in the Stream:
- Event Published: A new message is appended to the Stream.
- Redis Decision: Redis sees both Pod 1 and Pod 2 asking for new messages (using XREADGROUP with the special > ID).
- Delivery: Redis picks ONE of them to handle this specific message.
- Let's say Pod 2 wins the "race" (or was next in line).
- Processing:
- Pod 2 receives the message immediately.
- Pod 2 executes all internal handlers (A, B, and C) for that message.
- Pod 2 acknowledges (XACK) the message.
- The "Loser":
- Pod 1 receives nothing. It does not see this message at all.
- Pod 1 continues waiting for the next message.
Result: This is Load Balancing. Message 1 goes to Pod 2, Message 2 might go to Pod 1. They share the workload.
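The race can be modelled in a few lines. The following in-memory sketch is not a Redis client: `GroupModel` and `poll` are hypothetical names, and the model simply hands each new entry to whichever pod asks first, which is the behavior described above.

```typescript
// In-memory stand-in for one consumer group; not a Redis client.
class GroupModel {
  private readonly entries: string[];
  private nextIndex = 0;

  constructor(entries: string[]) {
    this.entries = entries;
  }

  // Models XREADGROUP with the ">" ID: hand the next never-delivered
  // entry to whichever consumer asks, and to no one else.
  readNew(): string | undefined {
    const entry = this.entries[this.nextIndex];
    if (entry !== undefined) {
      this.nextIndex += 1;
    }
    return entry;
  }
}

const handlerNames = ['A', 'B', 'C']; // internal handlers registered in every pod
const executions: string[] = [];

// One poll by one pod: read at most one entry, then run every handler for it.
function poll(group: GroupModel, pod: string): void {
  const entry = group.readNew();
  if (entry === undefined) {
    return; // nothing new for this pod
  }
  for (const handlerName of handlerNames) {
    executions.push(`${pod}/${handlerName}/${entry}`);
  }
}

const codingGroup = new GroupModel(['message-1', 'message-2']);
poll(codingGroup, 'pod-2'); // pod 2 asks first and receives message-1
poll(codingGroup, 'pod-1'); // pod 1 receives message-2, never message-1
poll(codingGroup, 'pod-2'); // stream drained: nothing delivered
```

After these three polls, `executions` holds six entries: handlers A, B, and C for `message-1` on pod 2, then the same three for `message-2` on pod 1.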
The "Polling" Mechanism (Blocking Read)
You might wonder: "Do they query the stream every X seconds?"
No, it's better than that. The subscriber uses a blocking read (XREADGROUP with BLOCK 2000):
- Pod connects: It sends a command: "Give me new messages. If none, wait up to 2 seconds."
- Scenario A (Message exists): Redis returns the message instantly (sub-millisecond latency). It does not wait.
- Scenario B (Stream empty): Redis holds the connection open.
- If a message arrives at 1.5s, Redis pushes it immediately.
- If 2s pass with no message, Redis returns "empty", and the Pod immediately asks again.
This creates a Real-Time Push experience while avoiding the overhead of constant polling loops.
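The two scenarios can be worked through as simple timing arithmetic. This sketch does not talk to Redis; `blockingReadOutcome` is a hypothetical helper that computes when a read issued with a given block timeout would return, assuming at most one upcoming message.

```typescript
// Timing model for a read such as:
//   XREADGROUP GROUP service-coding pod-1 BLOCK 2000 STREAMS <stream> >
// ("pod-1" is a hypothetical consumer name.)
interface BlockingReadOutcome {
  returnsAtMs: number; // when the call returns to the pod
  delivered: boolean; // false means the "empty" reply after the timeout
}

// callAtMs: when the pod issues the read.
// nextArrivalMs: when the next undelivered message is appended, or null if none arrives.
function blockingReadOutcome(
  callAtMs: number,
  blockMs: number,
  nextArrivalMs: number | null,
): BlockingReadOutcome {
  const deadlineMs = callAtMs + blockMs;
  if (nextArrivalMs !== null && nextArrivalMs <= callAtMs) {
    // Scenario A: a message is already waiting, so the call returns at once.
    return { returnsAtMs: callAtMs, delivered: true };
  }
  if (nextArrivalMs !== null && nextArrivalMs < deadlineMs) {
    // Scenario B: the call is held open and returns as soon as the message arrives.
    return { returnsAtMs: nextArrivalMs, delivered: true };
  }
  // Timeout: empty reply at the deadline; the pod then issues the next read.
  return { returnsAtMs: deadlineMs, delivered: false };
}

const alreadyWaiting = blockingReadOutcome(0, 2000, 0); // returns at 0 ms with a message
const arrivesMidWait = blockingReadOutcome(0, 2000, 1500); // returns at 1500 ms with a message
const timesOut = blockingReadOutcome(0, 2000, null); // returns at 2000 ms, empty
```

The worst-case delivery delay is therefore set by when the message arrives, not by the block timeout; the timeout only bounds how long an idle call stays open.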
Best Practices for Consumer Groups
One Consumer Group per Microservice:
- You should have one Consumer Group per Microservice (e.g., coding-service, screenflow-service).
- This allows each service to scale horizontally (load balancing events within the service) while still ensuring that different services all receive the same event (Pub/Sub).
Naming:
- Name your group after the service, e.g., service-coding, service-screenflow.
- Do NOT include random IDs or pod names in the group name, or you will lose the load balancing benefit and persistence.
Persistence:
- Consumer Groups track the "last read" position server-side (on the Redis instance).
- If all your pods crash, when they restart with the same group name, they pick up exactly where they left off.
Fan-Out vs Load Balancing
Fan-Out (Pub/Sub): To broadcast an event to multiple different applications, give each application a different Consumer Group name.
- Example: coding-service and screenflow-service both subscribe to MedicalStayUpdated. Redis stores the event once in the stream, and each group tracks its own read position, so both groups receive it independently.
Load Balancing: To distribute work among instances of the same application, give them the same Consumer Group name.
- Example: 3 pods of coding-service all share the group coding-service. Redis delivers the event to only one of the 3 pods.
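Both behaviors follow from one idea: every group keeps its own read position over the same stream. The sketch below is an in-memory illustration with hypothetical names (`StreamWithGroups`, `readNew`); it assumes each group starts reading from the beginning of the stream, which in real Redis depends on how the group was created.

```typescript
// In-memory model of one stream read by several consumer groups; not a Redis client.
class StreamWithGroups {
  private readonly entries: string[] = [];
  private readonly cursors = new Map<string, number>(); // group name -> next position

  append(entry: string): void {
    this.entries.push(entry);
  }

  // Any consumer of `groupName` calls this; the cursor is shared by the whole group.
  readNew(groupName: string): string | undefined {
    const position = this.cursors.get(groupName) ?? 0; // assumed: groups start at the beginning
    const entry = this.entries[position];
    if (entry !== undefined) {
      this.cursors.set(groupName, position + 1);
    }
    return entry;
  }
}

const eventStream = new StreamWithGroups();
eventStream.append('MedicalStayUpdated#1');

// Load balancing: two pods share the group, so only the first read gets the entry.
const codingPodA = eventStream.readNew('coding-service'); // 'MedicalStayUpdated#1'
const codingPodB = eventStream.readNew('coding-service'); // undefined

// Fan-out: a different group has its own cursor and still receives the entry.
const screenflowPod = eventStream.readNew('screenflow-service'); // 'MedicalStayUpdated#1'
```

Because the cursor is keyed by group name, a pod-specific or random group name would give every pod its own cursor and turn load balancing into fan-out.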