Middleware & Advanced
Command middleware
Every command in Nagare passes through a middleware pipeline before reaching the aggregate. This follows the same pattern as ASP.NET Core middleware — a chain of handlers that can inspect, enrich, or short-circuit the request.
public class AuditMiddleware : ICommandMiddleware
{
    public async Task<IReply> InvokeAsync(AskContext context, AskDelegate next)
    {
        // Before: inspect or enrich the context
        var enriched = context.WithProperty("timestamp", DateTimeOffset.UtcNow);

        // Execute the next middleware or the command handler
        var reply = await next(enriched);

        // After: inspect the result
        return reply;
    }
}

AskContext is immutable; use WithProperty() to pass data down the pipeline without mutation.
Registration
builder.Services.AddCommandMiddleware<AuditMiddleware>();
builder.Services.AddCommandMiddleware<AuthorizationMiddleware>();

Middleware executes in registration order (first registered = outermost). This means AuditMiddleware wraps AuthorizationMiddleware, which wraps the aggregate's command handler.
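To make "first registered = outermost" concrete, here is a minimal sketch of how such a pipeline could be composed. The `Handler` and `Middleware` delegates and the `PipelineSketch` class are hypothetical stand-ins written for this illustration; they are not Nagare's actual types, and the library's internal composition may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration; these are NOT Nagare's real types.
public delegate Task<string> Handler(string command);
public delegate Task<string> Middleware(string command, Handler next);

public static class PipelineSketch
{
    // Wraps the terminal handler so that the first registered middleware ends up outermost.
    public static Handler Build(IEnumerable<Middleware> registered, Handler terminal)
    {
        Handler pipeline = terminal;

        // Walk the registrations in reverse: the last one wraps the handler directly,
        // and the first one wraps everything else.
        foreach (var middleware in registered.Reverse())
        {
            var next = pipeline; // capture the pipeline built so far
            pipeline = command => middleware(command, next);
        }

        return pipeline;
    }

    // Runs a two-stage pipeline and records the order in which stages are entered and exited.
    public static async Task<List<string>> TraceAsync()
    {
        var trace = new List<string>();

        Middleware Named(string name) => async (command, next) =>
        {
            trace.Add($"{name}: before");
            var reply = await next(command);
            trace.Add($"{name}: after");
            return reply;
        };

        var pipeline = Build(
            new[] { Named("Audit"), Named("Authorization") },
            command => { trace.Add("handler"); return Task.FromResult("accepted"); });

        await pipeline("PlaceOrder");
        return trace;
    }
}
```

In this sketch, `TraceAsync()` records Audit entering first and exiting last, with Authorization and then the handler nested inside it.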
Example: logging middleware
public class LoggingMiddleware(ILogger<LoggingMiddleware> logger) : ICommandMiddleware
{
    public async Task<IReply> InvokeAsync(AskContext context, AskDelegate next)
    {
        logger.LogInformation("Handling {CommandType} on {AggregateId}",
            context.Command.GetType().Name, context.AggregateId);

        var reply = await next(context);

        reply.Match(
            onAccepted: () => logger.LogInformation("Accepted"),
            onRejected: ex => logger.LogWarning("Rejected: {Reason}", ex.Message),
            onIgnored: () => logger.LogDebug("Ignored"));

        return reply;
    }
}

Common middleware patterns
| Pattern | Purpose |
|---|---|
| Audit | Record who executed what command and when |
| Authorization | Check permissions before the command reaches the aggregate |
| Validation | Validate command data (format, ranges) before business rule checks |
| Rate limiting | Throttle commands per aggregate or per user |
| Retry | Retry on transient failures (optimistic concurrency conflicts) |
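As one illustration of the Retry row, the following sketch retries a delegate when a transient conflict is thrown. `ConcurrencyConflictException` and `RetrySketch` are hypothetical names invented for this example; how Nagare actually surfaces optimistic concurrency conflicts is not shown on this page, so treat the exception type as an assumption.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical exception standing in for an optimistic concurrency conflict.
public sealed class ConcurrencyConflictException : Exception { }

public static class RetrySketch
{
    // Invokes `next` up to maxAttempts times, retrying only on the transient conflict.
    // On the final attempt the exception filter is false, so the exception propagates.
    public static async Task<T> WithRetryAsync<T>(Func<Task<T>> next, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await next();
            }
            catch (ConcurrencyConflictException) when (attempt < maxAttempts)
            {
                // Transient conflict: back off briefly, then try again.
                await Task.Delay(TimeSpan.FromMilliseconds(10 * attempt));
            }
        }
    }
}
```

Inside a middleware's InvokeAsync, the delegate passed in would typically be something like `() => next(context)`, so each retry re-runs the rest of the pipeline.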
Event upcasting
Events are immutable, but schemas evolve. When you need to change an event's structure, you have two options:
- Add a new event type with a version suffix (order-placed-v2) and update the aggregate to handle both
- Add an upcaster that transforms old events to the new format at read time
Upcasters are the cleaner option for most scenarios:
using System.Text.Json.Nodes;

public class OrderPlacedV1ToV2Upcaster : IEventUpcaster
{
    public string FromEventTag => "order-placed-v1";
    public string ToEventTag => "order-placed";

    public string Upcast(string jsonPayload)
    {
        var node = JsonNode.Parse(jsonPayload)!;

        // V1 had "amount", V2 has "Price"
        node["Price"] = node["amount"]!.GetValue<decimal>();
        ((JsonObject)node).Remove("amount");

        return node.ToJsonString();
    }
}

// Register
builder.Services.AddEventUpcaster<OrderPlacedV1ToV2Upcaster>();

Upcasters are applied at read time, so the stored events are never modified. This means:
- Old and new events coexist in the same stream
- Chain upcasters for multi-version evolution: V1 → V2 → V3 → current
- Rolling back a deployment doesn't corrupt data
- Different consumers can read at different schema versions
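The chaining idea (V1 → V2 → V3 → current) can be sketched as a lookup that keeps applying transforms until no upcaster is registered for the current tag. The `UpcastChainSketch` class, its `Step` record, the intermediate `order-placed-v2` tag, and the `Currency` field are all assumptions made for illustration; Nagare's own chaining mechanism is not shown here and may work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;

public static class UpcastChainSketch
{
    // Hypothetical registry entry: the tag produced and the JSON transform to apply.
    private sealed record Step(string ToTag, Func<string, string> Upcast);

    // Hypothetical registry keyed by source tag. Not Nagare's real API.
    private static readonly Dictionary<string, Step> Steps = new()
    {
        ["order-placed-v1"] = new("order-placed-v2", RenameAmountToPrice),
        ["order-placed-v2"] = new("order-placed", AddDefaultCurrency),
    };

    // Follows the chain until no upcaster is registered for the current tag.
    public static (string Tag, string Json) UpcastToCurrent(string tag, string json)
    {
        while (Steps.TryGetValue(tag, out var step))
        {
            json = step.Upcast(json);
            tag = step.ToTag;
        }
        return (tag, json);
    }

    // V1 -> V2: rename "amount" to "Price".
    private static string RenameAmountToPrice(string json)
    {
        var node = JsonNode.Parse(json)!.AsObject();
        node["Price"] = node["amount"]!.GetValue<decimal>();
        node.Remove("amount");
        return node.ToJsonString();
    }

    // V2 -> current: add a field with a default value (assumed for illustration).
    private static string AddDefaultCurrency(string json)
    {
        var node = JsonNode.Parse(json)!.AsObject();
        node["Currency"] = "USD";
        return node.ToJsonString();
    }
}
```

Because each step only knows about its immediate successor, adding a new schema version in this sketch means registering one more step rather than rewriting the earlier ones.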
OpenTelemetry
Nagare instruments every operation with System.Diagnostics.Activity, so tracing adds no external dependencies. Any OpenTelemetry pipeline that subscribes to the "Nagare" source picks up these activities.
Setup
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("Nagare") // That's it
        .AddOtlpExporter());

Traced operations
| Operation | Activity name |
|---|---|
| Command handling | nagare.aggregate.ask |
| Event append | nagare.eventstore.append |
| Event read | nagare.eventstore.read |
| Subscription processing | nagare.subscription.handle |
| Checkpoint save | nagare.subscription.checkpoint |
Tags
| Tag | Description |
|---|---|
| nagare.aggregate.id | The aggregate ID |
| nagare.aggregate.type | The aggregate type name |
| nagare.event.type | The event type name |
| nagare.event.count | Number of events in operation |
| nagare.command.type | The command type name |
| nagare.subscription.id | The subscription ID |
| nagare.position | Event store position |
| nagare.version | Aggregate version |
These tags give you full observability into the event sourcing pipeline — from the command that triggered a write, through the events that were persisted, to the subscriptions that consumed them.
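To see how activity names and tags surface without any exporter, the sketch below attaches a plain `ActivityListener` from `System.Diagnostics` to a source named "Nagare". The `ActivitySource` created here is a stand-in that imitates what the library would emit; the real source lives inside Nagare, and the tag value `order-42` is made up for the example.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

public static class TracingSketch
{
    // Emits one activity from a source named "Nagare" and returns what a listener observed.
    public static List<string> CaptureActivities()
    {
        var seen = new List<string>();

        using var listener = new ActivityListener
        {
            // Subscribe only to the "Nagare" source, as AddSource("Nagare") does.
            ShouldListenTo = source => source.Name == "Nagare",
            // Record everything so tags are populated.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity =>
                seen.Add($"{activity.OperationName} {activity.GetTagItem("nagare.aggregate.id")}")
        };
        ActivitySource.AddActivityListener(listener);

        // Stand-in for the library's own source (assumption for illustration).
        using var source = new ActivitySource("Nagare");
        using (var activity = source.StartActivity("nagare.aggregate.ask"))
        {
            activity?.SetTag("nagare.aggregate.id", "order-42");
        }

        return seen;
    }
}
```

A listener like this can be handy in tests to assert that a command produced the expected spans, before wiring up a full OTLP exporter.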
Event metadata
Attach tracing context to every command:
var metadata = new EventMetadata(
    CorrelationId: requestId,
    CausationId: $"http:{Request.Path}",
    UserId: currentUser.Id);

await aggregate.Ask(new PlaceOrder(...), metadata);

Metadata is persisted alongside events and available in projections via EventEnvelope<TEvent>.Metadata. This enables:
- Correlation — trace a chain of events back to the original request
- Causation — understand which command or event caused another
- Audit — know who initiated every change in the system
- Debugging — find all events triggered by a specific HTTP request
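As a rough illustration of the debugging case, the sketch below filters a sequence of envelopes by correlation ID. `MetadataSketch` and `EnvelopeSketch` are hypothetical records that mirror only the fields shown above; they are not Nagare's EventMetadata or EventEnvelope<TEvent> types, whose full shape is not documented on this page.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical shapes mirroring the fields shown above; not Nagare's real types.
public sealed record MetadataSketch(string CorrelationId, string CausationId, string UserId);
public sealed record EnvelopeSketch(string EventType, MetadataSketch Metadata);

public static class CorrelationSketch
{
    // Returns the event types that share the given correlation ID, in stream order.
    public static List<string> EventsForRequest(
        IEnumerable<EnvelopeSketch> stream, string correlationId) =>
        stream.Where(e => e.Metadata.CorrelationId == correlationId)
              .Select(e => e.EventType)
              .ToList();
}
```

The same filter applied to UserId instead of CorrelationId would cover the audit case.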