# Messaging
Within a bounded context, projections subscribe directly to the event store. No broker needed. But when events need to cross context boundaries, you need durable delivery. If the process crashes between emitting and handling, an in-memory approach loses the event.
Nagare's messaging layer provides transport-agnostic abstractions for publishing and consuming integration events. You write handlers and mappers once; the transport is a single registration line.
## The two packages
| Package | Purpose |
|---|---|
| `Nagare.Messaging` | Channel, publisher, handler, and mapper abstractions |
| `Nagare.Messaging.Kafka` | Apache Kafka transport using the Confluent client |
```shell
dotnet add package Nagare.Messaging
dotnet add package Nagare.Messaging.Kafka
```

## Defining a channel
A channel names the conduit that messages flow through. In Kafka this maps to a topic. In other transports it maps to whatever the equivalent is — a queue, an exchange, a subscription.
```csharp
public class BookChannel : IMessageChannel<BookIntegrationEvent>
{
    public string ChannelName => "book-integration-events";
}
```

The generic parameter is the message type that flows through this channel.
## Publishing events
The typical pattern is a mapper that reads domain events from the event store and transforms them into integration messages. Define the mapper:
```csharp
public class BookMessageMapper : IMessageMapper<BookEvent, BookIntegrationEvent>
{
    public BookIntegrationEvent? Map(BookEvent @event, EventMapperContext context)
    {
        return @event switch
        {
            BookBorrowed e => new BookBorrowedIntegration(
                context.AggregateId, e.BorrowerId),
            BookReturned _ => new BookReturnedIntegration(context.AggregateId),
            _ => null // filter out events we don't publish
        };
    }
}
```

Returning `null` filters an event out — it won't be published.
The EventMapperContext gives you the aggregate ID, position, timestamp, and correlation metadata from the source event, without coupling the mapper to the full EventEnvelope type.
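As a mental model for the fields listed above, `EventMapperContext` could look roughly like the following. This is a hypothetical sketch — the field names are illustrative; consult the actual `Nagare.Messaging` API for the real declaration:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the context a mapper receives — not the library's
// actual type, just the fields the documentation describes.
public sealed record EventMapperContext(
    string AggregateId,                             // identity of the source aggregate
    long Position,                                  // position of the event in its stream
    DateTimeOffset Timestamp,                       // when the domain event was recorded
    IReadOnlyDictionary<string, string> Metadata);  // correlation metadata
```

Keeping the mapper's dependency down to a small context like this is what lets it stay decoupled from the full `EventEnvelope` type.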
### Registration
```csharp
builder.Services.AddMessageProducer<
    BookMessageMapper,       // the mapper
    BookEvent,               // source (domain) event type
    BookChannel,             // the channel
    BookIntegrationEvent     // target (integration) message type
>();
```

This registers a subscription on the `BookEvent` stream. Each event passes through the mapper; non-null results are published to the channel.
## Consuming messages
On the other side, a bounded context subscribes to the channel and reacts:
```csharp
public class InventoryBookHandler : IMessageHandler<BookIntegrationEvent>
{
    private readonly IAggregateRepository<InventoryAggregate,
        InventoryCommand, InventoryEvent, InventoryState> _repo;

    public InventoryBookHandler(IAggregateRepository<InventoryAggregate,
        InventoryCommand, InventoryEvent, InventoryState> repo) => _repo = repo;

    public async Task Handle(
        BookIntegrationEvent message, MessageContext context, CancellationToken ct)
    {
        if (message is BookBorrowedIntegration borrowed)
        {
            var aggregate = await _repo.Load(new AggregateId(borrowed.BookId));
            await aggregate.Ask(new MarkAsUnavailable());
        }
    }
}
```

The handler receives the message directly — no envelope wrapping. The `MessageContext` carries the channel name, message ID, timestamp, and any metadata.
### Registration
```csharp
builder.Services.AddMessageSubscription<
    InventoryBookHandler,
    BookIntegrationEvent,
    BookChannel
>(options =>
{
    options.GroupId = "inventory-service";
    options.BatchSize = 100;
    options.IdleDelay = TimeSpan.FromMilliseconds(500);
});
```

The `GroupId` maps to whatever competing-consumer mechanism the transport provides (Kafka consumer groups, Service Bus subscriptions, etc.). Multiple instances of the same service share the same group, so each message is processed once.
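The flip side of competing consumers is fan-out across groups: a second service subscribing to the same channel under a different `GroupId` receives its own copy of every message. As an illustration (the notifications context and its handler are hypothetical):

```csharp
// A second, hypothetical subscriber in another bounded context. Because its
// GroupId differs from "inventory-service", it gets its own copy of every
// message, while instances *within* each service still compete.
builder.Services.AddMessageSubscription<
    NotificationsBookHandler,   // hypothetical handler in a notifications context
    BookIntegrationEvent,
    BookChannel
>(options => options.GroupId = "notifications-service");
```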
## Kafka transport
```csharp
builder.Services.AddNagareKafkaTransport(builder.Configuration);
```

In `appsettings.json`:
```json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Namespace": "my-app"
  }
}
```

For Azure Event Hubs, use the fully qualified namespace:
```json
{
  "Kafka": {
    "FullyQualifiedNamespace": "my-eventhub.servicebus.windows.net",
    "Namespace": "my-app"
  }
}
```

Additional Kafka properties (SASL, SSL) go in the `Properties` dictionary:
```json
{
  "Kafka": {
    "BootstrapServers": "broker:9092",
    "Namespace": "my-app",
    "Properties": {
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.username": "$ConnectionString",
      "sasl.password": "your-connection-string"
    }
  }
}
```

## How it fits together
The full flow from domain event to cross-context reaction:
1. The aggregate emits `BookBorrowed` (a domain event, stored in the event store).
2. The `MessageProducer` picks it up via an event store subscription and maps it to `BookBorrowedIntegration`.
3. `BookBorrowedIntegration` is published to the `book-integration-events` channel.
4. `InventoryBookHandler` consumes it and issues a command to the Inventory aggregate.
Domain events stay internal. Integration events cross boundaries. The transport provides durability, ordering, and replay.
Why a broker and not a direct subscription? If you use in-process subscriptions between bounded contexts, you create invisible coupling — one context reading directly from another's event store. When you eventually extract a context into its own service, you discover that half its behaviour depends on another context's internal streams. That's not a boundary, it's a shared database with extra steps. A broker enforces the separation from day one. See Modular Monolith — Two levels of communication for the full architectural argument.
## Swapping transports
The channel, mapper, and handler stay the same regardless of transport. Only the registration line changes:
```csharp
// Apache Kafka
builder.Services.AddNagareKafkaTransport(builder.Configuration);

// In-memory for integration tests (no broker needed)
builder.Services.AddInMemoryTransport();

// Future: Azure Service Bus, RabbitMQ
// builder.Services.AddNagareServiceBusTransport(builder.Configuration);
```

## Testing with InMemoryTransport
For integration tests, replace Kafka with the built-in in-memory transport. Messages flow through `System.Threading.Channels` — no broker, no containers, no latency.
```csharp
builder.Services.AddInMemoryTransport();
```

The `InMemoryTransport` instance is available from DI for test assertions:
```csharp
var transport = serviceProvider.GetRequiredService<InMemoryTransport>();

// Assert messages were published
var published = transport.GetPublished("book-integration-events");
Assert.Single(published);

// Wait for async producers to catch up
var messages = await transport.DrainAsync("book-integration-events", expected: 3);

// Reset between tests
transport.Clear();
```

Each subscription group gets its own copy of every message (fan-out across groups), matching Kafka's consumer-group semantics.
## Domain events are private
This is the most important rule in the messaging layer, and the one most likely to be violated when moving fast.
Domain events (`BookBorrowed`, `BookReturned`, `BookLost`) belong to the aggregate. They are the source of truth in the event store, they feed projections within the context, and they can evolve freely through upcasters. No other context can depend on them.

Integration events (`BookBorrowedIntegration`) are a public contract. They live in a shared contract assembly, they travel over the broker, and changing their schema is a breaking change for every consumer.
These are separate types for a reason. A domain event carries whatever the aggregate needs for internal state transitions. An integration event carries whatever the consuming context needs — often less, sometimes shaped differently. The mapper is the translation layer between the two.
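For illustration, the integration types used on this page might be plain records in the shared contract assembly. The field lists here are a hypothetical minimum — an integration event carries only what consumers need, not the aggregate's internal state:

```csharp
// Hypothetical contract-assembly types — deliberately small, carrying only
// what consuming contexts need, not the aggregate's full internal state.
public abstract record BookIntegrationEvent(string BookId);

public sealed record BookBorrowedIntegration(string BookId, string BorrowerId)
    : BookIntegrationEvent(BookId);

public sealed record BookReturnedIntegration(string BookId)
    : BookIntegrationEvent(BookId);
```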
| Internal | Public |
|---|---|
| `BookAdded(Title, Author, Isbn)` | (not published — internal to the library context) |
| `BookBorrowed(BorrowerId, At)` | `BookBorrowedIntegration(BookId, BorrowerId)` |
| `BookReturned(ReturnedAt)` | `BookReturnedIntegration(BookId)` |
| `BookLost(ReportedAt)` | (not published — handled internally) |

If you find yourself passing a domain event type to `AddMessageProducer` as both the source and the target, you're exposing internals. Create a separate integration type.
## Design guidelines
- **Keep integration events coarse-grained.** They're contracts between bounded contexts. `BookBorrowed` is useful. `BookBorrowerIdFieldUpdated` is not.
- **Don't share types across contexts.** The publishing context defines the integration event. The consuming context can define its own type that deserializes from the same JSON.
- **Use the mapper to filter.** Not every domain event needs to leave the context. Return `null` from `Map` for events that are purely internal.
- **One channel per bounded context** is a good starting point. You can split later if different consumers need different ordering guarantees.
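The "don't share types" guideline works because JSON deserializers ignore properties the target type doesn't declare. A consumer can define its own, smaller record and deserialize the publisher's payload directly (field names here are hypothetical):

```csharp
using System;
using System.Text.Json;

// The consumer's own view of the message — it declares only the field it
// needs; extra properties in the publisher's JSON are simply ignored.
public sealed record BookBorrowedMessage(string BookId);

public static class Demo
{
    public static void Main()
    {
        // Hypothetical payload from the publishing context
        var json = """{ "bookId": "book-1", "borrowerId": "member-9", "borrowedAt": "2024-05-01T10:00:00Z" }""";
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        var message = JsonSerializer.Deserialize<BookBorrowedMessage>(json, options);
        Console.WriteLine(message!.BookId); // prints "book-1"
    }
}
```

This keeps the two contexts coupled only through the wire format, not through a shared assembly reference.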
Next: Modular Monolith