Event System - Publisher

Overview

Publishing domain events follows a clear path from the domain layer through infrastructure adapters to Redis. This guide explains how to properly publish events in our event-driven architecture.

Add a Publisher

Step 1: Define Domain Events

Domain events should be defined in the domain layer and extend BaseEvent. Events represent something that has happened in the domain.

Location Pattern

services/{service-name}/src/domain-layer/{aggregate}/

Example: services/coding/src/domain-layer/optimization/optimization.events.ts
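
As a rough sketch of what such an events file might contain: the real BaseEvent lives in the shared framework and its shape is not shown in this guide, so the block below declares a local stand-in. Only the `eventType` property is taken from the mapper in Step 4; the `payload` and `occurredAt` members, the payload fields, and the enum's string value are assumptions made for illustration.

```typescript
// Stand-in for the framework's BaseEvent (assumption: the real class may differ).
// Only `eventType` is known from the mapper; the other members are illustrative.
abstract class BaseEvent<TPayload> {
  abstract readonly eventType: string;
  readonly occurredAt: Date = new Date();

  constructor(public readonly payload: TPayload) {}
}

// Event types for the Optimization aggregate, named after what has happened.
enum OptimizationEventType {
  DOMAIN_CODING_OPTIMIZATION_EVALUATED = 'domain_coding_optimization_evaluated',
}

// Hypothetical payload shape.
interface OptimizationEvaluatedPayload {
  optimizationId: string;
  score: number;
}

class OptimizationEvaluatedEvent extends BaseEvent<OptimizationEvaluatedPayload> {
  readonly eventType = OptimizationEventType.DOMAIN_CODING_OPTIMIZATION_EVALUATED;
}

const evaluated = new OptimizationEvaluatedEvent({ optimizationId: 'opt-1', score: 0.9 });
```

Keeping the event type on the class itself lets the infrastructure layer pick a channel without inspecting the payload.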

Step 2: Create Event Publisher Channel

Channels must be unique across all events in the entire system. Define them in the shared framework package.

Location

packages/nest/src/framework/events/event-publisher-channels.enum.ts

Channel Definition

```typescript
/**
 * Event publisher channels have to be unique across all events
 */
export enum EventPublisherChannel {
  DOMAIN_CODING_OPTIMIZATION_EVALUATED = 'domain_coding_optimization_evaluated',
  DOMAIN_CODING_MEDICAL_STAY_OPTIMIZATION_EVALUATED = 'domain_coding_medical_stay_optimization_evaluated',

  // Add your new channel here
  DOMAIN_CODING_MEDICAL_STAY_CREATED = 'domain_coding_medical_stay_created',
}
```
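
TypeScript does not reject two members of a string enum that share the same value, so the uniqueness requirement has to be checked some other way. One option is a small unit test along the following lines. The helper name `findDuplicateChannels` and the trimmed `ExampleChannel` enum are hypothetical and not part of the framework.

```typescript
// Returns every value that appears more than once in the given list.
function findDuplicateChannels(values: readonly string[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const value of values) {
    if (seen.has(value)) {
      duplicates.add(value);
    }
    seen.add(value);
  }
  return Array.from(duplicates);
}

// Trimmed copy of the channel enum, for illustration only.
enum ExampleChannel {
  DOMAIN_CODING_OPTIMIZATION_EVALUATED = 'domain_coding_optimization_evaluated',
  DOMAIN_CODING_MEDICAL_STAY_CREATED = 'domain_coding_medical_stay_created',
}

// An empty result means every channel value is unique.
const duplicateChannels = findDuplicateChannels(Object.values(ExampleChannel));
```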

Naming Convention

Format: DOMAIN_{SERVICE}_{AGGREGATE}_{EVENT_NAME}

Examples:

  • DOMAIN_CODING_OPTIMIZATION_EVALUATED - Coding service, Optimization aggregate, Evaluated event
  • DOMAIN_SCREENFLOW_RUN_STARTED - Screenflow service, Run aggregate, Started event
  • DOMAIN_VALUATION_PRICING_CALCULATED - Valuation service, Pricing aggregate, Calculated event

Rules:

  • Write the enum key in uppercase with underscores and its string value as the same name in lowercase with underscores
  • Prefix with DOMAIN_ to indicate domain events
  • Include the service name for clarity
  • Use the aggregate name (singular)
  • Use past tense for the event name
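
To make the convention concrete, here is a small sketch that derives both the enum key and the string value from the three parts of the name. The `buildChannelName` helper is hypothetical and only illustrates the format. In practice the enum members are written by hand.

```typescript
// Builds the enum key and the string value for a channel from its parts.
function buildChannelName(
  service: string,
  aggregate: string,
  eventName: string,
): { key: string; value: string } {
  const parts = ['DOMAIN', service, aggregate, eventName].map((part) =>
    part.trim().replace(/[\s-]+/g, '_').toUpperCase(),
  );
  const key = parts.join('_');
  return { key, value: key.toLowerCase() };
}

const channelName = buildChannelName('coding', 'medical stay', 'created');
// channelName.key   -> 'DOMAIN_CODING_MEDICAL_STAY_CREATED'
// channelName.value -> 'domain_coding_medical_stay_created'
```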

Step 3: Add Your Channel to the Subscriptions Map

Location

packages/nest/src/framework/redis-stream/redis-bullmq-subscriber/redis-bullmq-stream-subscriber.ts

Add the new EventPublisherChannel entry to getSubscriptions() with an empty handler list, since no subscribers exist yet:

```typescript
private getSubscriptions(): EventHandlerMap {
    return this.redisToBullMq.createMultipleHandlers({
        // Existing subscriptions...

        // Add new subscription
        [EventPublisherChannel.YOUR_NEW_EVENT]: [],
    })
}
```

Step 4: Map Domain Events to Channels

Complete the mapper in your service's infrastructure layer to connect domain events to their channels.

Location Pattern

services/{service-name}/src/infrastructure-layer/events/{service-name}-event-producer/

Example: services/coding/src/infrastructure-layer/events/coding-event-producer/coding-event-producer.mapper.ts

Mapper Implementation

```typescript
function getChannelNameForEvent(
  event: BaseEvent<unknown>,
): EventPublisherChannel {
  const eventType = event.eventType as CodingDomainEventType;

  // One case per domain event type; each returns the channel with the matching name.
  switch (eventType) {
    case OptimizationEventType.DOMAIN_CODING_OPTIMIZATION_EVALUATED:
      return EventPublisherChannel.DOMAIN_CODING_OPTIMIZATION_EVALUATED;
    case MedicalStayEventType.DOMAIN_CODING_MEDICAL_STAY_OPTIMIZATION_EVALUATED:
      return EventPublisherChannel.DOMAIN_CODING_MEDICAL_STAY_OPTIMIZATION_EVALUATED;
    default:
      // Exhaustiveness check: reached only if an event type has no case above.
      assertUnreachable(eventType);
  }
}
```

Key Points:

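  • Maps each domain event type to its channel name
  • Every event type needs its own case: the default branch calls assertUnreachable, so an unmapped event type is flagged instead of being silently skipped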
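
The mapper relies on assertUnreachable, whose implementation is not shown in this guide. The sketch below uses the common TypeScript form of this idiom, given as an assumption about the helper and not as the framework's actual code. The event types and channel strings are illustrative.

```typescript
// Assumed implementation of the exhaustiveness helper; the framework's version may differ.
function assertUnreachable(value: never): never {
  throw new Error(`Unhandled event type: ${String(value)}`);
}

// Illustrative event types (hypothetical names).
type ExampleEventType = 'optimization_evaluated' | 'medical_stay_created';

function channelFor(eventType: ExampleEventType): string {
  switch (eventType) {
    case 'optimization_evaluated':
      return 'domain_coding_optimization_evaluated';
    case 'medical_stay_created':
      return 'domain_coding_medical_stay_created';
    default:
      // Adding a member to ExampleEventType without a matching case
      // turns this line into a compile-time error.
      return assertUnreachable(eventType);
  }
}
```

Because the parameter is typed as `never`, the compiler accepts the call only when every member of the union has been handled. The thrown error covers values that bypass the type system at runtime.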

Publishing Flow

  1. Module Registration: The publishing service imports RedisStreamPublisherModule in one of its modules

    • Example: ParallelEventProducerModule imports RedisStreamPublisherModule.registerAsync({})
    • This creates a Redis client connection dedicated to publishing
  2. Event Creation: A domain service creates a domain event (e.g., OptimizationEvaluatedEvent)

  3. Event Transformation: The ParallelEventProducerProvider transforms the domain event into a Redis message and maps it to the correct channel.

  4. Publishing to Redis: The RedisStreamPublisherClient publishes the event to Redis:

    • The message is the JSON-serialized event, stored as an entry in the stream
    • Uses a pipeline to publish multiple events in one batch
    • Applies a retention policy (e.g. 1 hour) to keep the stream size manageable
  5. Redis Stream: Redis stores the event in the Stream.

    • Events are persisted according to the retention policy.
    • Consumers read events from the stream.
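
The publishing step can be pictured roughly as follows. This is a minimal sketch and not the actual RedisStreamPublisherClient. It assumes the stream is written with XADD, that retention is applied with the MINID trimming option (available from Redis 6.2), and that the event is stored under a single field named `event`. The `StreamPipeline` interface stands in for a Redis client pipeline such as the one returned by `pipeline()` in ioredis, which offers `xadd` and `exec`.

```typescript
// Minimal pipeline shape used by this sketch (assumption, not the framework's type).
interface StreamPipeline {
  xadd(key: string, ...args: string[]): unknown;
  exec(): Promise<unknown>;
}

interface OutgoingEvent {
  channel: string;
  eventType: string;
  payload: unknown;
}

// Builds the XADD arguments that follow the stream key:
// trim entries older than the retention window, let Redis assign the ID (`*`),
// and store the event as one JSON field.
function buildXaddArgs(event: OutgoingEvent, retentionMs: number, now: number): string[] {
  const minId = String(now - retentionMs);
  const body = JSON.stringify({ eventType: event.eventType, payload: event.payload });
  return ['MINID', '~', minId, '*', 'event', body];
}

// Queues all events on one pipeline so the batch is sent together.
async function publishBatch(
  pipeline: StreamPipeline,
  events: readonly OutgoingEvent[],
  retentionMs: number,
  now: number = Date.now(),
): Promise<void> {
  for (const event of events) {
    pipeline.xadd(event.channel, ...buildXaddArgs(event, retentionMs, now));
  }
  await pipeline.exec();
}
```

Stream entry IDs start with a millisecond timestamp, which is why a time-based retention window can be expressed as a minimum ID.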