Skip to content

Recipes

Practical walkthroughs for situations that come up in production. Each recipe is self-contained.

Rebuild a projection from scratch

Your projection's read model schema changed, or a bug produced incorrect data. You need to start over.

Step 1: Delete the projection's checkpoint.

sql
DELETE FROM nagare_checkpoints WHERE subscription_id = 'book-projection';

Step 2: Drop or truncate the read model table.

sql
TRUNCATE TABLE book_read_models;

Step 3: Restart the service. The subscription starts from position zero and replays every event through your (now fixed) projection code. Upcasters apply automatically, so old events are transformed to the current schema.

The projection rebuilds in the background. The service is available immediately, but read model queries return incomplete results until the catch-up finishes. The SubscriptionsReadyHealthCheck reports unhealthy until the projection has caught up, so Kubernetes readiness probes can keep the service out of the load balancer during replay.

How long does it take?

It depends on your event count and projection complexity. A simple projection over 100,000 events rebuilds in seconds. A projection that makes HTTP calls per event will take longer. Keep projections fast and side-effect-free where you can.

Add a new field to an event

You need to add a field to an existing event. The approach depends on whether the field is optional or required.

Optional field (no upcaster needed)

If the field can be null or has a sensible default, add it to the record with a default value:

csharp
// Before
public record BookAdded(string Title, string Author) : BookEvent;

// After — existing events deserialize with Isbn = null
public record BookAdded(string Title, string Author, string? Isbn = null) : BookEvent;

Old events in the store don't have Isbn. System.Text.Json deserializes them with the default value. No upcaster needed.

Required field (upcaster needed)

If the field must always have a value and you can't derive it from existing data, you need a version change.

Step 1: Rename the current discriminator tag to include a version suffix.

csharp
// The old tag becomes "book-added-v1"
// The current tag stays "book-added"

Step 2: Write an upcaster that fills in the new field for old events:

csharp
public class BookAddedV1Upcaster : IEventUpcaster
{
    public string FromEventTag => "book-added-v1";
    public string ToEventTag => "book-added";

    public string Upcast(string jsonPayload)
    {
        var node = JsonNode.Parse(jsonPayload)!;
        node["Isbn"] ??= "";  // default for old events
        return node.ToJsonString();
    }
}

Step 3: Register the upcaster:

csharp
builder.Services.AddEventUpcaster<BookAddedV1Upcaster>();

Step 4: Update the event record:

csharp
public record BookAdded(string Title, string Author, string Isbn) : BookEvent;

Old events are transparently upgraded at read time. New events are written with the current schema.

See Event Versioning for more migration scenarios.

Handle eventual consistency in an API

A user submits a command and immediately loads the updated page. The projection hasn't processed the new event yet. The page shows stale data.

There are several ways to handle this.

Return the command result directly

If the API needs to show the updated state after a write, read it from the aggregate instead of the projection:

csharp
app.MapPost("/books/{id}/borrow", async (
    string id,
    BorrowRequest request,
    IAggregateRepository<BookAggregate, BookCommand, BookEvent, BookState> repo) =>
{
    var aggregate = await repo.Load(new AggregateId(id));
    var reply = await aggregate.Ask(new BorrowBook(request.BorrowerId));

    return reply.Match(
        onAccepted: () => Results.Ok(new { Status = "borrowed" }),
        onRejected: ex => Results.Conflict(new { Error = ex.Message }),
        onIgnored: () => Results.Ok(new { Status = "already borrowed" }));
});

The response comes from the aggregate's state, which is always consistent. The read model catches up in the background.

Optimistic UI updates

On the client side, assume the command succeeded and update the UI optimistically. When the projection catches up and the client next fetches data, it confirms what the UI already shows.

This works well for single-user interactions. It breaks down when multiple users watch the same data.

Poll with version

Include the aggregate version in the command response. The client polls the read model until it sees a version equal to or greater than what the command returned:

csharp
// Command response includes the version
app.MapPost("/books/{id}/borrow", async (...) =>
{
    var aggregate = await repo.Load(new AggregateId(id));
    var reply = await aggregate.Ask(new BorrowBook(request.BorrowerId));
    return Results.Ok(new { Version = aggregate.Version.Value });
});

