Import Infrastructure Usage Guide

This document explains how to use the LBS Foundry import infrastructure for creating and executing transactional imports.

Overview

The import infrastructure provides a robust system for:

  • Converting external data into domain commands
  • Executing commands in transactional batches grouped by aggregate root
  • Async job queuing with background processing
  • Priority-based scheduling (High/Normal)
  • Crash recovery with automatic retry
  • Tracking import execution results

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                           HTTP Request                                   │
│                         POST /api/import                                 │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                        ImportQueueService                                │
│  - Creates ImportJob with Pending status                                 │
│  - Persists to Marten for durability                                     │
│  - High-priority jobs also pushed to in-memory channel                   │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                  ImportProcessingBackgroundService                       │
│  - Polls for pending jobs (high-priority channel first, then Marten)     │
│  - Claims jobs with optimistic concurrency                               │
│  - Processes up to MaxConcurrentImports (default: 15) in parallel        │
│  - Sends heartbeats during processing                                    │
│  - Recovers stale/crashed jobs on startup and periodically               │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                           ImportExecutor                                 │
│  - Resolves importer by type name                                        │
│  - Calls IImporter<T> or IAsyncImporter<T>                               │
│  - Groups commands by AggregateRootId                                    │
│  - Executes each group transactionally                                   │
│  - Persists ImportExecutionResult                                        │
└─────────────────────────────────────────────────────────────────────────┘
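The hand-off between the in-memory channel and the persisted queue can be pictured with a bounded channel from System.Threading.Channels. This is a minimal sketch, not the actual ImportQueueService: the class HighPriorityQueueSketch, its method names, and the pollStore delegate (a stand-in for the Marten query) are assumptions made for illustration.

```csharp
#nullable enable
using System;
using System.Threading.Channels;

// Hypothetical sketch of a bounded high-priority queue with a store fallback.
public sealed class HighPriorityQueueSketch
{
    private readonly Channel<Guid> channel;

    public HighPriorityQueueSketch(int capacity) =>
        this.channel = Channel.CreateBounded<Guid>(new BoundedChannelOptions(capacity)
        {
            // In Wait mode, TryWrite returns false instead of dropping items when full.
            FullMode = BoundedChannelFullMode.Wait,
        });

    // Returns false when the channel is at capacity. The job is assumed to be
    // persisted already, so regular polling can still pick it up.
    public bool TryEnqueue(Guid jobId) => this.channel.Writer.TryWrite(jobId);

    // High-priority channel first; otherwise ask the persisted store.
    public Guid? NextJob(Func<Guid?> pollStore) =>
        this.channel.Reader.TryRead(out var jobId) ? jobId : pollStore();
}
```

Under these assumptions a full channel never blocks the caller: the enqueue simply reports false and the job waits for the next polling cycle.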

Import Job Lifecycle

┌─────────┐     ┌────────────┐     ┌───────────┐
│ Pending │ ──► │ Processing │ ──► │ Completed │
└─────────┘     └────────────┘     └───────────┘
                      │ (failure or crash)
                      ▼
              ┌────────────┐
              │   Failed   │
              └────────────┘
                      │ (if RetryAttempt < MaxRetryAttempts)
                      ▼
              ┌─────────┐
              │ Pending │ (retry)
              └─────────┘

Job Statuses:

  • Pending - Waiting to be picked up by a worker
  • Processing - Currently being executed
  • Completed - Successfully finished
  • Failed - Execution failed (may be retried if under max attempts)
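The lifecycle can be read as a small state machine. The sketch below encodes only the transitions drawn in the diagram; the ImportJobLifecycleSketch class and its method names are hypothetical, and the real implementation may enforce these rules differently.

```csharp
using System;

public enum ImportJobStatus { Pending, Processing, Completed, Failed }

// Hypothetical helper that mirrors the lifecycle diagram.
public static class ImportJobLifecycleSketch
{
    // True for the transitions shown in the diagram, false for everything else.
    public static bool IsAllowed(ImportJobStatus from, ImportJobStatus to) => (from, to) switch
    {
        (ImportJobStatus.Pending, ImportJobStatus.Processing) => true,
        (ImportJobStatus.Processing, ImportJobStatus.Completed) => true,
        (ImportJobStatus.Processing, ImportJobStatus.Failed) => true,
        (ImportJobStatus.Failed, ImportJobStatus.Pending) => true, // retry
        _ => false,
    };

    // A failed job goes back to Pending only while RetryAttempt < MaxRetryAttempts.
    public static bool CanRetry(int retryAttempt, int maxRetryAttempts) =>
        retryAttempt < maxRetryAttempts;
}
```

For example, Completed is terminal here: IsAllowed returns false for any transition that starts from it.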

Core Components

1. Import Interfaces

// Synchronous importer - generates all commands, then executes
public interface IImporter<TInput>
{
    Task<IEnumerable<IDomainCommand>> ExecuteAsync(TInput input);
}

// Asynchronous importer - streams commands for large datasets
public interface IAsyncImporter<TInput>
{
    IAsyncEnumerable<IDomainCommand> ExecuteAsync(TInput input);
}

2. Import Job Model

public sealed class ImportJobContract
{
    public Guid Id { get; set; }
    public Guid? RequestId { get; set; }           // Client-provided tracking ID
    public string ImporterType { get; set; }        // e.g., "FoxSportsParticipant"
    public JsonObject ImportData { get; set; }      // Full payload for audit/replay
    public ImportPriority Priority { get; set; }    // High or Normal
    public ImportJobStatus Status { get; set; }     // Pending, Processing, Completed, Failed
    public string? WorkerId { get; set; }           // Worker that claimed the job
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset? StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
    public string? ErrorMessage { get; set; }
    public string? ErrorCode { get; set; }
    public Guid? ResultId { get; set; }             // Link to ImportExecutionResult
    public int RetryAttempt { get; set; }           // Current attempt number
    public DateTimeOffset? LastHeartbeatAt { get; set; }  // For stale detection
}

3. HTTP API

POST /api/import
{
  "requestId": "optional-client-id",
  "importerType": "ModelOutputImport",
  "priority": "Normal",           // "High" or "Normal" (default)
  "importData": { ... }
}

Response (job queued):

{
  "jobId": "d4350138-d0a1-4d93-90fa-2e808eb634db",
  "status": "Pending",
  "message": "Import job queued successfully"
}
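For callers written in C#, queuing a job could look roughly like the following. This is a sketch under assumptions: the ImportRequest and ImportResponse records and the ImportClientSketch class are hypothetical shapes inferred from the JSON above, and the HttpClient is expected to have its BaseAddress and any authentication configured elsewhere.

```csharp
#nullable enable
using System;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

// Hypothetical request/response shapes inferred from the JSON examples.
public sealed record ImportRequest(
    string ImporterType, string Priority, JsonObject ImportData, string? RequestId = null);

public sealed record ImportResponse(Guid JobId, string Status, string Message);

public static class ImportClientSketch
{
    // Web defaults give camelCase property names, matching the documented payload.
    private static readonly JsonSerializerOptions Json = new(JsonSerializerDefaults.Web);

    public static async Task<ImportResponse> QueueAsync(HttpClient http, ImportRequest request)
    {
        using var response = await http.PostAsJsonAsync("/api/import", request, Json);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadFromJsonAsync<ImportResponse>(Json)
            ?? throw new InvalidOperationException("The import endpoint returned an empty body.");
    }
}
```

The returned JobId is the value to keep for later status queries.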

Configuration

Configure import processing in appsettings.json:

{
  "ImportProcessing": {
    "MaxConcurrentImports": 15,
    "PollingIntervalMs": 1000,
    "HighPriorityChannelCapacity": 100,
    "EnableSyncFallbackForHighPriority": true,
    "StaleJobTimeoutMinutes": 30,
    "HeartbeatIntervalMinutes": 5,
    "MaxRetryAttempts": 3,
    "StaleJobRecoveryIntervalMinutes": 5
  }
}

Setting                            Default  Description
MaxConcurrentImports               15       Maximum parallel import jobs
PollingIntervalMs                  1000     How often to poll for new jobs (ms)
HighPriorityChannelCapacity        100      Size of in-memory high-priority queue
EnableSyncFallbackForHighPriority  true     Execute high-priority imports synchronously if no backlog
StaleJobTimeoutMinutes             30       Time before a Processing job is considered crashed
HeartbeatIntervalMinutes           5        How often running jobs signal they're alive
MaxRetryAttempts                   3        Maximum retries before permanent failure
StaleJobRecoveryIntervalMinutes    5        How often to check for stale jobs
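These settings map naturally onto a strongly typed options class. The following is a sketch only: the class name ImportProcessingOptions and the IsConsistent check are assumptions, while the property names and defaults are taken from the settings listed above.

```csharp
using System;

// Hypothetical options class mirroring the "ImportProcessing" section.
public sealed class ImportProcessingOptions
{
    public const string SectionName = "ImportProcessing";

    public int MaxConcurrentImports { get; set; } = 15;
    public int PollingIntervalMs { get; set; } = 1000;
    public int HighPriorityChannelCapacity { get; set; } = 100;
    public bool EnableSyncFallbackForHighPriority { get; set; } = true;
    public int StaleJobTimeoutMinutes { get; set; } = 30;
    public int HeartbeatIntervalMinutes { get; set; } = 5;
    public int MaxRetryAttempts { get; set; } = 3;
    public int StaleJobRecoveryIntervalMinutes { get; set; } = 5;

    public TimeSpan StaleJobTimeout => TimeSpan.FromMinutes(this.StaleJobTimeoutMinutes);
    public TimeSpan HeartbeatInterval => TimeSpan.FromMinutes(this.HeartbeatIntervalMinutes);

    // A heartbeat interval at or above the stale timeout would make healthy jobs look crashed.
    public bool IsConsistent() =>
        this.HeartbeatIntervalMinutes > 0 && this.HeartbeatInterval < this.StaleJobTimeout;
}
```

With Microsoft.Extensions.Options, a class like this is typically bound with services.Configure<ImportProcessingOptions>(configuration.GetSection(ImportProcessingOptions.SectionName)); whether the infrastructure does exactly this is not stated in this guide.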

Crash Recovery

The import infrastructure automatically handles crashes and stuck jobs:

  1. Heartbeats: Jobs send periodic heartbeats while processing
  2. Stale Detection: Jobs with no heartbeat for StaleJobTimeoutMinutes are considered crashed
  3. Automatic Retry: Crashed jobs are reset to Pending (up to MaxRetryAttempts)
  4. Permanent Failure: After max retries, jobs are marked Failed with MAX_RETRIES_EXCEEDED

Recovery runs:

  • On service startup (catches jobs from previous crashes)
  • Every StaleJobRecoveryIntervalMinutes during runtime
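The recovery decision described above reduces to two comparisons. The sketch below is illustrative only: RecoveryAction and StaleJobRecoverySketch are hypothetical names, and it assumes RetryAttempt is compared against MaxRetryAttempts before being incremented, which this guide does not spell out.

```csharp
using System;

public enum RecoveryAction { None, ResetToPending, FailPermanently }

// Hypothetical helper that decides what a recovery pass does with one Processing job.
public static class StaleJobRecoverySketch
{
    public static RecoveryAction Decide(
        DateTimeOffset lastHeartbeatAt,
        int retryAttempt,
        DateTimeOffset now,
        TimeSpan staleJobTimeout,
        int maxRetryAttempts)
    {
        if (now - lastHeartbeatAt < staleJobTimeout)
        {
            return RecoveryAction.None; // heartbeat is recent enough, job is alive
        }

        // Stale: retry while attempts remain, otherwise fail for good
        // (the guide names the error code MAX_RETRIES_EXCEEDED for that case).
        return retryAttempt < maxRetryAttempts
            ? RecoveryAction.ResetToPending
            : RecoveryAction.FailPermanently;
    }
}
```

With the default settings, a job whose last heartbeat is 10 minutes old is left alone, while one that has been silent for 31 minutes is reset or failed depending on its retry count.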

Creating an Importer

Step 1: Define Input Contract

The input contract must have a [DataContract] attribute with a Name that matches the [Importer] attribute on the importer class. This name is used by the SDK and API for type identification.

[DataContract(Name = "MyDataImport", Namespace = LbsNamespace.CoreImports)]
public sealed class MyImportContract
{
    [DataMember]
    public string Category { get; set; }

    [DataMember]
    public List<MyDataItem> Items { get; set; } = [];
}

public sealed class MyDataItem
{
    public string ExternalId { get; set; }
    public string Name { get; set; }
    public decimal Value { get; set; }
}

Step 2: Implement Importer

Every importer must have an [Importer] attribute that explicitly declares its registered name. This name must match the [DataContract(Name = "...")] on the input contract for SDK compatibility.

[Importer("MyDataImport")]  // Required - must match DataContract name
[RequiresModule(ModuleDefinition.Core)]  // Optional - for module-based activation
public sealed class MyDataImporter : IImporter<MyImportContract>
{
    private readonly IDocumentSession documentSession;
    private readonly ILogger<MyDataImporter> logger;

    public MyDataImporter(IDocumentSession documentSession, ILogger<MyDataImporter> logger)
    {
        this.documentSession = documentSession;
        this.logger = logger;
    }

    public async Task<IEnumerable<IDomainCommand>> ExecuteAsync(MyImportContract input)
    {
        var commands = new List<IDomainCommand>();

        this.logger.LogInformation("Processing {ItemCount} items for category {Category}",
            input.Items.Count, input.Category);

        foreach (var item in input.Items)
        {
            // Look up existing entity or create new aggregate ID
            var aggregateId = await this.ResolveAggregateId(item.ExternalId);

            // Create domain command
            var command = new UpdateMyEntityCommand
            {
                AggregateRootId = aggregateId,
                Name = item.Name,
                Value = item.Value,
                Category = input.Category
            };

            commands.Add(command);
        }

        return commands;
    }

    private async Task<MyEntityId> ResolveAggregateId(string externalId)
    {
        var existing = await this.documentSession.Query<MyEntityContract>()
            .Where(e => e.ExternalId == externalId)
            .FirstOrDefaultAsync();

        return existing?.Id ?? MyEntityId.NewId();
    }
}

Important: The [Importer] attribute is required. Importers without this attribute will not be registered and will cause "Unknown importer type" errors at runtime.

Step 3: Registration (Automatic)

Importers are automatically discovered and registered at startup if they:

  1. Have the [Importer("...")] attribute
  2. Implement IImporter<T> or IAsyncImporter<T>
  3. Are in an assembly passed to AddImportInfrastructure()

Registration builds a bidirectional mapping (name ↔ type) once at startup, so resolving an importer later is a dictionary lookup and needs no reflection.
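A bidirectional mapping of this kind can be built with two dictionaries filled in a single attribute scan. The sketch below is not the real registry: ImporterAttribute is redeclared as a stand-in so the example is self-contained, and ImporterTypeRegistrySketch and GetImporterType are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;

// Stand-in for the real [Importer] attribute.
[AttributeUsage(AttributeTargets.Class)]
public sealed class ImporterAttribute : Attribute
{
    public ImporterAttribute(string name) => this.Name = name;
    public string Name { get; }
}

[Importer("MyDataImport")]
public sealed class MyDataImporter { }

// Hypothetical registry: reflection runs once in the constructor, lookups are dictionary reads.
public sealed class ImporterTypeRegistrySketch
{
    private readonly Dictionary<string, Type> typesByName = new(StringComparer.Ordinal);
    private readonly Dictionary<Type, string> namesByType = new();

    public ImporterTypeRegistrySketch(IEnumerable<Type> candidateTypes)
    {
        foreach (var type in candidateTypes)
        {
            var attribute = type.GetCustomAttribute<ImporterAttribute>();
            if (attribute is null) continue;          // not an importer, skip
            this.typesByName.Add(attribute.Name, type); // throws on duplicate names
            this.namesByType.Add(type, attribute.Name);
        }
    }

    public string? GetImporterName<TImporter>() =>
        this.namesByType.TryGetValue(typeof(TImporter), out var name) ? name : null;

    public Type? GetImporterType(string name) =>
        this.typesByName.TryGetValue(name, out var type) ? type : null;
}
```

In this sketch an unregistered type yields null from GetImporterName, and two importers declaring the same name fail fast at construction time.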

Step 4: Using Importer Names in Code

When calling an importer from code (e.g., in a timer service), use IImporterTypeRegistry for type-safe name lookup:

public class MyTimerService
{
    private readonly IImporterTypeRegistry importerTypeRegistry;
    private readonly IImportExecutor importExecutor;

    public MyTimerService(
        IImporterTypeRegistry importerTypeRegistry,
        IImportExecutor importExecutor)
    {
        this.importerTypeRegistry = importerTypeRegistry;
        this.importExecutor = importExecutor;
    }

    public async Task RunImportAsync()
    {
        var importerName = this.importerTypeRegistry.GetImporterName<MyDataImporter>();
        // Returns "MyDataImport" - fast dictionary lookup, no reflection

        var data = new MyImportContract { Category = "test", Items = [...] };
        var result = await this.importExecutor.ExecuteImportAsync(importerName!, data);
    }
}

Do NOT use magic strings - always use the registry to get importer names:

// Bad - magic string prone to typos
await executor.ExecuteImportAsync("MyDataImport", data);

// Good - refers to the importer type, so typos and renames are caught by the compiler
await executor.ExecuteImportAsync(registry.GetImporterName<MyDataImporter>()!, data);

Async Importer Example

For large datasets, use async importers to stream commands:

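// The name below is illustrative; it must match the [DataContract(Name = "...")] on LargeDatasetContract
[Importer("LargeDatasetImport")]
public sealed class LargeDatasetImporter : IAsyncImporter<LargeDatasetContract>
{
    public async IAsyncEnumerable<IDomainCommand> ExecuteAsync(LargeDatasetContract input)
    {
        await foreach (var batch in this.ProcessInBatches(input.DataSource))
        {
            foreach (var item in batch)
            {
                // CreateCommandFromItem is not shown; it maps one item to one command
                var command = this.CreateCommandFromItem(item);
                yield return command;
            }
        }
    }

    private async IAsyncEnumerable<IEnumerable<DataItem>> ProcessInBatches(string dataSource)
    {
        // Stream data in batches to avoid memory issues
        // Implementation depends on your data source
        await Task.CompletedTask; // placeholder so the stub compiles as an async iterator
        yield break;
    }
}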

Transactional Behavior

Automatic Batching by Aggregate

Commands are automatically grouped by AggregateRootId and executed in separate transactions:

// Four commands, three transactions: one per distinct AggregateRootId
var commands = new[]
{
    new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player1") }, // Transaction 1
    new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player1") }, // Transaction 1
    new UpdateTeamCommand { AggregateRootId = TeamId.From("team1") },       // Transaction 2
    new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player2") }  // Transaction 3
};
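The grouping step itself can be expressed with LINQ. This is a simplified sketch rather than the executor's actual code: IDomainCommand is redeclared with a string key for brevity (the real commands use typed IDs such as PlayerId), and SketchCommand and AggregateBatching are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;

// Stand-in for the real IDomainCommand; only the aggregate key matters here.
public interface IDomainCommand
{
    string AggregateRootId { get; }
}

public sealed record SketchCommand(string AggregateRootId, string Description) : IDomainCommand;

public static class AggregateBatching
{
    // One inner list per aggregate root. GroupBy keeps groups in order of first
    // appearance and preserves the original command order inside each group.
    public static List<List<IDomainCommand>> GroupByAggregate(IEnumerable<IDomainCommand> commands) =>
        commands
            .GroupBy(command => command.AggregateRootId)
            .Select(group => group.ToList())
            .ToList();
}
```

Applied to the four commands above, this yields three groups: two commands for player1, one for team1, and one for player2.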

Failure Handling

  • If any aggregate transaction fails, the entire import fails
  • Failed aggregate transactions are rolled back
  • Successful aggregate transactions before the failure remain committed
  • Detailed error information is provided in the result
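These rules can be illustrated with a loop that commits one group at a time. The sketch assumes execution stops at the first failing group, which the list above suggests but does not state outright; ImportOutcome, GroupExecutionSketch, and the executeInTransaction delegate are hypothetical.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed record ImportOutcome(bool Success, int CommittedGroups, string? ErrorMessage);

public static class GroupExecutionSketch
{
    // executeInTransaction is expected to commit on success and roll back (then throw) on failure.
    public static async Task<ImportOutcome> RunAsync<TGroup>(
        IEnumerable<TGroup> groups,
        Func<TGroup, Task> executeInTransaction)
    {
        var committed = 0;
        foreach (var group in groups)
        {
            try
            {
                await executeInTransaction(group);
                committed++;
            }
            catch (Exception ex)
            {
                // Earlier groups stay committed; the import as a whole is reported as failed.
                return new ImportOutcome(false, committed, ex.Message);
            }
        }

        return new ImportOutcome(true, committed, null);
    }
}
```

Because earlier groups are not rolled back, commands should be safe to re-run if the same payload is imported again after a failure.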

Querying Import Jobs

Available Queries

// Get all jobs for an importer type
var query = new AllImportJobsQuery
{
    ImporterType = "FoxSportsParticipant",
    Status = "Processing",  // Optional filter
    Priority = "High"       // Optional filter
};

// Get specific job status
var statusQuery = new ImportJobStatusQuery
{
    JobId = Guid.Parse("d4350138-d0a1-4d93-90fa-2e808eb634db")
};

// Get import results
var resultsQuery = new AllImportResultsQuery
{
    ImporterType = "FoxSportsParticipant",
    Success = true  // Optional filter
};

Via HTTP API

POST /api/readmodel
{
  "queryType": "AllImportJobs",
  "importerType": "FoxSportsParticipant",
  "status": "Completed"
}

Result Tracking

Import results are automatically persisted:

public sealed class ImportExecutionResultContract
{
    public Guid ImportId { get; set; }
    public string ImporterType { get; set; }
    public bool Success { get; set; }
    public int TotalCommands { get; set; }
    public int SuccessfulCommands { get; set; }
    public int FailedCommands { get; set; }
    public string? ErrorMessage { get; set; }
    public string? ErrorCode { get; set; }
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
    public TimeSpan? Duration { get; set; }
    public List<AggregateImportResultContract> AggregateResults { get; set; }
}

Best Practices

1. Importer Design

  • Keep importers focused on single business scenarios
  • Use dependency injection for services and repositories
  • Include comprehensive logging for debugging
  • Handle errors gracefully and continue processing when possible

2. Command Generation

  • Ensure commands are grouped by logical aggregate boundaries
  • Include all necessary data in commands to avoid dependencies
  • Use existing aggregate IDs when updating, create new ones when creating

3. Performance Considerations

  • Use async importers for large datasets (>1000 items)
  • Consider batching external API calls within importers
  • Monitor memory usage with large imports
  • Use appropriate logging levels to avoid performance impact
  • Imports that run longer than StaleJobTimeoutMinutes (30 minutes by default) are safe: heartbeats keep them from being treated as crashed

4. Error Handling

  • Log warnings for skipped items (e.g., missing external references)
  • Log errors for processing failures but continue with other items
  • Provide meaningful error messages for debugging
  • Automatic retry handles transient failures (up to 3 attempts by default)

5. Priority Selection

  • Use High priority for user-initiated imports that need quick feedback
  • Use Normal priority for scheduled/background imports
  • High-priority imports may execute synchronously if no backlog exists

6. Testing

  • Create unit tests for importer logic with mock dependencies
  • Test both success and failure scenarios
  • Verify command generation matches expected business rules
  • Test transaction rollback behavior
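A framework-free sketch of the first three points follows. Everything in it is hypothetical: the interfaces are redeclared in simplified form, RenameImporter takes its lookup as a delegate so a test can substitute an in-memory fake, and the test method throws instead of using an assertion library. In a real project the same shape would normally live in a test framework such as xUnit or NUnit.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Simplified stand-ins for the real interfaces.
public interface IDomainCommand { string AggregateRootId { get; } }
public interface IImporter<TInput> { Task<IEnumerable<IDomainCommand>> ExecuteAsync(TInput input); }

public sealed record RenameCommand(string AggregateRootId, string Name) : IDomainCommand;
public sealed record Item(string ExternalId, string Name);

// Importer under test: the lookup is injected so a test can replace it.
public sealed class RenameImporter : IImporter<IReadOnlyList<Item>>
{
    private readonly Func<string, Task<string?>> findAggregateId;

    public RenameImporter(Func<string, Task<string?>> findAggregateId) =>
        this.findAggregateId = findAggregateId;

    public async Task<IEnumerable<IDomainCommand>> ExecuteAsync(IReadOnlyList<Item> input)
    {
        var commands = new List<IDomainCommand>();
        foreach (var item in input)
        {
            var aggregateId = await this.findAggregateId(item.ExternalId);
            if (aggregateId is null) continue; // unknown reference: skip the item
            commands.Add(new RenameCommand(aggregateId, item.Name));
        }
        return commands;
    }
}

public static class RenameImporterTests
{
    public static async Task SkipsItemsWithoutAggregateAsync()
    {
        var known = new Dictionary<string, string> { ["ext-1"] = "agg-1" };
        var importer = new RenameImporter(externalId =>
            Task.FromResult<string?>(known.TryGetValue(externalId, out var id) ? id : null));

        var commands = (await importer.ExecuteAsync(new[]
        {
            new Item("ext-1", "Known"),
            new Item("ext-2", "Unknown"),
        })).ToList();

        if (commands.Count != 1 || commands[0].AggregateRootId != "agg-1")
        {
            throw new InvalidOperationException("Expected exactly one command for agg-1.");
        }
    }
}
```

Injecting the lookup keeps the test independent of Marten; rollback behavior still needs an integration test against a real store.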

Querying Available Importers

You can query the system to get a list of all registered importers:

Via API

POST /api/readmodel
Authorization: Bearer <admin-token>

{ "queryType": "AvailableImporters" }

Response:

{
  "results": [
    { "importerType": "FoxSportsImport", "importDataContract": "FoxSportsImport" },
    { "importerType": "SuperCoachImport", "importDataContract": "SuperCoachImport" },
    { "importerType": "ModelOutputImport", "importDataContract": "ModelOutputImport" }
  ],
  "totalCount": 3
}

Via SDK

import { coreQueries } from '@luckboxstudios/foundry-sdk';

const importers = await client.query(coreQueries.availableImporters());
// Returns: ImporterInfo[] with { importerType, importDataContract, description }

Note: This query requires the Admin role.

Example: Complete Importer

Input Contract

[DataContract(Name = "ModelOutputImport", Namespace = LbsNamespace.SuperCoachImports)]
public sealed class ModelOutputImporterContract
{
    [DataMember]
    public int Round { get; set; }

    [DataMember]
    public List<PlayerProjection> PlayerProjections { get; set; } = [];
}

Importer Implementation

[Importer("ModelOutputImport")]  // Must match DataContract name
[RequiresModule(ModuleDefinition.SuperCoach)]
public sealed class ModelOutputImporter : IImporter<ModelOutputImporterContract>
{
    private readonly IDocumentSession documentSession;
    private readonly ILogger<ModelOutputImporter> logger;

    public ModelOutputImporter(
        IDocumentSession documentSession,
        ILogger<ModelOutputImporter> logger)
    {
        this.documentSession = documentSession;
        this.logger = logger;
    }

    public async Task<IEnumerable<IDomainCommand>> ExecuteAsync(ModelOutputImporterContract input)
    {
        var commands = new List<IDomainCommand>();

        foreach (var projection in input.PlayerProjections)
        {
            var participant = await this.FindParticipantByFoxFeedId(projection.FoxFeedId);
            if (participant == null)
            {
                this.logger.LogWarning("Participant not found for FoxFeedId {Id}", projection.FoxFeedId);
                continue;
            }

            var command = new UpdatePlayerProjectionCommand
            {
                AggregateRootId = ParticipantId.From(participant.Id),
                Round = input.Round,
                ExpectedMinutes = projection.ExpectedMinutes,
                ExpectedPoints = projection.ExpectedPoints
            };

            commands.Add(command);
        }

        return commands;
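    }

    // FindParticipantByFoxFeedId is omitted for brevity; it is expected to query
    // documentSession in the same way as ResolveAggregateId in Step 2.
}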

Calling from a Timer Service

[RequiresModule(ModuleDefinition.SuperCoach)]
public sealed class ModelOutputTimerService
{
    private readonly IImporterTypeRegistry importerTypeRegistry;
    private readonly IServiceProvider serviceProvider;

    public ModelOutputTimerService(
        IImporterTypeRegistry importerTypeRegistry,
        IServiceProvider serviceProvider)
    {
        this.importerTypeRegistry = importerTypeRegistry;
        this.serviceProvider = serviceProvider;
    }

    [Timer(maxConcurrency: 1, intervalInSeconds: TimerInterval.Hourly)]
    public async Task ProcessModelOutputAsync(CancellationToken cancellationToken)
    {
        using var scope = this.serviceProvider.CreateScope();
        var importExecutor = scope.ServiceProvider.GetRequiredService<IImportExecutor>();

        var data = new ModelOutputImporterContract
        {
            Round = 15,
            PlayerProjections = await this.FetchProjectionsAsync()
        };

        // Type-safe importer name lookup - no magic strings!
        var importerName = this.importerTypeRegistry.GetImporterName<ModelOutputImporter>();
        var result = await importExecutor.ExecuteImportAsync(importerName!, data);

        if (!result.Success)
        {
            throw new InvalidOperationException($"Import failed: {result.ErrorMessage}");
        }
    }
}

Usage via HTTP API:

POST /api/import
{
  "importerType": "ModelOutputImport",
  "priority": "High",
  "importData": {
    "round": 15,
    "playerProjections": [
      {
        "foxFeedId": "12345",
        "expectedMinutes": 65.5,
        "expectedPoints": 45.2
      }
    ]
  }
}

Response:

{
  "jobId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "Pending",
  "message": "Import job queued successfully"
}

This infrastructure provides a robust, scalable solution for importing external data while maintaining data consistency through the event sourcing pattern.