Event Integrators Guide

Last Updated: 2025-11-01

Event integrators allow you to react to domain events in the event store, enabling integration with external systems, cross-aggregate workflows, and event-driven side effects.


Quick Start

1. Create an Integrator

// Location: src/Domain/LBS.Domain.Sport/SportingEvent/Integrators/SportingEventIntegrator.cs
using LBS.EventSourcing.Integrators;
using Microsoft.Extensions.Logging;

[RequiresModule(ModuleDefinition.SportCore)]
[IntegratorConfiguration(Name = "sporting-event-integrator")]
public sealed class SportingEventIntegrator : MartenIntegratorBase
{
    public SportingEventIntegrator(ILogger<SportingEventIntegrator> logger)
        : base(logger)
    {
    }

    // Automatically receives only SportingEventCreatedEvent
    public async Task HandleAsync(SportingEventCreatedEvent domainEvent, CancellationToken ct)
    {
        this.Logger.LogInformation(
            "Handling SportingEventCreatedEvent for {SportingEventId}",
            domainEvent.AggregateRootId);

        // Your integration logic here
        await Task.CompletedTask;
    }
}

2. Register in Feature Definition

// Location: src/Domain/LBS.Domain.Sport/Configuration/SportDomainCoreFeature.cs
public class SportDomainCoreFeature : IFeatureDefinition
{
    public Type[] GetIntegratorTypes()
    {
        return new[]
        {
            typeof(SportingEventIntegrator)
        };
    }
}

That's it! The integrator will automatically:

  • Subscribe to events via Marten's async daemon
  • Only receive events you have handlers for (automatic filtering)
  • Track its processing position
  • Retry on transient errors
  • Resume from last position on restart


Core Concepts

MartenIntegratorBase

The base class providing event integration infrastructure:

public abstract class MartenIntegratorBase : SubscriptionBase
{
    // Available to handlers
    protected IDocumentOperations DocumentSession { get; }
    protected ILogger Logger { get; }

    // Customization points
    protected virtual bool ShouldProcessEvent(JasperFx.Events.IEvent @event);
    protected virtual bool ShouldRetry(Exception ex);
    protected virtual Task HandleErrorAsync(JasperFx.Events.IEvent @event, Exception exception, CancellationToken ct);
}

Integrator Configuration

Configure integrator behavior using the [IntegratorConfiguration] attribute:

[IntegratorConfiguration(
    Name = "my-integrator",              // Defaults to class name
    BatchSize = 50,                      // Defaults to 10
    StartPosition = IntegratorStartPosition.FromBeginning,  // Defaults to FromPresent
    IncludeArchivedEvents = false,       // Defaults to false
    StopOnError = false                  // Defaults to false
)]
public sealed class MyIntegrator : MartenIntegratorBase
{
    // ...
}

Automatic Event Filtering

Marten automatically filters events based on your handler methods:

public sealed class MyIntegrator : MartenIntegratorBase
{
    // Only these two event types will be sent to this integrator
    public Task HandleAsync(FixtureCreatedEvent evt, CancellationToken ct) => Task.CompletedTask;
    public Task HandleAsync(FixtureUpdatedEvent evt, CancellationToken ct) => Task.CompletedTask;

    // FixtureCancelledEvent, TeamCreatedEvent, etc. are never sent - filtered by Marten
}

Performance Benefits:

  • Marten doesn't load events you don't handle
  • Reduced memory usage
  • Faster processing
  • No manual filtering needed
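
To illustrate how handler-based filtering can work, the following is a minimal sketch that discovers handled event types by reflecting over public HandleAsync(TEvent, CancellationToken) methods. It shows the general technique only and is not the actual LambdaDispatchPolicy implementation; HandlerDiscovery, SampleIntegrator, and the two sample event records are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event types, used only for this illustration.
public sealed record FixtureCreated(Guid FixtureId);
public sealed record FixtureUpdated(Guid FixtureId);

// Hypothetical integrator-like class with two handlers.
public sealed class SampleIntegrator
{
    public Task HandleAsync(FixtureCreated evt, CancellationToken ct) => Task.CompletedTask;
    public Task HandleAsync(FixtureUpdated evt, CancellationToken ct) => Task.CompletedTask;
}

public static class HandlerDiscovery
{
    // Returns the first-parameter type of every public instance method shaped like
    // Task HandleAsync(TEvent, CancellationToken). Order is not guaranteed.
    public static IReadOnlyList<Type> GetHandledEventTypes(Type integratorType)
    {
        return integratorType
            .GetMethods(BindingFlags.Public | BindingFlags.Instance)
            .Where(m => m.Name == "HandleAsync" && m.ReturnType == typeof(Task))
            .Select(m => m.GetParameters())
            .Where(p => p.Length == 2 && p[1].ParameterType == typeof(CancellationToken))
            .Select(p => p[0].ParameterType)
            .Distinct()
            .ToList();
    }
}
```

Calling HandlerDiscovery.GetHandledEventTypes(typeof(SampleIntegrator)) yields the two sample event types. A registration step could then pass each discovered type to the subscription's type filter, so that unhandled events are excluded before they reach the integrator.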


Configuration Options

Start Position

Control where the integrator begins processing events:

// Only process new events (default)
[IntegratorConfiguration(StartPosition = IntegratorStartPosition.FromPresent)]
public sealed class MyIntegrator : MartenIntegratorBase { }

// Process ALL historical events from beginning
// WARNING: May process very old events!
[IntegratorConfiguration(StartPosition = IntegratorStartPosition.FromBeginning)]
public sealed class MyIntegrator : MartenIntegratorBase { }

Batch Size

Configure how many events to process per batch:

// Process 50 events at a time
[IntegratorConfiguration(BatchSize = 50)]
public sealed class MyIntegrator : MartenIntegratorBase { }

// Process one at a time for critical ordering
[IntegratorConfiguration(BatchSize = 1)]
public sealed class MyIntegrator : MartenIntegratorBase { }

Error Handling

Stop on Error - For critical integrators:

// Stop Marten daemon on any error
[IntegratorConfiguration(StopOnError = true)]
public sealed class MyCriticalIntegrator : MartenIntegratorBase { }

Custom Retry Logic:

protected override bool ShouldRetry(Exception ex)
{
    return base.ShouldRetry(ex)  // IOException, HttpRequestException, TimeoutException, etc.
        || ex is MyCustomTransientException;
}

Custom Error Handling:

protected override async Task HandleErrorAsync(
    JasperFx.Events.IEvent @event,
    Exception exception,
    CancellationToken ct)
{
    await base.HandleErrorAsync(@event, exception, ct); // Logs error

    // Send alerts, create tickets, etc.
    await this.alertService.SendAsync($"Integration failed: {exception.Message}");
}

Session Management

Using DocumentSession

The DocumentSession property provides access to the same transaction that Marten uses to track event position:

public async Task HandleAsync(UserCreatedEvent evt, CancellationToken ct)
{
    // Execute command in the same transaction
    var command = new SendWelcomeEmailCommand { UserId = evt.UserId };
    await this.commandExecutor.ExecuteAsync(command, ct);

    // Store a document in the same transaction
    this.DocumentSession.Store(new UserProfile
    {
        UserId = evt.UserId,
        Email = evt.Email
    });

    // Marten commits everything together:
    // - Event position update
    // - Command execution results
    // - Stored documents
}

Separate Database Sessions

For different databases (analytics, reporting, etc.):

public async Task HandleAsync(OrderCreatedEvent evt, CancellationToken ct)
{
    // Write to separate analytics database
    using (var analyticsSession = this.analyticsStore.LightweightSession())
    {
        analyticsSession.Store(new AnalyticsEvent
        {
            EventType = "OrderCreated",
            Data = evt
        });

        await analyticsSession.SaveChangesAsync(ct);  // Explicit commit
    }

    // Main session (event position) committed by Marten separately
}

Common Patterns

External API Integration

[RequiresModule(ModuleDefinition.SportCore)]
public sealed class SportEventKafkaPublisher : MartenIntegratorBase
{
    private readonly IProducer<Guid, string> producer;

    public SportEventKafkaPublisher(
        IProducer<Guid, string> producer,
        ILogger<SportEventKafkaPublisher> logger)
        : base(logger)
    {
        this.producer = producer;
    }

    protected override int BatchSize => 50;

    public async Task HandleAsync(FixtureCreatedEvent evt, CancellationToken ct)
    {
        var message = new Message<Guid, string>
        {
            Key = evt.FixtureId.Value,
            Value = JsonSerializer.Serialize(evt)
        };

        await this.producer.ProduceAsync("sports.fixture.created", message, ct);
    }
}

Cross-Aggregate Workflows

[RequiresModule(ModuleDefinition.Core)]
public sealed class NotificationIntegrator : MartenIntegratorBase
{
    private readonly ICommandExecutor commandExecutor;

    public NotificationIntegrator(
        ICommandExecutor commandExecutor,
        ILogger<NotificationIntegrator> logger)
        : base(logger)
    {
        this.commandExecutor = commandExecutor;
    }

    public async Task HandleAsync(MatchCompletedEvent evt, CancellationToken ct)
    {
        // Execute command in same transaction as event position tracking
        var command = new SendMatchNotificationCommand
        {
            MatchId = evt.MatchId,
            HomeScore = evt.HomeScore,
            AwayScore = evt.AwayScore
        };

        await this.commandExecutor.ExecuteAsync(command, ct);
    }
}

Process All Events (No Module Filter)

// No [RequiresModule] = processes ALL events
public sealed class AuditLogIntegrator : MartenIntegratorBase
{
    private readonly IAuditStore auditStore;

    public AuditLogIntegrator(
        IAuditStore auditStore,
        ILogger<AuditLogIntegrator> logger)
        : base(logger)
    {
        this.auditStore = auditStore;
    }

    // Generic handler for any domain event
    public async Task HandleAsync(IDomainEvent evt, CancellationToken ct)
    {
        await this.auditStore.LogEventAsync(new AuditEntry
        {
            EventType = evt.GetType().Name,
            AggregateId = evt.AggregateRootId,
            Timestamp = DateTimeOffset.UtcNow,
            Data = evt
        }, ct);
    }
}

Custom Event Filtering

protected override bool ShouldProcessEvent(JasperFx.Events.IEvent @event)
{
    // First check base filtering (module, etc.)
    if (!base.ShouldProcessEvent(@event))
        return false;

    // Only process Fixture events
    return @event.StreamKey?.StartsWith("Fixture", StringComparison.Ordinal) == true;
}

Wiring and Configuration

Application Startup

Integrators are automatically registered when you call:

// Program.cs - LBS.Api
var moduleProvider = new FoundryModuleProvider();
builder.Services.AddSingleton<IModuleProvider>(moduleProvider);

// IMPORTANT: Register features BEFORE configuring Marten
builder.Services.AddFeaturesFromModules(builder.Configuration, moduleProvider);

// This automatically discovers and registers all integrators
builder.Services.AddFoundryWithEventStore(builder.Configuration);

Marten Configuration

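The RegisterIntegrators method is called automatically by AddFoundryWithEventStore. The listing below is a simplified outline of what it does; names such as tempProviderForOptions, integrator, and TIntegrator are resolved elsewhere in the real method: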

public static void RegisterIntegrators(
    this IServiceCollection services,
    MartenServiceCollectionExtensions.MartenConfigurationExpression martenBuilder)
{
    // Get integrator types from features
    var integratorOptions = tempProviderForOptions.GetService<IOptions<FeatureIntegratorOptions>>();

    // Register each integrator type in DI
    foreach (var integratorType in integratorOptions.Value.IntegratorTypes)
    {
        services.AddTransient(integratorType);
    }

    // Subscribe each integrator with Marten
    foreach (var integratorType in integratorOptions.Value.IntegratorTypes)
    {
        // Get dispatch policy to discover handled event types
        var dispatchPolicy = LambdaDispatchPolicy.GetLambdaDispatchPolicy(integratorType);

        // Register with Marten's subscription infrastructure
        martenBuilder.AddSubscriptionWithServices<TIntegrator>(ServiceLifetime.Singleton, options =>
        {
            options.Name = integrator.IntegratorName;
            options.Options.BatchSize = integrator.BatchSize;

            // Automatic event filtering
            foreach (var eventType in dispatchPolicy.HandledEventTypes)
            {
                options.IncludeType(eventType);
            }
        });
    }
}

Testing

Unit Testing Integrators

public class SportingEventIntegratorTests
{
    [Fact]
    public async Task HandleAsync_LogsCreationEvent()
    {
        // Arrange
        var logger = Substitute.For<ILogger<SportingEventIntegrator>>();
        var integrator = new SportingEventIntegrator(logger);

        var evt = new SportingEventCreatedEvent
        {
            AggregateRootId = new SportingEventId { Value = Guid.NewGuid() },
            SportingEventName = "Test Match"
        };

        // Act
        await integrator.HandleAsync(evt, CancellationToken.None);

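        // Assert - LogInformation is an extension method, so NSubstitute cannot
        // verify it directly; inspect the underlying ILogger.Log call instead
        var logCall = Assert.Single(
            logger.ReceivedCalls(),
            call => call.GetMethodInfo().Name == nameof(ILogger.Log));
        var arguments = logCall.GetArguments();
        Assert.Equal(LogLevel.Information, arguments[0]);
        Assert.Contains("Handling SportingEventCreatedEvent", arguments[2]?.ToString());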
    }
}

Integration Testing with Marten

[Collection("Database")]
public class IntegratorIntegrationTests
{
    [Fact]
    public async Task Integrator_ProcessesEventsFromEventStore()
    {
        // Setup Marten with integrator
        var services = new ServiceCollection();
        services.AddMarten(opts =>
        {
            opts.Connection(connectionString);
            // Add integrator for test
        })
        .AddAsyncDaemon(DaemonMode.HotCold);

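        // Note: BuildServiceProvider() does not start hosted services, and the async
        // daemon runs as one. Start it explicitly (or build the test on a host)
        // before expecting events to be processed.
        var provider = services.BuildServiceProvider();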
        var store = provider.GetRequiredService<IDocumentStore>();

        // Raise event
        using (var session = store.LightweightSession())
        {
            session.Events.Append(streamId, new SportingEventCreatedEvent { ... });
            await session.SaveChangesAsync();
        }

        // Wait for integrator to process
        await Task.Delay(1000);

        // Verify side effects occurred
    }
}
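
The fixed Task.Delay(1000) in the test above can make the test either slow or flaky, depending on how quickly the daemon picks up the event. One common alternative is to poll for the expected side effect with a timeout. The helper below is a minimal sketch of that idea; TestWait and UntilAsync are hypothetical names, not part of Marten or LBS.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class TestWait
{
    // Polls `condition` until it returns true or `timeout` elapses.
    // Returns true if the condition was met, false on timeout.
    // The condition is always evaluated at least once.
    public static async Task<bool> UntilAsync(
        Func<Task<bool>> condition,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken ct = default)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (await condition())
            {
                return true;
            }

            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(pollInterval, ct);
        }
    }
}
```

In the test, the fixed delay could then become something like Assert.True(await TestWait.UntilAsync(SideEffectExistsAsync, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100))), where SideEffectExistsAsync is a hypothetical query for whatever the integrator is expected to produce.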

Monitoring and Health

Marten Dashboard

Marten provides built-in monitoring for async subscriptions:

# View subscription status
GET /marten/subscriptions

# Response shows:
# - Subscription name
# - Current position
# - Last processed event
# - Error count
# - Status (running/stopped)

Logging

All integrators log key events:

// Success
Logger.LogInformation("Handling {EventType} for {AggregateId}", ...);

// Retry-able errors
Logger.LogWarning("Integrator encountered retry-able exception", ...);

// Non-retry-able errors
Logger.LogError("Integrator failed processing event {EventId}", ...);

Health Checks

Marten's async daemon provides health check endpoints showing subscription health.


Best Practices

DO

  • Use [RequiresModule] for module-specific integrators
  • Keep handlers focused on single responsibility
  • Use DocumentSession for operations in same transaction
  • Override BatchSize based on throughput needs
  • Test handlers independently of Marten infrastructure
  • Use FromPresent start position (default) for new integrators
  • Log important events for observability

DON'T

  • Don't perform long-running operations in handlers (> 30 seconds)
  • Don't use FromBeginning without understanding implications
  • Don't access DocumentSession outside handler methods
  • Don't create new Marten sessions when you need the main transaction
  • Don't catch all exceptions without rethrowing critical ones
  • Don't modify event data - events are immutable
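
As an illustration of the guidance on exception handling, the sketch below swallows only one explicitly named exception type and lets everything else propagate, so that the integrator's retry and error-handling hooks still see real failures. HandlerGuard and OptionalEnrichmentException are hypothetical names introduced for this example; which failures are safe to skip is an assumption that depends on the integration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical exception type representing a failure that is safe to skip.
public sealed class OptionalEnrichmentException : Exception
{
    public OptionalEnrichmentException(string message) : base(message) { }
}

public static class HandlerGuard
{
    // Runs `work`, swallowing only the one exception type known to be harmless.
    // Returns true on success, false if the harmless exception was logged and skipped.
    // Every other exception propagates to the caller unchanged.
    public static async Task<bool> RunAsync(Func<Task> work, Action<Exception> logWarning)
    {
        try
        {
            await work();
            return true;
        }
        catch (OptionalEnrichmentException ex)
        {
            logWarning(ex);
            return false;
        }
    }
}
```

Inside a handler this might wrap an optional step, for example await HandlerGuard.RunAsync(() => EnrichAsync(evt, ct), ex => this.Logger.LogWarning(ex, "Enrichment skipped")), where EnrichAsync is a hypothetical method. Catching a narrow type rather than Exception keeps transient and critical errors visible to ShouldRetry and HandleErrorAsync.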

Troubleshooting

Integrator Not Receiving Events

Check:

  1. Feature registered in IFeatureDefinition.GetIntegratorTypes()
  2. [RequiresModule] matches enabled module
  3. Handler method signature is correct: Task HandleAsync(TEvent evt, CancellationToken ct)
  4. AddFeaturesFromModules called BEFORE AddFoundryWithEventStore

Events Being Skipped

Check:

  1. ShouldProcessEvent override not filtering events incorrectly
  2. Event type matches handler parameter type exactly
  3. No exceptions being silently swallowed

Position Not Advancing

Check:

  1. Handler throwing unhandled exceptions
  2. StopOnError = true and error occurred
  3. Marten daemon is running (check health endpoint)
  4. Database connectivity


Migration from Old Patterns

If migrating from OldEventBasedIntegrationAgent:

Before

[IntegrationAgent("GUID")]
[RequiresModule(ModuleDefinition.Images)]
public class ImageManipulationIntegrator : OldEventBasedIntegrationAgent
{
    public ImageManipulationIntegrator(IModuleProvider moduleProvider, ILogger logger)
        : base(moduleProvider, logger, "images") { }
}

After

[RequiresModule(ModuleDefinition.Images)]
public class ImageManipulationIntegrator : MartenIntegratorBase
{
    public ImageManipulationIntegrator(ILogger<ImageManipulationIntegrator> logger)
        : base(logger) { }
}

Changes:

  • Remove [IntegrationAgent] attribute (no longer needed)
  • Remove prefix parameter (automatic filtering)
  • Remove IModuleProvider from constructor (automatic)
  • Use strongly-typed ILogger<T>
  • Update feature registration to use GetIntegratorTypes()


Further Reading