Event Sourcing Architecture¶
This document describes the event sourcing implementation in LBS Foundry, including patterns, practices, and technical details.
Overview¶
Event sourcing is a powerful architectural pattern that ensures data consistency and scalability by storing the full sequence of events that lead to the current state of the system. Unlike traditional CRUD (Create, Read, Update, Delete) systems, where the state is stored directly in a database, event sourcing stores a series of immutable events that describe state transitions over time.
Event Sourcing is a fundamental architectural pattern in LBS Foundry where we capture all changes to application state as a sequence of immutable events. Instead of storing just the current state, we store the events that led to that state.
Key Benefits¶
Complete Audit Trail¶
- Every change is recorded as an immutable event
- Full history of what happened, when, and why
- Compliance with audit requirements
- Debugging capabilities with complete context
Temporal Queries¶
- Query state at any point in time
- Replay events to rebuild state
- Time-travel debugging for complex issues
- Historical reporting and analytics
Event-Driven Architecture¶
- Loose coupling between components
- Real-time notifications based on events
- Integration with external systems via events
- Background processing triggered by events
Additional Benefits¶
- Auditability: Complete history of every action through immutable events
- Scalability: Separation of read/write sides with optimized projections
- Data Consistency: Full history reconstruction ensures system-wide consistency
- Domain-Driven Design: Follows DDD principles for complex business modeling
System Flow¶
1. User Interaction (Write Side)¶
- User makes a change: Actions trigger commands through the Surface API
- Data Feed Provider: External data feeds processed into commands
- Command Execution: Commands sent to aggregates that enforce business rules
2. Event Stream (Aggregate)¶
- Commands create events appended to immutable event streams
- Each stream identified by strongly-typed IDs
- Events represent immutable facts about system changes
3. Event Storage¶
- Events stored in PostgreSQL with DataContract serialization
- Ensures compatibility and versioning for event evolution
4. Read Side (Query Side)¶
- Readmodel/Projection Builder: Events processed into optimized read models
- Separate database for efficient querying without affecting writes
5. Projections and Queries¶
- Query API retrieves data from pre-built projections
- Real-time updates as new events are added
Implementation Architecture¶
Core Components¶
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Commands │───▶│ Aggregates │───▶│ Events │
│ (Intentions) │ │ (Business Logic)│ │ (Facts) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Read Models │◀───│ Projections │◀───│ Event Store │
│ (Query Models) │ │ (Builders) │ │ (Marten) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Event Store Implementation¶
- PostgreSQL + Marten for event storage
- JSONB columns for efficient event serialization
- Automatic indexing on aggregate ID and sequence
- Stream-based storage for each aggregate
Event Design Patterns¶
Event Structure¶
[DataContract(Name = "PlayerCreated", Namespace = LBSNamespace.Default)]
public sealed record PlayerCreatedEvent : DomainEvent<PlayerId>
{
[DataMember]
public string PlayerName { get; init; } = string.Empty;
[DataMember]
public decimal Salary { get; init; }
[DataMember]
public string Position { get; init; } = string.Empty;
[DataMember]
public TeamId? TeamId { get; init; }
[DataMember]
public DateTime CreatedAt { get; init; } = DateTime.UtcNow;
}
Event Naming Conventions¶
- Past tense - Events represent facts that already happened
- Domain language - Use ubiquitous language from business
- Specific intent - Clear about what changed and why
Examples:
- PlayerCreatedEvent - Clear, past tense
- PlayerSalaryUpdatedEvent - Specific change
- TeamRosterModifiedEvent - Domain-specific
- PlayerChangeEvent - Too vague
- CreatePlayerEvent - Not past tense
Command-Event Flow¶
1. Command Processing¶
public void Execute(CreatePlayerCommand command)
{
// 1. Validate business rules
if (string.IsNullOrEmpty(command.PlayerName))
throw new ArgumentException("Player name required");
// 2. Check business invariants
if (this.IsPlayerNameTaken(command.PlayerName))
throw new InvalidOperationException("Player name already exists");
// 3. Raise domain event
this.RaiseEvent(new PlayerCreatedEvent
{
AggregateRootId = command.AggregateRootId,
PlayerName = command.PlayerName,
Salary = command.Salary,
Position = command.Position,
CreatedAt = DateTime.UtcNow
});
}
2. Event Application¶
public void Apply(PlayerCreatedEvent @event)
{
// Update aggregate state based on event
this.PlayerId = @event.AggregateRootId;
this.PlayerName = @event.PlayerName;
this.Salary = @event.Salary;
this.Position = @event.Position;
this.CreatedAt = @event.CreatedAt;
this.IsActive = true;
}
3. Projection Building¶
public class PlayerContractBuilder : MultiStreamProjection<PlayerContract, PlayerId>
{
public void Apply(PlayerCreatedEvent @event, PlayerContract contract)
{
contract.Id = @event.AggregateRootId.Value;
contract.PlayerName = @event.PlayerName;
contract.Salary = @event.Salary;
contract.Position = @event.Position;
contract.CreatedAt = @event.CreatedAt;
contract.IsActive = true;
}
public void Apply(PlayerSalaryUpdatedEvent @event, PlayerContract contract)
{
contract.Salary = @event.NewSalary;
contract.LastUpdated = @event.UpdatedAt;
}
}
Aggregate Lifecycle¶
1. Creation¶
// New aggregate - version 0
var player = new PlayerAggregate();
player.Execute(createCommand);
// Raises PlayerCreatedEvent
// Version becomes 1
2. Modification¶
// Load existing aggregate
var player = await repository.LoadAsync<PlayerAggregate>(playerId);
// Current version: N
player.Execute(updateCommand);
// Raises PlayerSalaryUpdatedEvent
// Version becomes N+1
3. Event Sourcing Repository Pattern¶
public async Task<T> LoadAsync<T>(StrongTypedIdBase id) where T : IAggregateRoot, new()
{
// 1. Load events from store
var events = await this.eventStore.LoadEventsAsync(id);
// 2. Create new aggregate instance
var aggregate = new T();
aggregate.AggregateRootId = id;
// 3. Replay events to rebuild state
foreach (var @event in events)
{
aggregate.ApplyEvent(@event);
}
return aggregate;
}
Projection Patterns¶
Real-time Projections¶
// Async projections update read models in near real-time
[AsyncProjection]
public class PlayerStatsProjection : MultiStreamProjection<PlayerStatsContract, PlayerId>
{
public void Apply(PlayerCreatedEvent @event, PlayerStatsContract stats)
{
stats.PlayerId = @event.AggregateRootId.Value;
stats.TotalPoints = 0;
stats.GamesPlayed = 0;
stats.LastUpdated = @event.CreatedAt;
}
public void Apply(PlayerScoredEvent @event, PlayerStatsContract stats)
{
stats.TotalPoints += @event.Points;
stats.LastScored = @event.ScoredAt;
stats.LastUpdated = DateTime.UtcNow;
}
}
Inline Projections¶
// Inline projections update synchronously with event storage
public class PlayerSummaryProjection : SingleStreamProjection<PlayerSummary>
{
public void Apply(PlayerCreatedEvent @event, PlayerSummary summary)
{
summary.Id = @event.AggregateRootId.Value;
summary.Name = @event.PlayerName;
summary.CreatedAt = @event.CreatedAt;
}
}
Event Versioning & Evolution¶
Schema Evolution Strategy¶
// V1 Event
[DataContract(Name = "PlayerCreated", Namespace = "LBS.Events.V1")]
public sealed record PlayerCreatedEventV1 : DomainEvent<PlayerId>
{
[DataMember]
public string PlayerName { get; init; } = string.Empty;
[DataMember]
public decimal Salary { get; init; }
}
// V2 Event - Added Position
[DataContract(Name = "PlayerCreated", Namespace = "LBS.Events.V2")]
public sealed record PlayerCreatedEventV2 : DomainEvent<PlayerId>
{
[DataMember]
public string PlayerName { get; init; } = string.Empty;
[DataMember]
public decimal Salary { get; init; }
[DataMember]
public string Position { get; init; } = string.Empty; // New field
}
Upcasting Pattern¶
public class PlayerEventUpcaster : IEventUpcaster
{
public object? Upcast(object @event)
{
return @event switch
{
PlayerCreatedEventV1 v1 => new PlayerCreatedEventV2
{
AggregateRootId = v1.AggregateRootId,
PlayerName = v1.PlayerName,
Salary = v1.Salary,
Position = "Unknown" // Default for legacy events
},
_ => @event
};
}
}
Event Store Queries¶
Stream Queries¶
// Load events for specific aggregate
var events = await session
.Events
.FetchStreamAsync(playerId.Value);
// Load events from specific version
var eventsFromVersion = await session
.Events
.FetchStreamAsync(playerId.Value, fromVersion: 5);
Cross-Stream Queries¶
// Find all events of specific type
var playerCreatedEvents = await session
.Events
.QueryAllRawEvents()
.Where(e => e.EventTypeName == "PlayerCreated")
.ToListAsync();
// Events within time range
var recentEvents = await session
.Events
.QueryAllRawEvents()
.Where(e => e.Timestamp >= DateTime.UtcNow.AddDays(-7))
.ToListAsync();
Security & Event Sourcing¶
Event Authorization¶
// Commands are authorized, events are facts
[RequiresRoles(RoleDefinition.UserManager)]
public sealed record UpdatePlayerSalaryCommand : DomainCommand<PlayerId>
{
// Command requires authorization
}
// Events don't need authorization - they're immutable facts
public sealed record PlayerSalaryUpdatedEvent : DomainEvent<PlayerId>
{
// Event is a fact that already happened
}
Audit Trail¶
// Events include audit information
public sealed record PlayerSalaryUpdatedEvent : DomainEvent<PlayerId>
{
[DataMember]
public decimal PreviousSalary { get; init; }
[DataMember]
public decimal NewSalary { get; init; }
[DataMember]
public string UpdatedBy { get; init; } = string.Empty; // User ID
[DataMember]
public string Reason { get; init; } = string.Empty;
[DataMember]
public DateTime UpdatedAt { get; init; } = DateTime.UtcNow;
}
Performance Considerations¶
Event Store Optimization¶
// Use lightweight sessions for read operations
using var session = store.LightweightSession();
var events = await session.Events.FetchStreamAsync(aggregateId);
// Use identity sessions for writes
using var session = store.IdentitySession();
await session.Events.AppendExclusive(aggregateId, events);
await session.SaveChangesAsync();
Projection Performance¶
// Batch projection updates
public class BatchPlayerStatsProjection : MultiStreamProjection<PlayerStatsContract, PlayerId>
{
public void Apply(IEvent<PlayerScoredEvent>[] events, PlayerStatsContract stats)
{
var totalPoints = events.Sum(e => e.Data.Points);
stats.TotalPoints += totalPoints;
stats.GamesPlayed += events.Length;
stats.LastUpdated = DateTime.UtcNow;
}
}
Snapshot Pattern¶
// For aggregates with many events, use snapshots
public class PlayerAggregateWithSnapshot : PlayerAggregate, ISnapshot
{
public void ApplySnapshot(object snapshot)
{
if (snapshot is PlayerSnapshot playerSnapshot)
{
this.PlayerId = playerSnapshot.PlayerId;
this.PlayerName = playerSnapshot.PlayerName;
this.Salary = playerSnapshot.Salary;
// ... other properties
}
}
public object CreateSnapshot()
{
return new PlayerSnapshot
{
PlayerId = this.PlayerId,
PlayerName = this.PlayerName,
Salary = this.Salary,
// ... other properties
};
}
}
Testing Event Sourcing¶
Command Testing¶
[Test]
public async Task CreatePlayer_ShouldRaisePlayerCreatedEvent()
{
// Arrange
var aggregate = new PlayerAggregate();
var command = new CreatePlayerCommand
{
AggregateRootId = PlayerId.New(),
PlayerName = "Test Player",
Salary = 100000
};
// Act
aggregate.Execute(command);
// Assert
var events = aggregate.GetUncommittedEvents();
events.Should().HaveCount(1);
var playerCreatedEvent = events[0].Should().BeOfType<PlayerCreatedEvent>().Subject;
playerCreatedEvent.PlayerName.Should().Be("Test Player");
playerCreatedEvent.Salary.Should().Be(100000);
}
Event Application Testing¶
[Test]
public void Apply_PlayerCreatedEvent_ShouldSetAggregateState()
{
// Arrange
var aggregate = new PlayerAggregate();
var @event = new PlayerCreatedEvent
{
AggregateRootId = PlayerId.New(),
PlayerName = "Test Player",
Salary = 100000
};
// Act
aggregate.Apply(@event);
// Assert
aggregate.PlayerName.Should().Be("Test Player");
aggregate.Salary.Should().Be(100000);
}
Projection Testing¶
[Test]
public async Task PlayerCreatedEvent_ShouldUpdateProjection()
{
// Arrange
var @event = new PlayerCreatedEvent
{
AggregateRootId = PlayerId.New(),
PlayerName = "Test Player",
Salary = 100000
};
var contract = new PlayerContract();
// Act
var builder = new PlayerContractBuilder();
builder.Apply(@event, contract);
// Assert
contract.PlayerName.Should().Be("Test Player");
contract.Salary.Should().Be(100000);
}
Best Practices¶
Do¶
- Make events immutable - Never change events once stored
- Use past tense naming - Events represent facts
- Include sufficient context - Events should be self-describing
- Version events carefully - Plan for schema evolution
- Test event application - Verify aggregate rebuilding works
- Use domain language - Events should match business terminology
Don't¶
- Store mutable references in events
- Include behavior in events (only data)
- Make events dependent on external state
- Forget about serialization - Events must be serializable
- Ignore versioning - Events live forever
- Use technical language - Keep events business-focused
Related Documentation¶
- Aggregates, Events, and Commands Guide - Detailed implementation guidelines
- Query/Command Alignment Design - Command/Query separation
- Database Schema (Mermaid ERD) - Aggregate contract relationships
- Security Guide - Authorization in event sourcing
- Developer Guide - Practical implementation
- Common Tasks - Daily development patterns
Event sourcing is the foundation of LBS Foundry's architecture. Understanding these patterns is crucial for effective development on the platform.