Middleware & Advanced

Command middleware

Every command in Nagare passes through a middleware pipeline before reaching the aggregate. This follows the same pattern as ASP.NET Core middleware — a chain of handlers that can inspect, enrich, or short-circuit the request.

csharp
public class AuditMiddleware : ICommandMiddleware
{
    public async Task<IReply> InvokeAsync(AskContext context, AskDelegate next)
    {
        // Before: inspect or enrich the context
        var enriched = context.WithProperty("timestamp", DateTimeOffset.UtcNow);

        // Execute the next middleware or the command handler
        var reply = await next(enriched);

        // After: inspect the result
        return reply;
    }
}

AskContext is immutable — use WithProperty() to pass data down the pipeline without mutation.
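To illustrate what "immutable with WithProperty()" implies, here is a minimal sketch of a context type built on an immutable dictionary. `SketchContext` is a hypothetical stand-in written for this example; Nagare's actual AskContext may be implemented differently.

```csharp
using System.Collections.Immutable;

// Hypothetical stand-in for AskContext; Nagare's real type may differ.
public sealed record SketchContext(
    string AggregateId,
    ImmutableDictionary<string, object> Properties)
{
    public static SketchContext Create(string aggregateId) =>
        new(aggregateId, ImmutableDictionary<string, object>.Empty);

    // Returns a copy carrying the extra property; the original instance is left untouched.
    public SketchContext WithProperty(string key, object value) =>
        this with { Properties = Properties.SetItem(key, value) };
}
```

Because each call returns a new instance, a middleware that forgets to pass the enriched copy to next() silently drops its properties, which is why the example above passes `enriched` rather than `context`.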

Registration

csharp
builder.Services.AddCommandMiddleware<AuditMiddleware>();
builder.Services.AddCommandMiddleware<AuthorizationMiddleware>();

Middleware executes in registration order (first registered = outermost). This means AuditMiddleware wraps AuthorizationMiddleware, which wraps the aggregate's command handler.
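The nesting can be made concrete with a small, self-contained sketch of how such a pipeline might be composed. All types here (`SketchDelegate`, `ISketchMiddleware`, `TracingMiddleware`, `SketchPipeline`) are hypothetical simplifications for illustration, not Nagare's internals; commands and replies are reduced to strings.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins; Nagare's AskContext, IReply and AskDelegate are richer.
public delegate Task<string> SketchDelegate(string command);

public interface ISketchMiddleware
{
    Task<string> InvokeAsync(string command, SketchDelegate next);
}

// Records when it runs so the nesting order becomes visible.
public sealed class TracingMiddleware(string name, List<string> log) : ISketchMiddleware
{
    public async Task<string> InvokeAsync(string command, SketchDelegate next)
    {
        log.Add($"{name}: before");
        var reply = await next(command);
        log.Add($"{name}: after");
        return reply;
    }
}

public static class SketchPipeline
{
    // Wraps the handler from the last registration to the first,
    // so the first registered middleware ends up outermost.
    public static SketchDelegate Build(
        IEnumerable<ISketchMiddleware> middleware, SketchDelegate handler)
    {
        var pipeline = handler;
        foreach (var m in middleware.Reverse())
        {
            var next = pipeline;
            pipeline = command => m.InvokeAsync(command, next);
        }
        return pipeline;
    }
}
```

Building this pipeline with an "Audit" and then an "Authorization" tracing middleware and invoking it produces the log order: Audit before, Authorization before, handler, Authorization after, Audit after.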

Example: logging middleware

csharp
public class LoggingMiddleware(ILogger<LoggingMiddleware> logger) : ICommandMiddleware
{
    public async Task<IReply> InvokeAsync(AskContext context, AskDelegate next)
    {
        logger.LogInformation("Handling {CommandType} on {AggregateId}",
            context.Command.GetType().Name, context.AggregateId);

        var reply = await next(context);

        reply.Match(
            onAccepted: () => logger.LogInformation("Accepted"),
            onRejected: ex => logger.LogWarning("Rejected: {Reason}", ex.Message),
            onIgnored: () => logger.LogDebug("Ignored"));

        return reply;
    }
}

Common middleware patterns

| Pattern | Purpose |
|---|---|
| Audit | Record who executed what command and when |
| Authorization | Check permissions before the command reaches the aggregate |
| Validation | Validate command data (format, ranges) before business rule checks |
| Rate limiting | Throttle commands per aggregate or per user |
| Retry | Retry on transient failures (optimistic concurrency conflicts) |
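As one example of these patterns, the retry logic could be sketched as a small helper. This assumes a concurrency conflict surfaces as an exception; `SketchConcurrencyException` and `RetrySketch` are hypothetical names, and how Nagare actually reports conflicts is not covered here.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical exception; how Nagare surfaces a concurrency conflict is an assumption.
public sealed class SketchConcurrencyException(string message) : Exception(message);

public static class RetrySketch
{
    // Re-runs the rest of the pipeline when a conflict is thrown,
    // giving up (and rethrowing) once maxAttempts is reached.
    public static async Task<T> WithRetryAsync<T>(Func<Task<T>> next, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await next();
            }
            catch (SketchConcurrencyException) when (attempt < maxAttempts)
            {
                // Linear backoff before the next attempt.
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }
    }
}
```

Inside a middleware's InvokeAsync this would wrap the call to the rest of the pipeline, for example `RetrySketch.WithRetryAsync(() => next(context))`. Retrying is only safe when re-running the command handler has no side effects outside the event store.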

Event upcasting

Events are immutable, but schemas evolve. When you need to change an event's structure, you have two options:

  1. Add a new event type with a version suffix (order-placed-v2) and update the aggregate to handle both
  2. Add an upcaster that transforms old events to the new format at read time

Upcasters are the cleaner option for most scenarios, because the aggregate only has to handle the current event shape:

csharp
using System.Text.Json.Nodes;

public class OrderPlacedV1ToV2Upcaster : IEventUpcaster
{
    public string FromEventTag => "order-placed-v1";
    public string ToEventTag => "order-placed";

    public string Upcast(string jsonPayload)
    {
        var node = JsonNode.Parse(jsonPayload)!;
        // V1 had "amount", V2 has "Price"
        node["Price"] = node["amount"]!.GetValue<decimal>();
        ((JsonObject)node).Remove("amount");
        return node.ToJsonString();
    }
}

// Register
builder.Services.AddEventUpcaster<OrderPlacedV1ToV2Upcaster>();

Upcasters are applied at read time — the stored events are never modified. This means:

  • Old and new events coexist in the same stream
  • Chain upcasters for multi-version evolution: V1 → V2 → V3 → current
  • Rolling back a deployment doesn't corrupt data
  • Different consumers can read at different schema versions
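Chaining can be pictured as a lookup loop that keeps applying upcasters until none is registered for the current tag. The sketch below is an assumption about how such a chain could be resolved, not Nagare's implementation; `SketchUpcaster` and `UpcastChain` are hypothetical names that mirror only the members of IEventUpcaster shown above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in shaped like the IEventUpcaster members used above.
public sealed record SketchUpcaster(
    string FromEventTag, string ToEventTag, Func<string, string> Upcast);

public static class UpcastChain
{
    // Applies upcasters one after another until no upcaster is registered
    // for the current tag. Assumes the registrations contain no cycle.
    public static (string Tag, string Json) Apply(
        IReadOnlyDictionary<string, SketchUpcaster> byFromTag, string tag, string json)
    {
        while (byFromTag.TryGetValue(tag, out var upcaster))
        {
            json = upcaster.Upcast(json);
            tag = upcaster.ToEventTag;
        }
        return (tag, json);
    }
}
```

With one upcaster registered from a V1 tag to a V2 tag and another from the V2 tag to the current tag, a stored V1 payload passes through both transformations, while an event already stored under the current tag is returned unchanged.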

OpenTelemetry

Nagare instruments every operation with System.Diagnostics.Activity, so tracing adds no external dependencies. Once the "Nagare" activity source is registered, the OpenTelemetry SDK exports these activities to whichever collector you configure.

Setup

csharp
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("Nagare")  // That's it
        .AddOtlpExporter());

Traced operations

| Operation | Activity name |
|---|---|
| Command handling | nagare.aggregate.ask |
| Event append | nagare.eventstore.append |
| Event read | nagare.eventstore.read |
| Subscription processing | nagare.subscription.handle |
| Checkpoint save | nagare.subscription.checkpoint |

Tags

| Tag | Description |
|---|---|
| nagare.aggregate.id | The aggregate ID |
| nagare.aggregate.type | The aggregate type name |
| nagare.event.type | The event type name |
| nagare.event.count | Number of events in operation |
| nagare.command.type | The command type name |
| nagare.subscription.id | The subscription ID |
| nagare.position | Event store position |
| nagare.version | Aggregate version |

These tags give you full observability into the event sourcing pipeline — from the command that triggered a write, through the events that were persisted, to the subscriptions that consumed them.
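For tests or local debugging, the same activities can be observed without an exporter by using the standard System.Diagnostics.ActivityListener API. The sketch below assumes the source name "Nagare" from the setup above and the nagare.aggregate.id tag from the table; `NagareActivityProbe` is a hypothetical helper name.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

public static class NagareActivityProbe
{
    // Subscribes to the "Nagare" source and records "<activity name> <aggregate id>"
    // each time an activity stops. Dispose the returned listener to unsubscribe.
    public static ActivityListener Listen(List<string> sink)
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Nagare",
            // Request full data so tags are populated on the activity.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity =>
                sink.Add($"{activity.OperationName} {activity.GetTagItem("nagare.aggregate.id")}"),
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

Without at least one listener (or the OpenTelemetry SDK) attached to the source, StartActivity returns null and no activity is created, so an unobserved source costs very little.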

Event metadata

Attach tracing context to every command:

csharp
var metadata = new EventMetadata(
    CorrelationId: requestId,
    CausationId: $"http:{Request.Path}",
    UserId: currentUser.Id);

await aggregate.Ask(new PlaceOrder(...), metadata);

Metadata is persisted alongside events and available in projections via EventEnvelope<TEvent>.Metadata. This enables:

  • Correlation — trace a chain of events back to the original request
  • Causation — understand which command or event caused another
  • Audit — know who initiated every change in the system
  • Debugging — find all events triggered by a specific HTTP request
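The debugging and audit uses above can be sketched as simple queries over envelopes. `SketchMetadata` and `SketchEnvelope<TEvent>` are hypothetical stand-ins that copy only the member names shown in this section; the string types are an assumption, and Nagare's EventMetadata and EventEnvelope<TEvent> may differ.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins; only the member names are taken from this section.
public sealed record SketchMetadata(string CorrelationId, string CausationId, string UserId);
public sealed record SketchEnvelope<TEvent>(TEvent Event, SketchMetadata Metadata);

public static class MetadataQueries
{
    // Debugging: every event that was triggered by one request.
    public static List<TEvent> ForRequest<TEvent>(
        IEnumerable<SketchEnvelope<TEvent>> envelopes, string correlationId) =>
        envelopes
            .Where(e => e.Metadata.CorrelationId == correlationId)
            .Select(e => e.Event)
            .ToList();

    // Audit: number of changes initiated by each user.
    public static Dictionary<string, int> ChangesByUser<TEvent>(
        IEnumerable<SketchEnvelope<TEvent>> envelopes) =>
        envelopes
            .GroupBy(e => e.Metadata.UserId)
            .ToDictionary(g => g.Key, g => g.Count());
}
```

In a real projection the same filters would more likely run as queries against the read model, with the correlation and user IDs stored as columns when each envelope is handled.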