// Read endpoint accepts a minimum version
app.MapGet("/books/{id}", async (string id, int? afterVersion, IBookRepository repo) =>
{
    var book = await repo.GetById(id);
    if (afterVersion.HasValue && book?.Version < afterVersion)
        return Results.StatusCode(202); // "not yet, try again"
    return Results.Ok(book);
});

Split one aggregate into two

Your aggregate has grown too large. Commands that should be independent cause concurrency conflicts because they share a stream. Time to split.

Step 1: Define the new aggregate with its own event type, state, and command set. Move the relevant events and command handlers from the old aggregate.

Step 2: Keep the old aggregate's handlers for the moved events, but make them no-ops in the state transition. This way, existing events in the old stream still replay correctly.

csharp
// Old aggregate keeps the handler but ignores the event
events.On<BookReservationPlaced>((state, _) => state);  // no-op

Step 3: For new commands that belong to the new aggregate, route them there. Old events stay in the old stream. New events go to the new stream.

Step 4: If you need the new aggregate to start with state derived from old events, write a one-time migration script that reads the old stream, extracts the relevant events, and issues commands against the new aggregate.

This is not a framework operation. It's a deployment decision. Nagare doesn't move events between streams because that would violate immutability.

Run projections on a separate process

Your API service handles commands. A separate worker service runs projections. Both connect to the same event store.

API service:

csharp
// Registers event store, snapshot store, and aggregate
builder.Services.AddPostgresAggregate<
    BookAggregate, BookCommand, BookEvent, BookState>();

// No subscriptions registered here

Worker service:

csharp
// Registers event store (read-only, for the subscription to poll)
builder.Services.AddPostgresEventStore<BookEvent>();
builder.Services.AddPostgresCheckpointStore();

// Subscriptions run here
builder.Services.AddPostgresSubscription<BookProjection, BookEvent>();
builder.Services.AddPostgresSubscription<BookSearchProjection, BookEvent>();

Both services point to the same database. The API writes events. The worker reads them. If you need distributed locking (multiple worker instances), register a lock provider on the worker.

This separation lets you scale the write side and read side independently.

Test an upcaster

Use the Given-When-Then harness with old-format events to verify that upcasters produce correct current-format state:

csharp
[Fact]
public void V1_book_added_is_upcasted_correctly()
{
    // The harness applies upcasters during replay
    Given(new BookAddedV1("Dune", "Herbert"))  // old event format
        .When(new BorrowBook("user-42"))
        .ThenExpect<BookBorrowed>(e =>
            e.BorrowerId.Should().Be("user-42"));
}

If your test infrastructure doesn't support old event types directly, you can test the upcaster in isolation:

csharp
[Fact]
public void Upcaster_renames_name_to_title()
{
    var upcaster = new BookAddedV1Upcaster();
    var oldJson = """{"Name": "Dune", "Author": "Herbert"}""";

    var newJson = upcaster.Upcast(oldJson);

    var node = JsonNode.Parse(newJson)!;
    node["Title"]!.GetValue<string>().Should().Be("Dune");
    node["Name"].Should().BeNull();
}

Add metadata to every command automatically

Instead of manually attaching EventMetadata at every call site, use a middleware:

csharp
public class MetadataMiddleware(IHttpContextAccessor http) : ICommandMiddleware
{
    public async Task<IReply> InvokeAsync(AskContext context, AskDelegate next)
    {
        var httpContext = http.HttpContext;
        var enriched = context with
        {
            Metadata = new EventMetadata(
                CorrelationId: httpContext?.TraceIdentifier,
                CausationId: $"http:{httpContext?.Request.Path}",
                UserId: httpContext?.User.FindFirst("sub")?.Value)
        };

        return await next(enriched);
    }
}

// Register once
builder.Services.AddCommandMiddleware<MetadataMiddleware>();

Every command now carries correlation, causation, and user identity without any changes to call sites.


Back to: Getting Started

流れ — flow.