
Store Adapters

Nagare separates the what from the where. Your aggregates, events, and projections are database-agnostic — the store adapter handles the dialect-specific details.

The framework ships with adapters for the most common databases. Adding your own is a matter of overriding a few methods.

Built-in backends

| Package | Database | Subscriptions | Distributed locks | Notes |
|---|---|---|---|---|
| Nagare.PostgreSql | PostgreSQL | Real-time | Advisory locks | Recommended. LISTEN/NOTIFY for instant subscription wake-up |
| Nagare.SqlServer | SQL Server | Polling | sp_getapplock | |
| Nagare.MySql | MySQL / MariaDB | Polling | GET_LOCK | |
| Nagare.Sqlite | SQLite | Polling | None (single process) | Development only |

Choosing a backend

  • Development: SQLite — zero setup, fast, runs anywhere
  • Production (recommended): PostgreSQL — real-time subscriptions via LISTEN/NOTIFY, advisory locks, strongest JSON support
  • Production (Microsoft stack): SQL Server — polling subscriptions, full feature support
  • Production (existing MySQL): MySQL — polling subscriptions, distributed locks via GET_LOCK

Adding your own

Nagare's store layer uses the template method pattern. Base classes in the core Nagare package implement all the shared logic — append, read, archive, purge, checkpoint tracking. Database adapters override only the dialect-specific parts.

Event store

Extend RelationalEventStore<TEvent> and override four members:

csharp
public class MyDbEventStore<TEvent> : RelationalEventStore<TEvent>
    where TEvent : class, IAggregateEvent<TEvent>
{
    public MyDbEventStore(DbConnection connection, EventStoreSemaphore semaphore,
        string tableName, EventUpcasterChain? upcasterChain = null)
        : base(connection, semaphore, tableName, upcasterChain) { }

    protected override void CreateDatabaseStructure()
    {
        _connection.Execute($"""
            CREATE TABLE IF NOT EXISTS {_tableName} (
                Position BIGINT PRIMARY KEY,
                AggregateId VARCHAR(255) NOT NULL,
                EventType VARCHAR(255) NOT NULL,
                EventTag VARCHAR(255) NOT NULL,
                EventPayload TEXT NOT NULL,
                MetaPayload TEXT,
                AggregateVersion INT NOT NULL,
                Deleted BOOLEAN DEFAULT FALSE,
                CreatedAt TIMESTAMPTZ DEFAULT NOW()
            )
            """);
    }

    protected override string BuildLimitQuery(string baseSql, int batchSize) =>
        $"{baseSql} LIMIT {batchSize}";

    protected override string BooleanTrue => "TRUE";
    protected override string BooleanFalse => "FALSE";
}

What the base class gives you

Everything that doesn't change between databases:

  • Append() with optimistic concurrency (WrongExpectedVersionException)
  • ReadFromVersion() for aggregate rehydration
  • ReadFromPosition() for subscriptions with position consistency
  • ReadRawFromPosition() for raw event access
  • Archive() and Purge() for event lifecycle management
  • Event mapping, metadata parsing, upcasting
  • Transaction handling with rollback
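
To make the optimistic concurrency item concrete, here is a minimal in-memory sketch of the kind of version check an append performs. It is not Nagare's implementation: `SketchStream` and `SketchWrongExpectedVersionException` are hypothetical stand-ins, and the real `Append()` signature is not shown on this page.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Nagare's WrongExpectedVersionException.
public sealed class SketchWrongExpectedVersionException : Exception
{
    public SketchWrongExpectedVersionException(int expected, int actual)
        : base($"Expected version {expected}, but the stream is at version {actual}.") { }
}

// Hypothetical in-memory stream; a relational store would enforce the same
// rule inside a transaction instead of a lock.
public sealed class SketchStream
{
    private readonly List<string> _events = new List<string>();
    private readonly object _gate = new object();

    // Current version = number of events appended so far.
    public int Version
    {
        get { lock (_gate) { return _events.Count; } }
    }

    // Appends only if the caller saw the latest version; returns the new version.
    public int Append(int expectedVersion, string payload)
    {
        lock (_gate)
        {
            if (_events.Count != expectedVersion)
                throw new SketchWrongExpectedVersionException(expectedVersion, _events.Count);

            _events.Add(payload);
            return _events.Count;
        }
    }
}
```

Two writers that both loaded version 0 illustrate the point: the first `Append(0, ...)` succeeds, and the second throws because the stream has moved on to version 1. The losing writer would typically reload the aggregate and retry.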

What you override

| Member | Purpose | Default |
|---|---|---|
| CreateDatabaseStructure() | DDL for your database | (abstract) |
| BuildLimitQuery() | LIMIT N vs TOP N vs FETCH FIRST N | (abstract) |
| BooleanTrue / BooleanFalse | SQL boolean literals | "1" / "0" |
| UseExplicitPosition | Explicit position on INSERT? | true |
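
The sketch below shows how overridable members like these can plug into shared query-building logic, and what the three limit styles look like side by side. It is self-contained and does not use Nagare types: `SketchDialect`, `BuildReadQuery`, and the query shape are assumptions made for illustration. Only the member names `BuildLimitQuery` and `BooleanFalse` mirror the table above.

```csharp
using System;

// Hypothetical base class: owns the shared query shape, defers dialect details.
public abstract class SketchDialect
{
    protected abstract string BuildLimitQuery(string baseSql, int batchSize);

    // Same default as in the table above: numeric literal for false.
    protected virtual string BooleanFalse => "0";

    // The template method: identical for every dialect.
    public string BuildReadQuery(string table, int batchSize) =>
        BuildLimitQuery(
            $"SELECT * FROM {table} WHERE Position > @From AND Deleted = {BooleanFalse} ORDER BY Position",
            batchSize);
}

// LIMIT style (PostgreSQL, MySQL, SQLite).
public sealed class LimitDialect : SketchDialect
{
    protected override string BuildLimitQuery(string baseSql, int batchSize) =>
        $"{baseSql} LIMIT {batchSize}";

    protected override string BooleanFalse => "FALSE";
}

// TOP style (SQL Server): the row limit goes right after SELECT.
public sealed class TopDialect : SketchDialect
{
    protected override string BuildLimitQuery(string baseSql, int batchSize)
    {
        const string select = "SELECT ";
        if (!baseSql.StartsWith(select, StringComparison.OrdinalIgnoreCase))
            throw new ArgumentException("Expected a query starting with SELECT.", nameof(baseSql));

        return $"SELECT TOP ({batchSize}) {baseSql.Substring(select.Length)}";
    }
}

// FETCH FIRST style (standard SQL).
public sealed class FetchFirstDialect : SketchDialect
{
    protected override string BuildLimitQuery(string baseSql, int batchSize) =>
        $"{baseSql} FETCH FIRST {batchSize} ROWS ONLY";
}
```

Calling `new TopDialect().BuildReadQuery("Events", 100)` yields `SELECT TOP (100) * FROM Events WHERE Position > @From AND Deleted = 0 ORDER BY Position`, while the `LimitDialect` version ends in `LIMIT 100` and compares against `FALSE`.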

Snapshot store

Same pattern — extend RelationalSnapshotStore<TState>:

csharp
public class MyDbSnapshotStore<TState> : RelationalSnapshotStore<TState>
    where TState : IAggregateState<TState>
{
    public MyDbSnapshotStore(DbConnection connection, string tableName)
        : base(connection, tableName) { }

    protected override void CreateDatabaseStructure() { /* DDL */ }
    protected override string BuildLimitQuery(string baseSql, int batchSize) =>
        $"{baseSql} LIMIT {batchSize}";
}

Checkpoint store

Extend RelationalCheckpointStore — here the upsert syntax varies most between databases:

csharp
public class MyDbCheckpointStore : RelationalCheckpointStore
{
    public MyDbCheckpointStore(DbConnection connection) : base(connection) { }

    protected override void CreateDatabaseStructure() { /* DDL */ }

    public override async Task SaveCheckpoint(
        SubscriptionId subscriptionId, Position position)
    {
        await _connection.ExecuteAsync($"""
            INSERT INTO {TABLE_NAME} (SubscriptionId, Checkpoint)
            VALUES (@Id, @Checkpoint)
            ON CONFLICT (SubscriptionId) DO UPDATE SET Checkpoint = @Checkpoint
            """,
            new { Id = subscriptionId.Value, Checkpoint = position.ToJson() });
    }
}
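
For comparison, the same upsert could be written for the other dialect families roughly as follows. This is a sketch, not the SQL the built-in adapters use: `SketchCheckpointUpsert` is a hypothetical helper, and the table and column names are carried over from the example above.

```csharp
// Hypothetical helper that returns the checkpoint upsert per dialect family.
// Each statement assumes a primary key or unique constraint on SubscriptionId.
public static class SketchCheckpointUpsert
{
    // PostgreSQL, and SQLite 3.24 or later.
    public static string OnConflict(string table) =>
        $"INSERT INTO {table} (SubscriptionId, Checkpoint) VALUES (@Id, @Checkpoint) " +
        "ON CONFLICT (SubscriptionId) DO UPDATE SET Checkpoint = @Checkpoint";

    // MySQL and MariaDB.
    public static string OnDuplicateKey(string table) =>
        $"INSERT INTO {table} (SubscriptionId, Checkpoint) VALUES (@Id, @Checkpoint) " +
        "ON DUPLICATE KEY UPDATE Checkpoint = @Checkpoint";

    // SQL Server has no INSERT-level upsert clause; MERGE is one option.
    // CHECKPOINT is a reserved word in T-SQL, hence the brackets.
    public static string Merge(string table) =>
        $"MERGE INTO {table} WITH (HOLDLOCK) AS target " +
        "USING (SELECT @Id AS SubscriptionId, @Checkpoint AS [Checkpoint]) AS source " +
        "ON target.SubscriptionId = source.SubscriptionId " +
        "WHEN MATCHED THEN UPDATE SET [Checkpoint] = source.[Checkpoint] " +
        "WHEN NOT MATCHED THEN INSERT (SubscriptionId, [Checkpoint]) " +
        "VALUES (source.SubscriptionId, source.[Checkpoint]);";
}
```

All three variants take the same two parameters, so only the SQL text inside a `SaveCheckpoint` override would need to change.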

Registration

Wire up your adapter with a ServiceCollection extension:

csharp
public static class MyDbExtensions
{
    public static IServiceCollection AddMyDbEventStore<TEvent>(
        this IServiceCollection services, string tableName)
        where TEvent : class, IAggregateEvent<TEvent>
    {
        services.AddSingleton<IEventStore<TEvent>>(sp =>
            new MyDbEventStore<TEvent>(
                sp.GetRequiredService<DbConnection>(),
                sp.GetRequiredService<EventStoreSemaphore>(),
                tableName,
                sp.GetService<EventUpcasterChain>()));
        return services;
    }
}

Look at SqliteEventStore (~30 lines) for the simplest reference implementation.


PostgreSQL-specific features

PostgreSQL offers capabilities beyond what other databases provide. Nagare takes advantage of these where available.

Real-time subscription wake-up (LISTEN/NOTIFY)

By default, subscriptions poll the event store at a fixed interval (100ms) to check for new events. This works but wastes queries when the store is idle and adds up to 100ms latency when new events arrive.

PostgreSQL's LISTEN/NOTIFY mechanism eliminates both problems. When enabled, the event store fires a NOTIFY after each successful append, and subscription runners wake up near-instantly instead of waiting for the poll interval.

csharp
// LISTEN/NOTIFY is auto-enabled when you use Postgres storage — no extra setup needed:
builder.Services.AddNagarePostgresStorage(builder.Configuration);
builder.Services.AddPostgresCheckpointStore();

// Subscriptions automatically use LISTEN/NOTIFY for instant wake-up:
builder.Services.AddPostgresSubscription<MyProjection, MyEvent>();

Automatic activation

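AddNagarePostgresStorage registers PostgresSubscriptionWakeUp automatically. There is no separate opt-in step. If you need a different connection string for LISTEN (e.g., a direct connection that bypasses a transaction-mode connection pooler), call AddPostgresSubscriptionWakeUp explicitly to override. The LISTEN connection must target the primary: PostgreSQL rejects LISTEN on a hot-standby replica.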

How it works:

  1. The event store appends events and commits the transaction
  2. After commit, it fires NOTIFY {table_name} on the journal's channel
  3. Each subscription runner holds an open LISTEN connection on its journal's channel
  4. The notification wakes the runner immediately — no polling delay
  5. If no notification arrives within the fallback timeout (default: PollDelay), the runner polls anyway — belt and suspenders
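
Steps 3 to 5 amount to "wait for a signal, but never longer than the fallback timeout". The sketch below shows that wait in isolation. It is not Nagare's runner: `SketchWakeUp` and `SketchRunner` are hypothetical, and a `SemaphoreSlim` stands in for the PostgreSQL LISTEN connection so the example runs without a database.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the LISTEN side: Signal() plays the role of an incoming NOTIFY.
public sealed class SketchWakeUp
{
    // Max count 1: a burst of notifications collapses into one pending wake-up.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1);

    public void Signal()
    {
        if (_signal.CurrentCount == 0)
        {
            // A concurrent Signal() may have won the race; that is harmless.
            try { _signal.Release(); }
            catch (SemaphoreFullException) { }
        }
    }

    // True when woken by a signal, false when the fallback timeout elapsed.
    public Task<bool> WaitAsync(TimeSpan fallback, CancellationToken ct = default) =>
        _signal.WaitAsync(fallback, ct);
}

public static class SketchRunner
{
    // One idle cycle: wait for a wake-up or the poll delay, then read either way.
    public static async Task<string> IdleOnceAsync(
        SketchWakeUp wakeUp, TimeSpan pollDelay, Func<Task> readBatch,
        CancellationToken ct = default)
    {
        bool notified = await wakeUp.WaitAsync(pollDelay, ct);
        await readBatch(); // the store is queried in both cases
        return notified ? "notify" : "fallback-poll";
    }
}
```

Because the batch is read whether or not a signal arrived, a lost notification costs at most one poll delay of extra latency and never a missed event.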

Key properties:

  • Per-journal channels: each event table gets its own NOTIFY channel. Appointment events only wake appointment subscriptions, not visit subscriptions.
  • Multi-node safe: LISTEN/NOTIFY works across all connections to the same PostgreSQL instance. Multiple service nodes all receive the notification.
  • Fallback polling: if the LISTEN connection drops or a notification is lost, the configured PollDelay acts as a safety net.
  • Zero infrastructure: no Redis, no message bus — just PostgreSQL, which you already have.
  • Eager processing preserved: when processing a backlog, the runner processes batches continuously without waiting. The wake-up signal only applies when the runner is caught up and idle.

Latency comparison:

| Mode | Idle → first event processed | Wasted queries (idle) |
|---|---|---|
| Default polling (100ms) | ~50ms average | 10/sec per subscription |
| Postgres LISTEN/NOTIFY | ~1-5ms | Zero |
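
The polling row follows from simple arithmetic on the poll interval. The helper below is an illustrative sketch (`SketchPollingMath` is a hypothetical name) that assumes events arrive uniformly at random within an interval and ignores query execution time.

```csharp
// Hypothetical helper reproducing the polling figures from the poll interval.
public static class SketchPollingMath
{
    // An event arriving at a random moment waits, on average, half an interval.
    public static double AverageLatencyMs(double pollIntervalMs) => pollIntervalMs / 2.0;

    // One idle subscription issues one query per interval.
    public static double IdleQueriesPerSecond(double pollIntervalMs) => 1000.0 / pollIntervalMs;
}
```

With the default 100ms interval this gives a 50ms average latency and 10 idle queries per second per subscription, matching the table. Halving the interval halves the latency but doubles the idle query load, which is the trade-off the wake-up signal avoids.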

This is particularly valuable for real-time projections (e.g., live dashboards, collaborative editing) where sub-10ms event delivery matters.

Live subscriptions + LISTEN/NOTIFY

Combining AddPostgresLiveSubscription with LISTEN/NOTIFY delivers sub-10ms event-to-notification latency. The live subscription skips history (no replay on startup), and LISTEN/NOTIFY wakes the runner the instant a new event is appended — no polling delay.

csharp
// Live subscription with near-instant PostgreSQL wake-up
builder.Services.AddNagarePostgresStorage(builder.Configuration);
builder.Services.AddPostgresLiveSubscription<MobileNotificationHandler, VisitEvent>();

This combination is ideal for push notifications, SSE-backed views, and any side effect where latency matters but historical replay does not. See projections § Live subscriptions for the full API.

Automatic

LISTEN/NOTIFY is enabled automatically when you call AddNagarePostgresStorage. All subscriptions benefit — both regular projections and live subscriptions. No extra registration needed.

Other databases

LISTEN/NOTIFY is PostgreSQL-specific. MySQL, SQL Server, and SQLite continue to use polling. There is no equivalent mechanism in those databases that doesn't require additional infrastructure.



流れ — flow.