Skip to content

Messaging

Within a bounded context, projections subscribe directly to the event store. No broker needed. But when events need to cross context boundaries, you need durable delivery. If the process crashes between emitting and handling, an in-memory approach loses the event.

Nagare's messaging layer provides transport-agnostic abstractions for publishing and consuming integration events. You write handlers and mappers once; the transport is a single registration line.

The two packages

PackagePurpose
Nagare.MessagingChannel, publisher, handler, and mapper abstractions
Nagare.Messaging.KafkaApache Kafka transport using the Confluent client
bash
dotnet add package Nagare.Messaging
dotnet add package Nagare.Messaging.Kafka

Defining a channel

A channel names the conduit that messages flow through. In Kafka this maps to a topic. In other transports it maps to whatever the equivalent is — a queue, an exchange, a subscription.

csharp
public class BookChannel : IMessageChannel<BookIntegrationEvent>
{
    public string ChannelName => "book-integration-events";
}

The generic parameter is the message type that flows through this channel.

Publishing events

The typical pattern is a mapper that reads domain events from the event store and transforms them into integration messages. Define the mapper:

csharp
public class BookMessageMapper : IMessageMapper<BookEvent, BookIntegrationEvent>
{
    public BookIntegrationEvent? Map(BookEvent @event, EventMapperContext context)
    {
        return @event switch
        {
            BookBorrowed e => new BookBorrowedIntegration(
                context.AggregateId, e.BorrowerId),
            BookReturned _ => new BookReturnedIntegration(context.AggregateId),
            _ => null  // filter out events we don't publish
        };
    }
}

Returning null filters an event out — it won't be published.

The EventMapperContext gives you the aggregate ID, position, timestamp, and correlation metadata from the source event, without coupling the mapper to the full EventEnvelope type.

Registration

csharp
builder.Services.AddMessageProducer<
    BookMessageMapper,          // the mapper
    BookEvent,                  // source (domain) event type
    BookChannel,                // the channel
    BookIntegrationEvent        // target (integration) message type
>();

This registers a subscription on the BookEvent stream. Each event passes through the mapper; non-null results are published to the channel.

Consuming messages

On the other side, a bounded context subscribes to the channel and reacts:

csharp
public class InventoryBookHandler : IMessageHandler<BookIntegrationEvent>
{
    private readonly IAggregateRepository<InventoryAggregate,
        InventoryCommand, InventoryEvent, InventoryState> _repo;

    public InventoryBookHandler(IAggregateRepository<InventoryAggregate,
        InventoryCommand, InventoryEvent, InventoryState> repo) => _repo = repo;

    public async Task Handle(
        BookIntegrationEvent message, MessageContext context, CancellationToken ct)
    {
        if (message is BookBorrowedIntegration borrowed)
        {
            var aggregate = await _repo.Load(new AggregateId(borrowed.BookId));
            await aggregate.Ask(new MarkAsUnavailable());
        }
    }
}

The handler receives the message directly — no envelope wrapping. The MessageContext carries the channel name, message ID, timestamp, and any metadata.

Registration

csharp
builder.Services.AddMessageSubscription<
    InventoryBookHandler,
    BookIntegrationEvent,
    BookChannel
>(options =>
{
    options.GroupId = "inventory-service";
    options.BatchSize = 100;
    options.IdleDelay = TimeSpan.FromMilliseconds(500);
});

The GroupId maps to whatever competing-consumer mechanism the transport provides (Kafka consumer groups, Service Bus subscriptions, etc.). Multiple instances of the same service share the same group, so each message is processed once.

Kafka transport

csharp
builder.Services.AddNagareKafkaTransport(builder.Configuration);

In appsettings.json:

json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Namespace": "my-app"
  }
}

For Azure Event Hubs, use the fully qualified namespace:

json
{
  "Kafka": {
    "FullyQualifiedNamespace": "my-eventhub.servicebus.windows.net",
    "Namespace": "my-app"
  }
}

