Store Adapters
Nagare separates the what from the where. Your aggregates, events, and projections are database-agnostic — the store adapter handles the dialect-specific details.
The framework ships with adapters for the most common databases. Adding your own is a matter of overriding a few methods.
Built-in backends
| Package | Database | Subscriptions | Distributed locks | Notes |
|---|---|---|---|---|
Nagare.PostgreSql | PostgreSQL | Real-time | Advisory locks | Recommended. LISTEN/NOTIFY for instant subscription wake-up |
Nagare.SqlServer | SQL Server | Polling | sp_getapplock | |
Nagare.MySql | MySQL / MariaDB | Polling | GET_LOCK | |
Nagare.Sqlite | SQLite | Polling | None (single process) | Development only |
Choosing a backend
- Development: SQLite — zero setup, fast, runs anywhere
- Production (recommended): PostgreSQL — real-time subscriptions via LISTEN/NOTIFY, advisory locks, strongest JSON support
- Production (Microsoft stack): SQL Server — polling subscriptions, full feature support
- Production (existing MySQL): MySQL — polling subscriptions, distributed locks via
GET_LOCK
Adding your own
Nagare's store layer uses the template method pattern. Base classes in the core Nagare package implement all the shared logic — append, read, archive, purge, checkpoint tracking. Database adapters override only the dialect-specific parts.
Event store
Extend RelationalEventStore<TEvent> and override four members:
public class MyDbEventStore<TEvent> : RelationalEventStore<TEvent>
where TEvent : class, IAggregateEvent<TEvent>
{
public MyDbEventStore(DbConnection connection, EventStoreSemaphore semaphore,
string tableName, EventUpcasterChain? upcasterChain = null)
: base(connection, semaphore, tableName, upcasterChain) { }
protected override void CreateDatabaseStructure()
{
_connection.Execute($"""
CREATE TABLE IF NOT EXISTS {_tableName} (
Position BIGINT PRIMARY KEY,
AggregateId VARCHAR(255) NOT NULL,
EventType VARCHAR(255) NOT NULL,
EventTag VARCHAR(255) NOT NULL,
EventPayload TEXT NOT NULL,
MetaPayload TEXT,
AggregateVersion INT NOT NULL,
Deleted BOOLEAN DEFAULT FALSE,
CreatedAt TIMESTAMPTZ DEFAULT NOW()
)
""");
}
protected override string BuildLimitQuery(string baseSql, int batchSize) =>
$"{baseSql} LIMIT {batchSize}";
protected override string BooleanTrue => "TRUE";
protected override string BooleanFalse => "FALSE";
}What the base class gives you
Everything that doesn't change between databases:
Append()with optimistic concurrency (WrongExpectedVersionException)ReadFromVersion()for aggregate rehydrationReadFromPosition()for subscriptions with position consistencyReadRawFromPosition()for raw event accessArchive()andPurge()for event lifecycle management- Event mapping, metadata parsing, upcasting
- Transaction handling with rollback
What you override
| Member | Purpose | Default |
|---|---|---|
CreateDatabaseStructure() | DDL for your database | (abstract) |
BuildLimitQuery() | LIMIT N vs TOP N vs FETCH FIRST N | (abstract) |
BooleanTrue / BooleanFalse | SQL boolean literals | "1" / "0" |
UseExplicitPosition | Explicit position on INSERT? | true |
Snapshot store
Same pattern — extend RelationalSnapshotStore<TState>:
public class MyDbSnapshotStore<TState> : RelationalSnapshotStore<TState>
where TState : IAggregateState<TState>
{
public MyDbSnapshotStore(DbConnection connection, string tableName)
: base(connection, tableName) { }
protected override void CreateDatabaseStructure() { /* DDL */ }
protected override string BuildLimitQuery(string baseSql, int batchSize) =>
$"{baseSql} LIMIT {batchSize}";
}Checkpoint store
Extend RelationalCheckpointStore — here the upsert syntax varies most between databases:
public class MyDbCheckpointStore : RelationalCheckpointStore
{
public MyDbCheckpointStore(DbConnection connection) : base(connection) { }
protected override void CreateDatabaseStructure() { /* DDL */ }
public override async Task SaveCheckpoint(
SubscriptionId subscriptionId, Position position)
{
await _connection.ExecuteAsync($"""
INSERT INTO {TABLE_NAME} (SubscriptionId, Checkpoint)
VALUES (@Id, @Checkpoint)
ON CONFLICT (SubscriptionId) DO UPDATE SET Checkpoint = @Checkpoint
""",
new { Id = subscriptionId.Value, Checkpoint = position.ToJson() });
}
}Registration
Wire up your adapter with a ServiceCollection extension:
public static class MyDbExtensions
{
public static IServiceCollection AddMyDbEventStore<TEvent>(
this IServiceCollection services, string tableName)
where TEvent : class, IAggregateEvent<TEvent>
{
services.AddSingleton<IEventStore<TEvent>>(sp =>
new MyDbEventStore<TEvent>(
sp.GetRequiredService<DbConnection>(),
sp.GetRequiredService<EventStoreSemaphore>(),
tableName,
sp.GetService<EventUpcasterChain>()));
return services;
}
}Look at SqliteEventStore (~30 lines) for the simplest reference implementation.
PostgreSQL-specific features
PostgreSQL offers capabilities beyond what other databases provide. Nagare takes advantage of these where available.
Real-time subscription wake-up (LISTEN/NOTIFY)
By default, subscriptions poll the event store at a fixed interval (100ms) to check for new events. This works but wastes queries when the store is idle and adds up to 100ms latency when new events arrive.
PostgreSQL's LISTEN/NOTIFY mechanism eliminates both problems. When enabled, the event store fires a NOTIFY after each successful append, and subscription runners wake up near-instantly instead of waiting for the poll interval.
// LISTEN/NOTIFY is auto-enabled when you use Postgres storage — no extra setup needed:
builder.Services.AddNagarePostgresStorage(builder.Configuration);
builder.Services.AddPostgresCheckpointStore();
// Subscriptions automatically use LISTEN/NOTIFY for instant wake-up:
builder.Services.AddPostgresSubscription<MyProjection, MyEvent>();Automatic activation
AddNagarePostgresStorage registers PostgresSubscriptionWakeUp automatically. There is no separate opt-in step. If you need to use a different connection string for LISTEN (e.g., a dedicated read replica), call AddPostgresSubscriptionWakeUp explicitly to override.
How it works:
- The event store appends events and commits the transaction
- After commit, it fires
NOTIFY {table_name}on the journal's channel - Each subscription runner holds an open
LISTENconnection on its journal's channel - The notification wakes the runner immediately — no polling delay
- If no notification arrives within the fallback timeout (default:
PollDelay), the runner polls anyway — belt and suspenders
Key properties:
- Per-journal channels: each event table gets its own NOTIFY channel. Appointment events only wake appointment subscriptions, not visit subscriptions.
- Multi-node safe:
LISTEN/NOTIFYworks across all connections to the same PostgreSQL instance. Multiple service nodes all receive the notification. - Fallback polling: if the LISTEN connection drops or a notification is lost, the configured
PollDelayacts as a safety net. - Zero infrastructure: no Redis, no message bus — just PostgreSQL, which you already have.
- Eager processing preserved: when processing a backlog, the runner processes batches continuously without waiting. The wake-up signal only applies when the runner is caught up and idle.
Latency comparison:
| Mode | Idle → first event processed | Wasted queries (idle) |
|---|---|---|
| Default polling (100ms) | ~50ms average | 10/sec per subscription |
| Postgres LISTEN/NOTIFY | ~1-5ms | Zero |
This is particularly valuable for real-time projections (e.g., live dashboards, collaborative editing) where sub-10ms event delivery matters.
Live subscriptions + LISTEN/NOTIFY
Combining AddPostgresLiveSubscription with LISTEN/NOTIFY delivers sub-10ms event-to-notification latency. The live subscription skips history (no replay on startup), and LISTEN/NOTIFY wakes the runner the instant a new event is appended — no polling delay.
// Live subscription with near-instant PostgreSQL wake-up
builder.Services.AddNagarePostgresStorage(builder.Configuration);
builder.Services.AddPostgresLiveSubscription<MobileNotificationHandler, VisitEvent>();This combination is ideal for push notifications, SSE-backed views, and any side effect where latency matters but historical replay does not. See projections § Live subscriptions for the full API.
Automatic
LISTEN/NOTIFY is enabled automatically when you call AddNagarePostgresStorage. All subscriptions benefit — both regular projections and live subscriptions. No extra registration needed.
Other databases
LISTEN/NOTIFY is PostgreSQL-specific. MySQL, SQL Server, and SQLite continue to use polling. There is no equivalent mechanism in those databases that doesn't require additional infrastructure.
Next: Packages