Event Sourcing Architecture

This document describes the event sourcing implementation in LBS Foundry, including patterns, practices, and technical details.

Overview

Event sourcing is an architectural pattern that stores the full sequence of events leading to the current state of the system. Unlike traditional CRUD (Create, Read, Update, Delete) systems, where only the current state is stored in a database, event sourcing stores a series of immutable events that describe state transitions over time. Current state is derived by replaying those events, which supports consistent state reconstruction and lets the read and write sides scale independently.

[Figure: Event Sourcing Diagram]

Event Sourcing is a fundamental architectural pattern in LBS Foundry where we capture all changes to application state as a sequence of immutable events. Instead of storing just the current state, we store the events that led to that state.

Key Benefits

Complete Audit Trail

  • Every change is recorded as an immutable event
  • Full history of what happened, when, and why
  • Compliance with audit requirements
  • Debugging capabilities with complete context

Temporal Queries

  • Query state at any point in time
  • Replay events to rebuild state
  • Time-travel debugging for complex issues
  • Historical reporting and analytics
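
A temporal query can be sketched as a fold over the events recorded up to a cutoff. The example below is a minimal, self-contained illustration: `SalaryChanged`, `TemporalQuery.SalaryAsOf`, and the in-memory list are hypothetical stand-ins, not LBS Foundry types, and a real system would read the history from the event store.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory history for one player.
var history = new List<SalaryChanged>
{
    new(new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc), 100_000m),
    new(new DateTime(2024, 6, 1, 0, 0, 0, DateTimeKind.Utc), 120_000m),
    new(new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), 150_000m),
};

var cutoff = new DateTime(2024, 7, 1, 0, 0, 0, DateTimeKind.Utc);
Console.WriteLine(TemporalQuery.SalaryAsOf(history, cutoff)); // salary in effect on 2024-07-01

public sealed record SalaryChanged(DateTime OccurredAt, decimal NewSalary);

public static class TemporalQuery
{
    // Replays only the events recorded at or before the cutoff, in order,
    // and returns the resulting state (null if nothing had happened yet).
    public static decimal? SalaryAsOf(IEnumerable<SalaryChanged> events, DateTime asOf)
    {
        decimal? salary = null;
        foreach (var e in events.Where(e => e.OccurredAt <= asOf).OrderBy(e => e.OccurredAt))
        {
            salary = e.NewSalary;
        }

        return salary;
    }
}
```

The same fold with no cutoff yields the current state, which is why replay and time-travel debugging come from one mechanism.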

Event-Driven Architecture

  • Loose coupling between components
  • Real-time notifications based on events
  • Integration with external systems via events
  • Background processing triggered by events

Additional Benefits

  • Auditability: Complete history of every action through immutable events
  • Scalability: Separation of read/write sides with optimized projections
  • Data Consistency: State can be rebuilt from the full event history, so read models can be regenerated from a single source of truth
  • Domain-Driven Design: Follows DDD principles for complex business modeling

System Flow

1. User Interaction (Write Side)

  • User makes a change: Actions trigger commands through the Surface API
  • Data Feed Provider: External data feeds processed into commands
  • Command Execution: Commands sent to aggregates that enforce business rules

2. Event Stream (Aggregate)

  • Commands create events appended to immutable event streams
  • Each stream identified by strongly-typed IDs
  • Events represent immutable facts about system changes
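
A strongly-typed ID wraps a primitive key in its own type so that, for example, a team ID cannot be passed where a player ID is expected. The sketch below is only an illustration. It assumes a Guid-backed `readonly record struct`, whereas the actual LBS Foundry IDs derive from `StrongTypedIdBase`, whose definition is not shown in this document. `StreamNames.For` is a hypothetical helper.

```csharp
using System;

var playerId = PlayerId.New();
var teamId = new TeamId(playerId.Value);

// The wrapped Guid is identical, but the two IDs are different types,
// so StreamNames.For(teamId) would be rejected by the compiler.
Console.WriteLine(StreamNames.For(playerId));
Console.WriteLine(playerId.Value == teamId.Value);

public readonly record struct PlayerId(Guid Value)
{
    public static PlayerId New() => new(Guid.NewGuid());
}

public readonly record struct TeamId(Guid Value);

public static class StreamNames
{
    // Accepts only PlayerId, so the stream for a player cannot be
    // addressed with some other kind of identifier by mistake.
    public static string For(PlayerId id) => $"player-{id.Value}";
}
```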

3. Event Storage

  • Events stored in PostgreSQL with DataContract serialization
  • Ensures compatibility and versioning for event evolution

4. Read Side (Query Side)

  • Readmodel/Projection Builder: Events processed into optimized read models
  • Separate database for efficient querying without affecting writes

5. Projections and Queries

  • Query API retrieves data from pre-built projections
  • Real-time updates as new events are added

Implementation Architecture

Core Components

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    Commands     │───▶│   Aggregates    │───▶│     Events      │
│  (Intentions)   │    │ (Business Logic)│    │    (Facts)      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Read Models    │◀───│   Projections   │◀───│  Event Store    │
│ (Query Models)  │    │   (Builders)    │    │   (Marten)      │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Event Store Implementation

  • PostgreSQL + Marten for event storage
  • JSONB columns for efficient event serialization
  • Automatic indexing on aggregate ID and sequence
  • Stream-based storage for each aggregate
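
To make "stream-based storage" and per-stream sequencing concrete, here is a small in-memory sketch. It is not how Marten stores events. `InMemoryEventStore`, `StoredEvent`, and the expected-version check are hypothetical and only illustrate the idea that each aggregate has its own append-only stream, with events numbered in order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var store = new InMemoryEventStore();
var streamId = Guid.NewGuid();

store.Append(streamId, 0, "PlayerCreated", "PlayerSalaryUpdated");
store.Append(streamId, 2, "PlayerScored");

foreach (var stored in store.ReadStream(streamId))
{
    Console.WriteLine($"{stored.Version}: {stored.Payload}");
}

public sealed record StoredEvent(Guid StreamId, long Version, object Payload);

public sealed class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<StoredEvent>> streams = new();

    // Appends to the end of one stream. The caller states the version it
    // last saw; a mismatch means another writer got there first.
    public void Append(Guid streamId, long expectedVersion, params object[] events)
    {
        if (!this.streams.TryGetValue(streamId, out var stream))
        {
            stream = new List<StoredEvent>();
            this.streams[streamId] = stream;
        }

        if (stream.Count != expectedVersion)
        {
            throw new InvalidOperationException(
                $"Expected version {expectedVersion} but stream is at {stream.Count}.");
        }

        foreach (var payload in events)
        {
            stream.Add(new StoredEvent(streamId, stream.Count + 1, payload));
        }
    }

    // Returns the events of one stream in order, optionally from a version onward.
    public IReadOnlyList<StoredEvent> ReadStream(Guid streamId, long fromVersion = 1) =>
        this.streams.TryGetValue(streamId, out var stream)
            ? stream.Where(e => e.Version >= fromVersion).ToList()
            : new List<StoredEvent>();
}
```

Existing entries are never updated or removed in this sketch; the only write operation is an append, which is what keeps the history immutable.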

Event Design Patterns

Event Structure

[DataContract(Name = "PlayerCreated", Namespace = LBSNamespace.Default)]
public sealed record PlayerCreatedEvent : DomainEvent<PlayerId>
{
    [DataMember]
    public string PlayerName { get; init; } = string.Empty;

    [DataMember]
    public decimal Salary { get; init; }

    [DataMember]
    public string Position { get; init; } = string.Empty;

    [DataMember]
    public TeamId? TeamId { get; init; }

    [DataMember]
    public DateTime CreatedAt { get; init; } = DateTime.UtcNow;
}

Event Naming Conventions

  • Past tense - Events represent facts that already happened
  • Domain language - Use ubiquitous language from business
  • Specific intent - Clear about what changed and why

Examples:

  • PlayerCreatedEvent - Good: clear, past tense
  • PlayerSalaryUpdatedEvent - Good: specific change
  • TeamRosterModifiedEvent - Good: domain-specific
  • PlayerChangeEvent - Avoid: too vague
  • CreatePlayerEvent - Avoid: not past tense

Command-Event Flow

1. Command Processing

public void Execute(CreatePlayerCommand command)
{
    // 1. Validate business rules
    if (string.IsNullOrEmpty(command.PlayerName))
        throw new ArgumentException("Player name required");

    // 2. Check business invariants
    if (this.IsPlayerNameTaken(command.PlayerName))
        throw new InvalidOperationException("Player name already exists");

    // 3. Raise domain event
    this.RaiseEvent(new PlayerCreatedEvent
    {
        AggregateRootId = command.AggregateRootId,
        PlayerName = command.PlayerName,
        Salary = command.Salary,
        Position = command.Position,
        CreatedAt = DateTime.UtcNow
    });
}

2. Event Application

public void Apply(PlayerCreatedEvent @event)
{
    // Update aggregate state based on event
    this.PlayerId = @event.AggregateRootId;
    this.PlayerName = @event.PlayerName;
    this.Salary = @event.Salary;
    this.Position = @event.Position;
    this.CreatedAt = @event.CreatedAt;
    this.IsActive = true;
}
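
The snippets above call `RaiseEvent` and `Apply` without showing how the two connect. One common arrangement, sketched here as an assumption rather than as the LBS Foundry base class, is that raising an event applies it immediately and also records it as uncommitted, while replay applies events without recording them. `AggregateSketch`, `PlayerSketch`, and the two event records are hypothetical and simplified.

```csharp
using System;
using System.Collections.Generic;

var player = new PlayerSketch();
player.Create("Test Player", 100_000m);
player.UpdateSalary(120_000m);
Console.WriteLine($"Salary={player.Salary}, Version={player.Version}, Uncommitted={player.UncommittedEvents.Count}");

// Rebuilding from history uses ApplyEvent only, so nothing is re-recorded.
var rebuilt = new PlayerSketch();
foreach (var e in player.UncommittedEvents) rebuilt.ApplyEvent(e);
Console.WriteLine($"Rebuilt salary={rebuilt.Salary}, uncommitted={rebuilt.UncommittedEvents.Count}");

public sealed record PlayerCreated(string PlayerName, decimal Salary);
public sealed record SalaryUpdated(decimal NewSalary);

public abstract class AggregateSketch
{
    private readonly List<object> uncommitted = new();

    public long Version { get; private set; }
    public IReadOnlyList<object> UncommittedEvents => this.uncommitted;

    // Replay path: mutate state and advance the version, record nothing.
    public void ApplyEvent(object @event)
    {
        this.When(@event);
        this.Version++;
    }

    // Command path: apply the event, then remember it so it can be saved.
    protected void RaiseEvent(object @event)
    {
        this.ApplyEvent(@event);
        this.uncommitted.Add(@event);
    }

    protected abstract void When(object @event);
}

public sealed class PlayerSketch : AggregateSketch
{
    public string PlayerName { get; private set; } = string.Empty;
    public decimal Salary { get; private set; }

    public void Create(string name, decimal salary)
    {
        if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException("Player name required", nameof(name));
        this.RaiseEvent(new PlayerCreated(name, salary));
    }

    public void UpdateSalary(decimal newSalary) => this.RaiseEvent(new SalaryUpdated(newSalary));

    protected override void When(object @event)
    {
        switch (@event)
        {
            case PlayerCreated e:
                this.PlayerName = e.PlayerName;
                this.Salary = e.Salary;
                break;
            case SalaryUpdated e:
                this.Salary = e.NewSalary;
                break;
        }
    }
}
```

Keeping validation in the command methods and state changes in `When` means replay never re-runs business rules against events that have already been accepted.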

3. Projection Building

public class PlayerContractBuilder : MultiStreamProjection<PlayerContract, PlayerId>
{
    public void Apply(PlayerCreatedEvent @event, PlayerContract contract)
    {
        contract.Id = @event.AggregateRootId.Value;
        contract.PlayerName = @event.PlayerName;
        contract.Salary = @event.Salary;
        contract.Position = @event.Position;
        contract.CreatedAt = @event.CreatedAt;
        contract.IsActive = true;
    }

    public void Apply(PlayerSalaryUpdatedEvent @event, PlayerContract contract)
    {
        contract.Salary = @event.NewSalary;
        contract.LastUpdated = @event.UpdatedAt;
    }
}

Aggregate Lifecycle

1. Creation

// New aggregate - version 0
var player = new PlayerAggregate();
player.Execute(createCommand);
// Raises PlayerCreatedEvent
// Version becomes 1

2. Modification

// Load existing aggregate
var player = await repository.LoadAsync<PlayerAggregate>(playerId);
// Current version: N

player.Execute(updateCommand);
// Raises PlayerSalaryUpdatedEvent
// Version becomes N+1

3. Event Sourcing Repository Pattern

public async Task<T> LoadAsync<T>(StrongTypedIdBase id) where T : IAggregateRoot, new()
{
    // 1. Load events from store
    var events = await this.eventStore.LoadEventsAsync(id);

    // 2. Create new aggregate instance
    var aggregate = new T();
    aggregate.AggregateRootId = id;

    // 3. Replay events to rebuild state
    foreach (var @event in events)
    {
        aggregate.ApplyEvent(@event);
    }

    return aggregate;
}

Projection Patterns

Real-time Projections

// Async projections update read models in near real-time
[AsyncProjection]
public class PlayerStatsProjection : MultiStreamProjection<PlayerStatsContract, PlayerId>
{
    public void Apply(PlayerCreatedEvent @event, PlayerStatsContract stats)
    {
        stats.PlayerId = @event.AggregateRootId.Value;
        stats.TotalPoints = 0;
        stats.GamesPlayed = 0;
        stats.LastUpdated = @event.CreatedAt;
    }

    public void Apply(PlayerScoredEvent @event, PlayerStatsContract stats)
    {
        stats.TotalPoints += @event.Points;
        stats.LastScored = @event.ScoredAt;
        stats.LastUpdated = @event.ScoredAt; // event time keeps projection rebuilds deterministic
    }
}

Inline Projections

// Inline projections update synchronously with event storage
public class PlayerSummaryProjection : SingleStreamProjection<PlayerSummary>
{
    public void Apply(PlayerCreatedEvent @event, PlayerSummary summary)
    {
        summary.Id = @event.AggregateRootId.Value;
        summary.Name = @event.PlayerName;
        summary.CreatedAt = @event.CreatedAt;
    }
}
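
The practical difference between the two projection styles is when the read model becomes current. The following self-contained sketch imitates both without any projection library: the inline read model is updated in the same call that stores the event, while the asynchronous one lags until a background pass runs. All names here (`PlayerRenamed`, `Append`, `ProcessPending`) are hypothetical.

```csharp
using System;
using System.Collections.Generic;

var inlineNames = new Dictionary<Guid, string>(); // inline read model
var renameCounts = new Dictionary<Guid, int>();   // async read model
var pending = new Queue<PlayerRenamed>();         // events awaiting the async pass

var playerId = Guid.NewGuid();
Append(new PlayerRenamed(playerId, "Alex"));
Append(new PlayerRenamed(playerId, "Alexander"));

Console.WriteLine(inlineNames[playerId]);              // current immediately after Append
Console.WriteLine(renameCounts.ContainsKey(playerId)); // async model has not caught up yet

ProcessPending();
Console.WriteLine(renameCounts[playerId]);             // caught up after the background pass

// Stands in for "store the event": the inline projection runs in the same call.
void Append(PlayerRenamed e)
{
    inlineNames[e.PlayerId] = e.NewName;
    pending.Enqueue(e);
}

// Stands in for a background worker that drains the backlog later.
void ProcessPending()
{
    while (pending.Count > 0)
    {
        var e = pending.Dequeue();
        renameCounts.TryGetValue(e.PlayerId, out var count);
        renameCounts[e.PlayerId] = count + 1;
    }
}

public sealed record PlayerRenamed(Guid PlayerId, string NewName);
```

Inline projections give read-your-own-writes behavior at the cost of slower writes; asynchronous projections keep writes fast but callers must tolerate a short delay before the read model reflects a change.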

Event Versioning & Evolution

Schema Evolution Strategy

// V1 Event
[DataContract(Name = "PlayerCreated", Namespace = "LBS.Events.V1")]
public sealed record PlayerCreatedEventV1 : DomainEvent<PlayerId>
{
    [DataMember]
    public string PlayerName { get; init; } = string.Empty;

    [DataMember]
    public decimal Salary { get; init; }
}

// V2 Event - Added Position
[DataContract(Name = "PlayerCreated", Namespace = "LBS.Events.V2")]
public sealed record PlayerCreatedEventV2 : DomainEvent<PlayerId>
{
    [DataMember]
    public string PlayerName { get; init; } = string.Empty;

    [DataMember]
    public decimal Salary { get; init; }

    [DataMember]
    public string Position { get; init; } = string.Empty; // New field
}

Upcasting Pattern

public class PlayerEventUpcaster : IEventUpcaster
{
    public object? Upcast(object @event)
    {
        return @event switch
        {
            PlayerCreatedEventV1 v1 => new PlayerCreatedEventV2
            {
                AggregateRootId = v1.AggregateRootId,
                PlayerName = v1.PlayerName,
                Salary = v1.Salary,
                Position = "Unknown" // Default for legacy events
            },
            _ => @event
        };
    }
}
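
When more than one schema change accumulates, upcasters are usually applied repeatedly until the event stops changing, so a V1 event passes through every intermediate version. The sketch below shows that loop in isolation. The positional records, the `V3` shape with a `Currency` field, and the `"USD"` default are hypothetical and exist only to give the chain a second step.

```csharp
using System;

object stored = new PlayerCreatedV1("Test Player", 100_000m);
var current = Upcasting.ToLatest(stored);
Console.WriteLine(current.GetType().Name);

public sealed record PlayerCreatedV1(string PlayerName, decimal Salary);
public sealed record PlayerCreatedV2(string PlayerName, decimal Salary, string Position);
public sealed record PlayerCreatedV3(string PlayerName, decimal Salary, string Position, string Currency);

public static class Upcasting
{
    // One step: returns the next version, or the same instance if already current.
    public static object Step(object @event) => @event switch
    {
        PlayerCreatedV1 v1 => new PlayerCreatedV2(v1.PlayerName, v1.Salary, "Unknown"),
        PlayerCreatedV2 v2 => new PlayerCreatedV3(v2.PlayerName, v2.Salary, v2.Position, "USD"),
        _ => @event,
    };

    // Applies Step until the event no longer changes.
    public static object ToLatest(object @event)
    {
        while (true)
        {
            var next = Step(@event);
            if (ReferenceEquals(next, @event))
            {
                return @event;
            }

            @event = next;
        }
    }
}
```

Because each step only knows about the version directly before it, adding a V4 later means adding one arm rather than revisiting every older conversion.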

Event Store Queries

Stream Queries

// Load events for specific aggregate
var events = await session
    .Events
    .FetchStreamAsync(playerId.Value);

// Load events from specific version
var eventsFromVersion = await session
    .Events
    .FetchStreamAsync(playerId.Value, fromVersion: 5);

Cross-Stream Queries

// Find all events of specific type
var playerCreatedEvents = await session
    .Events
    .QueryAllRawEvents()
    .Where(e => e.EventTypeName == "PlayerCreated")
    .ToListAsync();

// Events within time range (IEvent.Timestamp is a DateTimeOffset)
var cutoff = DateTimeOffset.UtcNow.AddDays(-7);
var recentEvents = await session
    .Events
    .QueryAllRawEvents()
    .Where(e => e.Timestamp >= cutoff)
    .ToListAsync();

Security & Event Sourcing

Event Authorization

// Commands are authorized, events are facts
[RequiresRoles(RoleDefinition.UserManager)]
public sealed record UpdatePlayerSalaryCommand : DomainCommand<PlayerId>
{
    // Command requires authorization
}

// Events don't need authorization - they're immutable facts
public sealed record PlayerSalaryUpdatedEvent : DomainEvent<PlayerId>
{
    // Event is a fact that already happened
}

Audit Trail

// Events include audit information
public sealed record PlayerSalaryUpdatedEvent : DomainEvent<PlayerId>
{
    [DataMember]
    public decimal PreviousSalary { get; init; }

    [DataMember]
    public decimal NewSalary { get; init; }

    [DataMember]
    public string UpdatedBy { get; init; } = string.Empty; // User ID

    [DataMember]
    public string Reason { get; init; } = string.Empty;

    [DataMember]
    public DateTime UpdatedAt { get; init; } = DateTime.UtcNow;
}
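
Because each event carries who, why, and when, an audit report is just a projection over the stream. The following sketch formats a salary history from events shaped like the one above. `SalaryUpdated` is a simplified positional stand-in for `PlayerSalaryUpdatedEvent`, and `AuditReport.Build` and the sample data are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var firstChange = new DateTime(2024, 1, 15, 9, 0, 0, DateTimeKind.Utc);
var events = new List<SalaryUpdated>
{
    new(100_000m, 110_000m, "user-17", "Annual review", firstChange),
    new(110_000m, 125_000m, "user-04", "Contract renegotiation", firstChange.AddMonths(6)),
};

foreach (var line in AuditReport.Build(events))
{
    Console.WriteLine(line);
}

public sealed record SalaryUpdated(
    decimal PreviousSalary,
    decimal NewSalary,
    string UpdatedBy,
    string Reason,
    DateTime UpdatedAt);

public static class AuditReport
{
    // One line per change, oldest first, built only from data stored in the events.
    public static IReadOnlyList<string> Build(IEnumerable<SalaryUpdated> events) =>
        events
            .OrderBy(e => e.UpdatedAt)
            .Select(e => $"{e.UpdatedAt:yyyy-MM-dd} {e.UpdatedBy}: {e.PreviousSalary} -> {e.NewSalary} ({e.Reason})")
            .ToList();
}
```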

Performance Considerations

Event Store Optimization

// Use lightweight sessions (no identity map) for read operations
using var readSession = store.LightweightSession();
var history = await readSession.Events.FetchStreamAsync(aggregateId);

// Use identity sessions for writes. Distinct variable names let both
// sessions live in the same scope; newEvents holds the events to append.
using var writeSession = store.IdentitySession();
await writeSession.Events.AppendExclusive(aggregateId, newEvents);
await writeSession.SaveChangesAsync();

Projection Performance

// Batch projection updates
public class BatchPlayerStatsProjection : MultiStreamProjection<PlayerStatsContract, PlayerId>
{
    public void Apply(IEvent<PlayerScoredEvent>[] events, PlayerStatsContract stats)
    {
        var totalPoints = events.Sum(e => e.Data.Points);
        stats.TotalPoints += totalPoints;
        stats.GamesPlayed += events.Length;
        stats.LastUpdated = DateTime.UtcNow;
    }
}

Snapshot Pattern

// For aggregates with many events, use snapshots
public class PlayerAggregateWithSnapshot : PlayerAggregate, ISnapshot
{
    public void ApplySnapshot(object snapshot)
    {
        if (snapshot is PlayerSnapshot playerSnapshot)
        {
            this.PlayerId = playerSnapshot.PlayerId;
            this.PlayerName = playerSnapshot.PlayerName;
            this.Salary = playerSnapshot.Salary;
            // ... other properties
        }
    }

    public object CreateSnapshot()
    {
        return new PlayerSnapshot
        {
            PlayerId = this.PlayerId,
            PlayerName = this.PlayerName,
            Salary = this.Salary,
            // ... other properties
        };
    }
}
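
Loading with a snapshot means restoring the saved state and then replaying only the events recorded after the snapshot's version. The sketch below shows that rule on a deliberately tiny state. `VersionedEvent`, `SalarySnapshot`, and `SnapshotLoader` are hypothetical and unrelated to the `ISnapshot` interface above, whose full contract is not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var events = new List<VersionedEvent>
{
    new(1, 100_000m), new(2, 110_000m), new(3, 120_000m), new(4, 130_000m),
};
var snapshot = new SalarySnapshot(Version: 3, Salary: 120_000m);

var state = SnapshotLoader.Load(snapshot, events);
Console.WriteLine($"Salary={state.Salary}, Version={state.Version}");

public sealed record VersionedEvent(long Version, decimal NewSalary);

public sealed record SalarySnapshot(long Version, decimal Salary)
{
    // Starting point when no snapshot has been taken yet.
    public static SalarySnapshot Empty { get; } = new(0, 0m);
}

public static class SnapshotLoader
{
    // Starts from the snapshot and applies only events newer than it.
    public static SalarySnapshot Load(SalarySnapshot snapshot, IEnumerable<VersionedEvent> stream)
    {
        var fromVersion = snapshot.Version;
        var state = snapshot;

        foreach (var e in stream.Where(e => e.Version > fromVersion).OrderBy(e => e.Version))
        {
            state = new SalarySnapshot(e.Version, e.NewSalary);
        }

        return state;
    }
}
```

Loading from `SalarySnapshot.Empty` replays the whole stream and must give the same result, which is a useful check that a snapshot has not drifted from the events it summarizes.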

Testing Event Sourcing

Command Testing

[Test]
public void CreatePlayer_ShouldRaisePlayerCreatedEvent()
{
    // Arrange
    var aggregate = new PlayerAggregate();
    var command = new CreatePlayerCommand
    {
        AggregateRootId = PlayerId.New(),
        PlayerName = "Test Player",
        Salary = 100000
    };

    // Act
    aggregate.Execute(command);

    // Assert
    var events = aggregate.GetUncommittedEvents();
    events.Should().HaveCount(1);

    var playerCreatedEvent = events[0].Should().BeOfType<PlayerCreatedEvent>().Subject;
    playerCreatedEvent.PlayerName.Should().Be("Test Player");
    playerCreatedEvent.Salary.Should().Be(100000);
}

Event Application Testing

[Test]
public void Apply_PlayerCreatedEvent_ShouldSetAggregateState()
{
    // Arrange
    var aggregate = new PlayerAggregate();
    var @event = new PlayerCreatedEvent
    {
        AggregateRootId = PlayerId.New(),
        PlayerName = "Test Player",
        Salary = 100000
    };

    // Act
    aggregate.Apply(@event);

    // Assert
    aggregate.PlayerName.Should().Be("Test Player");
    aggregate.Salary.Should().Be(100000);
}

Projection Testing

[Test]
public void PlayerCreatedEvent_ShouldUpdateProjection()
{
    // Arrange
    var @event = new PlayerCreatedEvent
    {
        AggregateRootId = PlayerId.New(),
        PlayerName = "Test Player",
        Salary = 100000
    };

    var contract = new PlayerContract();

    // Act
    var builder = new PlayerContractBuilder();
    builder.Apply(@event, contract);

    // Assert
    contract.PlayerName.Should().Be("Test Player");
    contract.Salary.Should().Be(100000);
}

Best Practices

Do

  • Make events immutable - Never change events once stored
  • Use past tense naming - Events represent facts
  • Include sufficient context - Events should be self-describing
  • Version events carefully - Plan for schema evolution
  • Test event application - Verify aggregate rebuilding works
  • Use domain language - Events should match business terminology

Don't

  • Store mutable references in events
  • Include behavior in events (only data)
  • Make events dependent on external state
  • Forget about serialization - Events must be serializable
  • Ignore versioning - Events live forever
  • Use technical language - Keep events business-focused

Event sourcing is the foundation of LBS Foundry's architecture. Understanding these patterns is crucial for effective development on the platform.