Event System - Subscriber

Overview

A subscriber uses Redis Consumer Groups to read events from Redis Streams and routes them to BullMQ queues for reliable, asynchronous processing. Because a consumer group distributes messages across the instances that share it, this architecture scales horizontally without extra coordination.

Add a Subscriber

Step 1: Create a BullMQ Consumer

Create a consumer to process the events forwarded by the bridge.

Consumer Implementation

typescript
import { Processor } from '@nestjs/bullmq';
import { Inject } from '@nestjs/common';
import {
  BaseDomainConsumerErrorHandler,
  BullMqDomainAwareConsumer,
  BullMqJobs,
  BullMqQueues,
  BullMqQueueService,
  CustomLoggerService,
  DOMAIN_CONSUMER_ERROR_HANDLER,
  getQueueOptions,
} from '@package/nest';
import { Job } from 'bullmq';

/**
 * Dumb Subscriber Consumer
 *
 * A simple consumer that logs events received from Redis Stream via BullMQ.
 * This is used to test the Redis Stream integration.
 */
@Processor(BullMqQueues.DUMB_SUBSCRIBER_QUEUE, {
  ...getQueueOptions[BullMqQueues.DUMB_SUBSCRIBER_QUEUE].workerOptions,
})
export class DumbSubscriberConsumer extends BullMqDomainAwareConsumer {
  constructor(
    bullMqQueueService: BullMqQueueService,
    @Inject(DOMAIN_CONSUMER_ERROR_HANDLER)
    domainErrorHandler: BaseDomainConsumerErrorHandler,
    private readonly loggerService: CustomLoggerService,
  ) {
    super(bullMqQueueService, domainErrorHandler);
  }

  async processJob(job: Job): Promise<void> {
    switch (job.name) {
      case BullMqJobs.DUMB_SUBSCRIBER_JOB: {
        const data = job.data;

        this.loggerService.log('🎉 DumbSubscriberConsumer received event!', {
          jobId: job.id,
          jobName: job.name,
          eventData: data,
          eventType: data._redisEvent?.name,
          timestamp: data._redisEvent?.timestamp,
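          // The bridge stores the correlation ID under _redisEvent (see "Job Data Structure");
          // fall back to a payload-level field if the event carries one.
          correlationId: data._redisEvent?.correlationId ?? data.correlationId,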
        });

        // Just log the event for now
        this.loggerService.log('📦 Event payload:', {
          payload: JSON.stringify(data, null, 2),
        });

        break;
      }
      default:
        throw new Error(`No job processor found for job ${job.name}`);
    }
  }
}

Consumer Best Practices

  • Extend BullMqDomainAwareConsumer for automatic error handling
  • Log correlation IDs for distributed tracing
  • Keep consumers focused on a single responsibility
  • Handle errors gracefully (failed jobs will be retried)
  • Use _redisEvent metadata for debugging and observability
  • Inject use cases or services; keep business logic out of consumers

Step 2: Configure Subscriptions

Define your subscriptions in the framework's RedisBullMqStreamSubscriber class by editing the getSubscriptions() method.

Location

packages/nest/src/framework/redis-stream/redis-bullmq-subscriber/redis-bullmq-stream-subscriber.ts

Subscription Configuration

The getSubscriptions() method maps Redis channels (streams) to BullMQ jobs:

typescript
private getSubscriptions(): EventHandlerMap {
    return this.redisToBullMq.createMultipleHandlers({
        [EventPublisherChannel.DOMAIN_CODING_OPTIMIZATION_EVALUATED]: [
            {
                jobName: BullMqJobs.DUMB_SUBSCRIBER_JOB,
                priority: BullMqJobPriority.NORMAL,
            },
        ],
        [EventPublisherChannel.DOMAIN_CODING_ORCHESTRATOR_RE_EVALUATED_FROM_IGNORED]: [
            {
                jobName: BullMqJobs.DUMB_SUBSCRIBER_JOB,
                priority: BullMqJobPriority.NORMAL,
            },
        ],
    })
}

Subscription Structure

Each subscription consists of:

  1. Channel - The Redis stream to subscribe to (from EventPublisherChannel enum)
  2. Handlers Array - One or more job configurations for that channel
  3. Job Configuration:
    • jobName: The BullMQ job to create when an event is received
    • priority: Optional priority for the job (HIGH, NORMAL, LOW)
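
As a rough illustration of this structure, the sketch below models a subscription map with plain TypeScript types and flattens it into one row per (channel, job) pair. `Priority`, `HandlerConfig`, `SubscriptionMap` and `flattenSubscriptions` are hypothetical stand-ins, not the framework's `EventHandlerMap` or `BullMqJobPriority`, and the numeric priority values and the NORMAL default are assumptions.

```typescript
// Hypothetical stand-in for BullMqJobPriority. The values are assumptions;
// in BullMQ a lower number means a higher priority.
enum Priority {
  HIGH = 1,
  NORMAL = 5,
  LOW = 10,
}

// One job configuration inside a channel's handlers array.
interface HandlerConfig {
  jobName: string;
  priority?: Priority; // optional, as described in the list above
}

// Channel (stream) name -> one or more job configurations.
type SubscriptionMap = Record<string, HandlerConfig[]>;

interface SubscriptionRow {
  channel: string;
  jobName: string;
  priority: Priority;
}

// Expand the map into one row per (channel, job) pair.
// Defaulting a missing priority to NORMAL is an assumption of this sketch.
function flattenSubscriptions(map: SubscriptionMap): SubscriptionRow[] {
  const rows: SubscriptionRow[] = [];
  for (const channel of Object.keys(map)) {
    for (const { jobName, priority } of map[channel]) {
      rows.push({ channel, jobName, priority: priority ?? Priority.NORMAL });
    }
  }
  return rows;
}

const rows = flattenSubscriptions({
  domain_coding_optimization_evaluated: [
    { jobName: 'UPDATE_MEDICAL_STAY_JOB', priority: Priority.HIGH },
    { jobName: 'SEND_NOTIFICATION_JOB' },
  ],
});
console.log(rows);
```

Flattening the map this way is only a convenient view for logging or reviewing which jobs a channel triggers; it is not part of the framework.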

Multiple Handlers Per Channel

You can create multiple BullMQ jobs from a single event by listing several handlers for the same channel:

typescript
[EventPublisherChannel.DOMAIN_CODING_OPTIMIZATION_EVALUATED]: [
    {
        jobName: BullMqJobs.UPDATE_MEDICAL_STAY_JOB,
        priority: BullMqJobPriority.HIGH,      // Process first
    },
    {
        jobName: BullMqJobs.SEND_NOTIFICATION_JOB,
        priority: BullMqJobPriority.NORMAL,    // Process after
    },
    {
        jobName: BullMqJobs.UPDATE_ANALYTICS_JOB,
        priority: BullMqJobPriority.LOW,       // Process last
    },
],

Understanding the Bridge between Redis Stream and BullMQ

The Redis-to-BullMQ Bridge is provided by the framework and works automatically.

How It Works

  1. Receives a Redis event from the stream subscriber
  2. Extracts the event payload
  3. Adds metadata (event name, timestamp, correlationId)
  4. Creates a BullMQ job with the specified priority
  5. Adds the job to the appropriate queue

Job Data Structure

The bridge enriches the job data with Redis event metadata:

typescript
{
    // Original event payload (spread)
    optimizationId: "123",
    tenantId: "tenant-1",
    medicalStayId: "stay-1",
    // ... other payload properties

    // Added by bridge
    jobName: 'PROCESS_OPTIMIZATION_JOB',
    _redisEvent: {
        name: 'domain_coding_optimization_evaluated',
        timestamp: 1234567890,
        correlationId: '123',
    }
}
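
The enrichment above can be sketched as a small pure function. This is an illustration under stated assumptions, not the code of `RedisEventToBullMqBridge`: the `StreamEvent` shape (with a `payload` property) and the `toJobData` helper are hypothetical, and only the resulting job data layout comes from this page.

```typescript
// Hypothetical event shape; the framework's IEvent may differ.
interface StreamEvent<T extends object> {
  name: string;
  timestamp: number;
  correlationId: string;
  payload: T;
}

interface RedisEventMeta {
  name: string;
  timestamp: number;
  correlationId: string;
}

// Payload properties spread at the top level, plus the bridge's additions.
type BridgedJobData<T> = T & { jobName: string; _redisEvent: RedisEventMeta };

function toJobData<T extends object>(
  event: StreamEvent<T>,
  jobName: string,
): BridgedJobData<T> {
  return {
    ...event.payload, // original event payload (spread)
    jobName, // added by the bridge
    _redisEvent: {
      name: event.name,
      timestamp: event.timestamp,
      correlationId: event.correlationId,
    },
  };
}

const jobData = toJobData(
  {
    name: 'domain_coding_optimization_evaluated',
    timestamp: 1234567890,
    correlationId: '123',
    payload: { optimizationId: '123', tenantId: 'tenant-1' },
  },
  'PROCESS_OPTIMIZATION_JOB',
);
console.log(jobData);
```

Because the metadata is written after the spread, a payload property named `jobName` or `_redisEvent` would be overwritten in this sketch; avoiding those names in event payloads sidesteps the question.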

Subscription Flow

CRITICAL: A service only receives events if it explicitly imports the RedisBullMqStreamSubscriberModule in its app.module.ts.

  1. Module Registration in app.module.ts: The subscribing service imports RedisBullMqStreamSubscriberModule in its root module

    typescript
    // services/coding/src/app.module.ts
    @Module({
      imports: [
        // ... other imports
        RedisBullMqStreamSubscriberModule.registerAsync({}),
      ],
    })
    export class AppModule {}

    • This is required for the service to receive events
    • Without this import, the service won't subscribe to any channels
    • The service must also configure REDIS_STREAM_CONSUMER_GROUP
  2. Subscriber Initialization: When the application starts, RedisBullMqStreamSubscriber initializes (via OnModuleInit)

    • Creates a Redis client connection dedicated to subscribing
    • Loads all the subscriptions for each channel (stream)
    • Ensures Consumer Groups exist for each stream
  3. Event Reception: The subscriber client polls the streams using XREADGROUP:

    • Reads messages assigned to its consumer group
    • Deserializes the JSON message into an IEvent
    • Invokes each handler
    • Acknowledges (XACK) the message upon successful handling
  4. Bridge to BullMQ: Each handler (created by RedisEventToBullMqBridge) transforms the event and queues it in BullMQ.
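
The deserialization part of step 3 can be sketched as a pure function over the raw XREADGROUP reply. Redis returns a nil reply when the block timeout expires; otherwise it returns a list of [stream, entries] pairs in which each entry is [id, [field, value, ...]]. The `event` field name, the `ParsedEvent` type and `parseReply` are assumptions made for illustration; this page does not show how the framework lays out the fields of a stream entry.

```typescript
// Raw XREADGROUP reply: one [streamKey, entries] pair per stream,
// each entry being [entryId, [field1, value1, field2, value2, ...]].
type StreamEntry = [string, string[]];
type StreamReply = [string, StreamEntry[]][];

interface ParsedEvent {
  stream: string;
  id: string; // entry ID to pass to XACK after successful handling
  event: unknown; // deserialized JSON body
}

// payloadField is a hypothetical field name holding the JSON-encoded event.
function parseReply(
  reply: StreamReply | null,
  payloadField = 'event',
): ParsedEvent[] {
  const parsed: ParsedEvent[] = [];
  if (reply === null) {
    return parsed; // BLOCK timeout expired: nothing to process
  }
  for (const [stream, entries] of reply) {
    for (const [id, fields] of entries) {
      // Fields arrive as a flat [name, value, name, value, ...] array.
      for (let i = 0; i + 1 < fields.length; i += 2) {
        if (fields[i] === payloadField) {
          parsed.push({ stream, id, event: JSON.parse(fields[i + 1]) });
        }
      }
    }
  }
  return parsed;
}

const events = parseReply([
  [
    'domain_coding_optimization_evaluated',
    [['1700000000000-0', ['event', '{"payload":{"optimizationId":"123"}}']]],
  ],
]);
console.log(events);
```

Each returned `id` is what the subscriber would acknowledge with XACK once every handler for that entry has succeeded.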

Horizontal Scaling with Consumer Groups (Mental Model)

To understand how scaling works, it's important to distinguish between the Service Pods (physical consumers) and the Internal Handlers.

The Scenario

Imagine you have a service coding-service that subscribes to an event. You scale this service to 2 Pods (Pod 1 and Pod 2) for high availability.

  • Stream (The Source): A Redis Stream is an append-only log. Each published event is appended as a new entry whose ID is based on a millisecond timestamp plus a sequence number (e.g., 1700000000000-0).
  • Consumer Group: Both pods belong to the same group (e.g., service-coding).
  • Consumers (The Workers):
    • Pod 1 has 1 Consumer (The Redis Client connection).
    • Pod 2 has 1 Consumer (The Redis Client connection).
    • Note: Even if your code defines 3 different handlers (A, B, C) for the event, they are all executed by the single Consumer inside that Pod.

The "Race" (Load Balancing)

When an event arrives in the Stream:

  1. Event Published: A new message is appended to the Stream.
  2. Redis Decision: Redis sees both Pod 1 and Pod 2 asking for new messages (using XREADGROUP with the special > ID).
  3. Delivery: Redis picks ONE of them to handle this specific message.
    • Let's say Pod 2 wins the "race" (or was next in line).
  4. Processing:
    • Pod 2 receives the message immediately.
    • Pod 2 executes all internal handlers (A, B, and C) for that message.
    • Pod 2 acknowledges (ACK) the message.
  5. The "Loser":
    • Pod 1 receives nothing. It does not see this message at all.
    • Pod 1 continues waiting for the next message.

Result: This is Load Balancing. Message 1 goes to Pod 2, Message 2 might go to Pod 1. They share the workload.

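The "Polling" Mechanism (Blocking Read)

You might wonder: "Do the pods query the stream every X seconds?"

No, it's better than that. XREADGROUP is called with a blocking timeout (BLOCK 2000), so the read itself waits for data. Unlike a pop, the read does not remove the entry from the stream: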

  1. Pod connects: It sends a command: "Give me new messages. If none, wait up to 2 seconds."
  2. Scenario A (Message exists): Redis returns the message instantly (sub-millisecond latency). It does not wait.
  3. Scenario B (Stream empty): Redis holds the connection open.
    • If a message arrives at 1.5s, Redis pushes it immediately.
    • If 2s pass with no message, Redis returns "empty", and the Pod immediately asks again.

This creates a Real-Time Push experience while avoiding the overhead of constant polling loops.
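
The command a pod issues in this loop can be sketched with a small argument builder. The XREADGROUP syntax (GROUP, COUNT, BLOCK, STREAMS and the special > ID) is standard Redis; the `ReadOptions` type, the `buildXReadGroupArgs` helper and the consumer name `pod-1` are hypothetical and only serve the illustration.

```typescript
interface ReadOptions {
  group: string; // consumer group, e.g. 'service-coding'
  consumer: string; // consumer name within the group (hypothetical value below)
  streams: string[]; // stream keys to read from
  blockMs?: number; // how long Redis may hold the call open when streams are empty
  count?: number; // maximum number of entries returned per stream
}

// Build: XREADGROUP GROUP <group> <consumer> [COUNT n] [BLOCK ms] STREAMS <keys...> <ids...>
function buildXReadGroupArgs(options: ReadOptions): (string | number)[] {
  const args: (string | number)[] = [
    'XREADGROUP',
    'GROUP',
    options.group,
    options.consumer,
  ];
  if (options.count !== undefined) {
    args.push('COUNT', options.count);
  }
  if (options.blockMs !== undefined) {
    args.push('BLOCK', options.blockMs);
  }
  // One ID per stream key; '>' asks for entries never delivered to this group.
  args.push('STREAMS', ...options.streams, ...options.streams.map(() => '>'));
  return args;
}

const command = buildXReadGroupArgs({
  group: 'service-coding',
  consumer: 'pod-1',
  streams: ['domain_coding_optimization_evaluated'],
  blockMs: 2000,
  count: 10,
}).join(' ');
console.log(command);
// XREADGROUP GROUP service-coding pod-1 COUNT 10 BLOCK 2000 STREAMS domain_coding_optimization_evaluated >
```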

Best Practices for Consumer Groups

  1. One Consumer Group per Microservice:

    • You should have one Consumer Group per Microservice (e.g., coding-service, screenflow-service).
    • This allows each service to scale horizontally (load balancing events within the service) while still ensuring that different services all receive the same event (Pub/Sub).
  2. Naming:

    • Name your group after the service, e.g., service-coding, service-screenflow.
    • Do NOT include random IDs or pod names in the group name, or you will lose the load balancing benefit and persistence.
  3. Persistence:

    • Consumer Groups track the last-delivered position server-side (on the Redis instance), not in the pods.
    • If all your pods crash and restart with the same group name, reading resumes from that position. Messages that were delivered but never acknowledged remain in the group's pending list.

Fan-Out vs Load Balancing

  • Fan-Out (Pub/Sub): To broadcast an event to multiple different applications, give each application a different Consumer Group name.

    • Example: coding-service and screenflow-service both subscribe to MedicalStayUpdated. Redis stores the event once in the stream; each group tracks its own read position, so both groups receive it.
  • Load Balancing: To distribute work among instances of the same application, give them the same Consumer Group name.

    • Example: 3 pods of coding-service all share the group service-coding. Redis delivers the event to only one of the 3 pods.
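
Both behaviours can be illustrated with a tiny in-memory model. This is a sketch, not Redis: `StreamModel` is hypothetical, it only tracks a last-delivered position per group, it omits pending lists and acknowledgements, and it starts every group at the beginning of the stream (comparable to creating the group with ID 0 rather than $).

```typescript
interface Entry {
  id: number;
  data: string;
}

// Minimal in-memory model of a stream read through consumer groups.
class StreamModel {
  private entries: Entry[] = [];
  // group name -> number of entries already delivered to that group
  private lastDelivered: Record<string, number> = {};

  add(data: string): void {
    this.entries.push({ id: this.entries.length + 1, data });
  }

  // Similar in spirit to XREADGROUP with COUNT 1 and the '>' ID: returns the
  // next entry never delivered to this group, or null if the group is caught up.
  readGroup(
    group: string,
    consumer: string,
  ): { consumer: string; entry: Entry } | null {
    const position = this.lastDelivered[group] ?? 0;
    if (position >= this.entries.length) {
      return null;
    }
    this.lastDelivered[group] = position + 1;
    return { consumer, entry: this.entries[position] };
  }
}

const stream = new StreamModel();
stream.add('event-1');
stream.add('event-2');

// Load balancing: two pods share one group, so each entry goes to one pod only.
const first = stream.readGroup('service-coding', 'pod-1');
const second = stream.readGroup('service-coding', 'pod-2');
const third = stream.readGroup('service-coding', 'pod-1'); // group is caught up

// Fan-out: another group has its own position and still sees event-1.
const other = stream.readGroup('service-screenflow', 'pod-1');
console.log(first, second, third, other);
```

The position lives in the model, keyed by group name, which mirrors why a group name containing a pod name or random ID would start from scratch on every restart.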