Additional Kafka properties (SASL, SSL) go in the Properties dictionary:

json
{
  "Kafka": {
    "BootstrapServers": "broker:9092",
    "Namespace": "my-app",
    "Properties": {
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "$ConnectionString",
      "sasl.password": "your-connection-string"
    }
  }
}

How it fits together

The full flow from domain event to cross-context reaction:

  1. Aggregate emits BookBorrowed (domain event, stored in the event store)
  2. MessageProducer picks it up via an event store subscription, maps it to BookBorrowedIntegration
  3. BookBorrowedIntegration is published to the book-integration-events channel
  4. InventoryBookHandler consumes it and issues a command to the Inventory aggregate

Domain events stay internal. Integration events cross boundaries. The transport provides durability, ordering, and replay.

Why a broker and not a direct subscription? If you use in-process subscriptions between bounded contexts, you create invisible coupling — one context reading directly from another's event store. When you eventually extract a context into its own service, you discover that half its behaviour depends on another context's internal streams. That's not a boundary, it's a shared database with extra steps. A broker enforces the separation from day one. See Modular Monolith — Two levels of communication for the full architectural argument.

Swapping transports

The channel, mapper, and handler stay the same regardless of transport. Only the registration line changes:

csharp
// Apache Kafka
builder.Services.AddNagareKafkaTransport(builder.Configuration);

// In-memory for integration tests (no broker needed)
builder.Services.AddInMemoryTransport();

// Future: Azure Service Bus, RabbitMQ
// builder.Services.AddNagareServiceBusTransport(builder.Configuration);

Testing with InMemoryTransport

For integration tests, replace Kafka with the built-in in-memory transport. Messages flow through System.Threading.Channels — no broker, no containers, no latency.

csharp
builder.Services.AddInMemoryTransport();

The InMemoryTransport instance is available from DI for test assertions:

csharp
var transport = serviceProvider.GetRequiredService<InMemoryTransport>();

// Assert messages were published
var published = transport.GetPublished("book-integration-events");
Assert.Single(published);

// Wait for async producers to catch up
var messages = await transport.DrainAsync("book-integration-events", expected: 3);

// Reset between tests
transport.Clear();

Each consumer gets its own copy of every message (fan-out), matching Kafka's consumer-group semantics.

Domain events are private

This is the most important rule in the messaging layer, and the one most likely to be violated when moving fast.

Domain events (BookBorrowed, BookReturned, BookLost) belong to the aggregate. They are the source of truth in the event store, they feed projections within the context, and they can evolve freely through upcasters. No other context can depend on them.

Integration events (BookBorrowedIntegration) are a public contract. They live in a shared contract assembly, they travel over the broker, and changing their schema is a breaking change for every consumer.

These are separate types for a reason. A domain event carries whatever the aggregate needs for internal state transitions. An integration event carries whatever the consuming context needs — often less, sometimes shaped differently. The mapper is the translation layer between the two.

Internal                              Public
────────                              ──────
BookAdded(Title, Author, Isbn)   →    (not published — internal to the library context)
BookBorrowed(BorrowerId, At)     →    BookBorrowedIntegration(BookId, BorrowerId)
BookReturned(ReturnedAt)         →    BookReturnedIntegration(BookId)
BookLost(ReportedAt)             →    (not published — handled internally)

If you find yourself passing a domain event type to AddMessageProducer as both the source and the target, you're exposing internals. Create a separate integration type.


Design guidelines

  1. Keep integration events coarse-grained. They're contracts between bounded contexts. BookBorrowed is useful. BookBorrowerIdFieldUpdated is not.

  2. Don't share types across contexts. The publishing context defines the integration event. The consuming context can define its own type that deserializes from the same JSON.

  3. Use the mapper to filter. Not every domain event needs to leave the context. Return null from Map for events that are purely internal.

  4. One channel per bounded context is a good starting point. You can split later if different consumers need different ordering guarantees.


Next: Modular Monolith

流れ — flow.