Event Integrators Guide¶
Last Updated: 2025-11-01
Event integrators allow you to react to domain events in the event store, enabling integration with external systems, cross-aggregate workflows, and event-driven side effects.
Quick Start¶
1. Create an Integrator¶
// Location: src/Domain/LBS.Domain.Sport/SportingEvent/Integrators/SportingEventIntegrator.cs
using LBS.EventSourcing.Integrators;
using Microsoft.Extensions.Logging;
[RequiresModule(ModuleDefinition.SportCore)]
[IntegratorConfiguration(Name = "sporting-event-integrator")]
public sealed class SportingEventIntegrator : MartenIntegratorBase
{
public SportingEventIntegrator(ILogger<SportingEventIntegrator> logger)
: base(logger)
{
}
// Automatically receives only SportingEventCreatedEvent
public async Task HandleAsync(SportingEventCreatedEvent domainEvent, CancellationToken ct)
{
this.Logger.LogInformation(
"Handling SportingEventCreatedEvent for {SportingEventId}",
domainEvent.AggregateRootId);
// Your integration logic here
await Task.CompletedTask;
}
}
2. Register in Feature Definition¶
// Location: src/Domain/LBS.Domain.Sport/Configuration/SportDomainCoreFeature.cs
public class SportDomainCoreFeature : IFeatureDefinition
{
public Type[] GetIntegratorTypes()
{
return new[]
{
typeof(SportingEventIntegrator)
};
}
}
That's it! The integrator will automatically:
- Subscribe to events via Marten's async daemon
- Only receive events you have handlers for (automatic filtering)
- Track its processing position
- Retry on transient errors
- Resume from last position on restart
Core Concepts¶
MartenIntegratorBase¶
The base class providing event integration infrastructure:
public abstract class MartenIntegratorBase : SubscriptionBase
{
// Available to handlers
protected IDocumentOperations DocumentSession { get; }
protected ILogger Logger { get; }
// Customization points
protected virtual bool ShouldProcessEvent(JasperFx.Events.IEvent @event);
protected virtual bool ShouldRetry(Exception ex);
protected virtual Task HandleErrorAsync(JasperFx.Events.IEvent @event, Exception exception, CancellationToken ct);
}
Integrator Configuration¶
Configure integrator behavior using the [IntegratorConfiguration] attribute:
[IntegratorConfiguration(
Name = "my-integrator", // Defaults to class name
BatchSize = 50, // Defaults to 10
StartPosition = IntegratorStartPosition.FromBeginning, // Defaults to FromPresent
IncludeArchivedEvents = false, // Defaults to false
StopOnError = false // Defaults to false
)]
public sealed class MyIntegrator : MartenIntegratorBase
{
// ...
}
Automatic Event Filtering¶
Marten automatically filters events based on your handler methods:
public sealed class MyIntegrator : MartenIntegratorBase
{
// Only these two event types will be sent to this integrator
public async Task HandleAsync(FixtureCreatedEvent evt, CancellationToken ct) { }
public async Task HandleAsync(FixtureUpdatedEvent evt, CancellationToken ct) { }
// FixtureCancelledEvent, TeamCreatedEvent, etc. are never sent - filtered by Marten
}
Performance Benefits:
- Marten doesn't load events you don't handle
- Reduced memory usage
- Faster processing
- No manual filtering needed
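To make the filtering idea concrete, here is a minimal sketch of how handled event types could be discovered from HandleAsync overloads using reflection. It is an illustration only: HandlerDiscovery, FixtureCreated, FixtureUpdated, and SampleIntegrator are hypothetical stand-ins, and the framework's actual discovery logic may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in event types, for illustration only.
public sealed record FixtureCreated(Guid FixtureId);
public sealed record FixtureUpdated(Guid FixtureId);

// Hypothetical integrator-shaped class with two handlers.
public sealed class SampleIntegrator
{
    public Task HandleAsync(FixtureCreated evt, CancellationToken ct) => Task.CompletedTask;
    public Task HandleAsync(FixtureUpdated evt, CancellationToken ct) => Task.CompletedTask;
}

public static class HandlerDiscovery
{
    // Returns the event types a class handles: the first parameter of every
    // public instance method shaped like Task HandleAsync(TEvent, CancellationToken).
    public static IReadOnlyList<Type> GetHandledEventTypes(Type integratorType)
    {
        return integratorType
            .GetMethods(BindingFlags.Public | BindingFlags.Instance)
            .Where(m => m.Name == "HandleAsync" && m.ReturnType == typeof(Task))
            .Select(m => m.GetParameters())
            .Where(p => p.Length == 2 && p[1].ParameterType == typeof(CancellationToken))
            .Select(p => p[0].ParameterType)
            .Distinct()
            .ToList();
    }
}
```

An event type without a matching handler never appears in the returned list, which is what would let a subscription exclude it up front. The same check is a quick way to confirm that a handler has the expected signature.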
Configuration Options¶
Start Position¶
Control where the integrator begins processing events:
// Only process new events (default)
[IntegratorConfiguration(StartPosition = IntegratorStartPosition.FromPresent)]
public sealed class MyIntegrator : MartenIntegratorBase { }
// Process ALL historical events from beginning
// WARNING: May process very old events!
[IntegratorConfiguration(StartPosition = IntegratorStartPosition.FromBeginning)]
public sealed class MyIntegrator : MartenIntegratorBase { }
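The difference between the two start positions can be sketched as a small function. This is a simplified model under stated assumptions: SketchStartPosition and StartPositionSketch are hypothetical names, and the sketch assumes the configured start position only matters on the first run, because a recorded position takes precedence on restart.

```csharp
using System;

// Hypothetical mirror of the two start positions described above.
public enum SketchStartPosition { FromPresent, FromBeginning }

public static class StartPositionSketch
{
    // Picks the sequence number after which processing starts.
    // storedPosition: last recorded position, or null on the very first run.
    // highWaterMark: sequence of the newest event currently in the store.
    public static long Resolve(SketchStartPosition start, long? storedPosition, long highWaterMark)
    {
        // Assumption: a recorded position always wins, so a restart resumes where it left off.
        if (storedPosition.HasValue) return storedPosition.Value;

        return start == SketchStartPosition.FromBeginning ? 0 : highWaterMark;
    }
}
```

Under this model, a new integrator configured with FromBeginning on a store holding 500 events would start at 0 and replay all of them, while FromPresent would start at 500 and see only later events.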
Batch Size¶
Configure how many events to process per batch:
// Process 50 events at a time
[IntegratorConfiguration(BatchSize = 50)]
public sealed class MyIntegrator : MartenIntegratorBase { }
// Process one at a time for critical ordering
[IntegratorConfiguration(BatchSize = 1)]
public sealed class MyIntegrator : MartenIntegratorBase { }
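How batch size interacts with position tracking can be sketched with an in-memory loop. This is a simplified model, not Marten's implementation: BatchRunner is a hypothetical name, and the sketch assumes the position is recorded once per completed batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchRunner
{
    // Processes events whose sequence is greater than startPosition, in batches
    // of batchSize, and returns the position recorded after each completed batch.
    public static List<long> Run(
        IReadOnlyList<long> eventSequences,
        long startPosition,
        int batchSize,
        Action<long> handle)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var checkpoints = new List<long>();
        var pending = eventSequences.Where(s => s > startPosition).OrderBy(s => s).ToList();

        for (var offset = 0; offset < pending.Count; offset += batchSize)
        {
            var batch = pending.Skip(offset).Take(batchSize).ToList();
            foreach (var sequence in batch)
            {
                handle(sequence);
            }

            // Assumed behavior: the position advances only after the whole batch succeeds.
            checkpoints.Add(batch[^1]);
        }

        return checkpoints;
    }
}
```

With a batch size of 1 the position is recorded after every event. Larger batches mean fewer position writes, but in this model a failure partway through a batch leaves the position at the previous checkpoint, so the earlier events in that batch would be handled again.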
Error Handling¶
Stop on Error - For critical integrators:
// Stop Marten daemon on any error
[IntegratorConfiguration(StopOnError = true)]
public sealed class MyCriticalIntegrator : MartenIntegratorBase { }
Custom Retry Logic:
protected override bool ShouldRetry(Exception ex)
{
return base.ShouldRetry(ex) // IOException, HttpRequestException, TimeoutException, etc.
|| ex is MyCustomTransientException;
}
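To illustrate how a ShouldRetry-style predicate can drive retries, here is a minimal standalone sketch. RetrySketch, its attempt limit, and its linear backoff are assumptions made for illustration; the retry policy the base class actually applies is not shown in this guide.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Hypothetical classification covering the exception types named above.
    public static bool IsTransient(Exception ex) =>
        ex is IOException or HttpRequestException or TimeoutException;

    // Runs the action, retrying while the failure is classified as retryable and
    // attempts remain. Any other exception propagates immediately.
    // Returns the number of attempts used.
    public static async Task<int> ExecuteAsync(
        Func<CancellationToken, Task> action,
        Func<Exception, bool> shouldRetry,
        int maxAttempts,
        CancellationToken ct)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action(ct);
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts && shouldRetry(ex))
            {
                // Simple linear backoff; the real delay policy is an assumption here.
                await Task.Delay(TimeSpan.FromMilliseconds(10 * attempt), ct);
            }
        }
    }
}
```

Passing a wider predicate, such as one that also accepts a custom transient exception, widens the set of failures that are retried. Everything else surfaces on the first attempt.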
Custom Error Handling:
protected override async Task HandleErrorAsync(
JasperFx.Events.IEvent @event,
Exception exception,
CancellationToken ct)
{
await base.HandleErrorAsync(@event, exception, ct); // Logs error
// Send alerts, create tickets, etc.
await this.alertService.SendAsync($"Integration failed: {exception.Message}");
}
Session Management¶
Using DocumentSession¶
The DocumentSession property provides access to the same transaction that Marten uses to track event position:
public async Task HandleAsync(UserCreatedEvent evt, CancellationToken ct)
{
// Execute command in the same transaction
var command = new SendWelcomeEmailCommand { UserId = evt.UserId };
await this.commandExecutor.ExecuteAsync(command, ct);
// Store a document in the same transaction
this.DocumentSession.Store(new UserProfile
{
UserId = evt.UserId,
Email = evt.Email
});
// Marten commits everything together:
// - Event position update
// - Command execution results
// - Stored documents
}
Separate Database Sessions¶
For different databases (analytics, reporting, etc.):
public async Task HandleAsync(OrderCreatedEvent evt, CancellationToken ct)
{
// Write to separate analytics database
using (var analyticsSession = this.analyticsStore.LightweightSession())
{
analyticsSession.Store(new AnalyticsEvent
{
EventType = "OrderCreated",
Data = evt
});
await analyticsSession.SaveChangesAsync(ct); // Explicit commit
}
// Main session (event position) committed by Marten separately
}
Common Patterns¶
External API Integration¶
[RequiresModule(ModuleDefinition.SportCore)]
public sealed class SportEventKafkaPublisher : MartenIntegratorBase
{
private readonly IProducer<Guid, string> producer;
public SportEventKafkaPublisher(
IProducer<Guid, string> producer,
ILogger<SportEventKafkaPublisher> logger)
: base(logger)
{
this.producer = producer;
}
protected override int BatchSize => 50;
public async Task HandleAsync(FixtureCreatedEvent evt, CancellationToken ct)
{
var message = new Message<Guid, string>
{
Key = evt.FixtureId.Value,
Value = JsonSerializer.Serialize(evt)
};
await this.producer.ProduceAsync("sports.fixture.created", message, ct);
}
}
Cross-Aggregate Workflows¶
[RequiresModule(ModuleDefinition.Core)]
public sealed class NotificationIntegrator : MartenIntegratorBase
{
private readonly ICommandExecutor commandExecutor;
public NotificationIntegrator(
ICommandExecutor commandExecutor,
ILogger<NotificationIntegrator> logger)
: base(logger)
{
this.commandExecutor = commandExecutor;
}
public async Task HandleAsync(MatchCompletedEvent evt, CancellationToken ct)
{
// Execute command in same transaction as event position tracking
var command = new SendMatchNotificationCommand
{
MatchId = evt.MatchId,
HomeScore = evt.HomeScore,
AwayScore = evt.AwayScore
};
await this.commandExecutor.ExecuteAsync(command, ct);
}
}
Process All Events (No Module Filter)¶
// No [RequiresModule] = processes ALL events
public sealed class AuditLogIntegrator : MartenIntegratorBase
{
private readonly IAuditStore auditStore;
public AuditLogIntegrator(
IAuditStore auditStore,
ILogger<AuditLogIntegrator> logger)
: base(logger)
{
this.auditStore = auditStore;
}
// Generic handler for any domain event
public async Task HandleAsync(IDomainEvent evt, CancellationToken ct)
{
await this.auditStore.LogEventAsync(new AuditEntry
{
EventType = evt.GetType().Name,
AggregateId = evt.AggregateRootId,
Timestamp = DateTimeOffset.UtcNow,
Data = evt
}, ct);
}
}
Custom Event Filtering¶
protected override bool ShouldProcessEvent(JasperFx.Events.IEvent @event)
{
// First check base filtering (module, etc.)
if (!base.ShouldProcessEvent(@event))
return false;
// Only process Fixture events
// Ordinal comparison avoids culture-sensitive matching on stream keys
return @event.StreamKey?.StartsWith("Fixture", StringComparison.Ordinal) == true;
}
Wiring and Configuration¶
Application Startup¶
Integrators are automatically registered when you call:
// Program.cs - LBS.Api
var moduleProvider = new FoundryModuleProvider();
builder.Services.AddSingleton<IModuleProvider>(moduleProvider);
// IMPORTANT: Register features BEFORE configuring Marten
builder.Services.AddFeaturesFromModules(builder.Configuration, moduleProvider);
// This automatically discovers and registers all integrators
builder.Services.AddFoundryWithEventStore(builder.Configuration);
Marten Configuration¶
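The RegisterIntegrators method is called automatically by AddFoundryWithEventStore. The excerpt below is simplified and does not compile as shown: tempProviderForOptions, TIntegrator, and integrator are not defined within it.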
public static void RegisterIntegrators(
this IServiceCollection services,
MartenServiceCollectionExtensions.MartenConfigurationExpression martenBuilder)
{
// Get integrator types from features
var integratorOptions = tempProviderForOptions.GetService<IOptions<FeatureIntegratorOptions>>();
// Register each integrator type in DI
foreach (var integratorType in integratorOptions.Value.IntegratorTypes)
{
services.AddTransient(integratorType);
}
// Subscribe each integrator with Marten
foreach (var integratorType in integratorOptions.Value.IntegratorTypes)
{
// Get dispatch policy to discover handled event types
var dispatchPolicy = LambdaDispatchPolicy.GetLambdaDispatchPolicy(integratorType);
// Register with Marten's subscription infrastructure
martenBuilder.AddSubscriptionWithServices<TIntegrator>(ServiceLifetime.Singleton, options =>
{
options.Name = integrator.IntegratorName;
options.Options.BatchSize = integrator.BatchSize;
// Automatic event filtering
foreach (var eventType in dispatchPolicy.HandledEventTypes)
{
options.IncludeType(eventType);
}
});
}
}
Testing¶
Unit Testing Integrators¶
public class SportingEventIntegratorTests
{
[Fact]
public async Task HandleAsync_LogsCreationEvent()
{
// Arrange
var logger = Substitute.For<ILogger<SportingEventIntegrator>>();
var integrator = new SportingEventIntegrator(logger);
var evt = new SportingEventCreatedEvent
{
AggregateRootId = new SportingEventId { Value = Guid.NewGuid() },
SportingEventName = "Test Match"
};
// Act
await integrator.HandleAsync(evt, CancellationToken.None);
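// Assert - LogInformation is an extension method, so NSubstitute cannot match it directly.
// Inspect the underlying ILogger.Log call instead (requires using System.Linq).
var logCall = logger.ReceivedCalls().Single(c => c.GetMethodInfo().Name == "Log");
var logArgs = logCall.GetArguments();
Assert.Equal(LogLevel.Information, (LogLevel)logArgs[0]!);
Assert.Contains("Handling SportingEventCreatedEvent", logArgs[2]!.ToString());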
}
}
Integration Testing with Marten¶
[Collection("Database")]
public class IntegratorIntegrationTests
{
[Fact]
public async Task Integrator_ProcessesEventsFromEventStore()
{
// Setup Marten with integrator
var services = new ServiceCollection();
services.AddMarten(opts =>
{
opts.Connection(connectionString);
// Add integrator for test
})
.AddAsyncDaemon(DaemonMode.HotCold);
var provider = services.BuildServiceProvider();
var store = provider.GetRequiredService<IDocumentStore>();
// Raise event
using (var session = store.LightweightSession())
{
session.Events.Append(streamId, new SportingEventCreatedEvent { ... });
await session.SaveChangesAsync();
}
// Wait for integrator to process
await Task.Delay(1000);
// Verify side effects occurred
}
}
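A fixed Task.Delay can make this kind of test slow or flaky, because the daemon processes events asynchronously. One common alternative is to poll for the expected side effect with a timeout. The helper below is a hypothetical sketch: TestPolling.WaitUntilAsync is not part of Marten or of the framework described here.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class TestPolling
{
    // Polls the condition until it returns true or the timeout elapses.
    // Returns true if the condition was met, false on timeout.
    public static async Task<bool> WaitUntilAsync(
        Func<Task<bool>> condition,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken ct = default)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (await condition()) return true;
            if (stopwatch.Elapsed >= timeout) return false;
            await Task.Delay(pollInterval, ct);
        }
    }
}
```

In the test above, the condition would query whatever side effect the integrator produces, and the test would assert that the helper returned true. The test then finishes as soon as the event is processed and fails with a clear timeout otherwise.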
Monitoring and Health¶
Marten Dashboard¶
Marten provides built-in monitoring for async subscriptions:
# View subscription status
GET /marten/subscriptions
# Response shows:
# - Subscription name
# - Current position
# - Last processed event
# - Error count
# - Status (running/stopped)
Logging¶
All integrators log key events:
// Success
Logger.LogInformation("Handling {EventType} for {AggregateId}", ...);
// Retry-able errors
Logger.LogWarning("Integrator encountered retry-able exception", ...);
// Non-retry-able errors
Logger.LogError("Integrator failed processing event {EventId}", ...);
Health Checks¶
Marten's async daemon provides health check endpoints showing subscription health.
Best Practices¶
DO¶
- Use [RequiresModule] for module-specific integrators
- Keep handlers focused on single responsibility
- Use DocumentSession for operations in same transaction
- Override BatchSize based on throughput needs
- Test handlers independently of Marten infrastructure
- Use FromPresent start position (default) for new integrators
- Log important events for observability
DON'T¶
- Don't perform long-running operations in handlers (> 30 seconds)
- Don't use FromBeginning without understanding implications
- Don't access DocumentSession outside handler methods
- Don't create new Marten sessions when you need the main transaction
- Don't catch all exceptions without rethrowing critical ones
- Don't modify event data - events are immutable
Troubleshooting¶
Integrator Not Receiving Events¶
Check:
1. Integrator type is returned from IFeatureDefinition.GetIntegratorTypes()
2. [RequiresModule] matches enabled module
3. Handler method signature is correct: Task HandleAsync(TEvent evt, CancellationToken ct)
4. AddFeaturesFromModules called BEFORE AddFoundryWithEventStore
Events Being Skipped¶
Check:
1. ShouldProcessEvent override not filtering events incorrectly
2. Event type matches handler parameter type exactly
3. No exceptions being silently swallowed
Position Not Advancing¶
Check:
1. Handler throwing unhandled exceptions
2. StopOnError = true and an error occurred
3. Marten daemon is running (check health endpoint)
4. Database connectivity
Migration from Old Patterns¶
If migrating from OldEventBasedIntegrationAgent:
Before¶
[IntegrationAgent("GUID")]
[RequiresModule(ModuleDefinition.Images)]
public class ImageManipulationIntegrator : OldEventBasedIntegrationAgent
{
public ImageManipulationIntegrator(IModuleProvider moduleProvider, ILogger logger)
: base(moduleProvider, logger, "images") { }
}
After¶
[RequiresModule(ModuleDefinition.Images)]
public class ImageManipulationIntegrator : MartenIntegratorBase
{
public ImageManipulationIntegrator(ILogger<ImageManipulationIntegrator> logger)
: base(logger) { }
}
Changes:
- Remove [IntegrationAgent] attribute (no longer needed)
- Remove prefix parameter (automatic filtering)
- Remove IModuleProvider from constructor (automatic)
- Use strongly-typed ILogger<T>
- Update feature registration to use GetIntegratorTypes()
Further Reading¶
- Event Sourcing Guide - Event patterns and best practices
- Common Tasks - Step-by-step implementation guides
- Module System - Feature modules and dependencies