Import Infrastructure Usage Guide
This document explains how to use the LBS Foundry import infrastructure for creating and executing transactional imports.
Overview
The import infrastructure provides a robust system for:
- Converting external data into domain commands
- Executing commands in transactional batches grouped by aggregate root
- Async job queuing with background processing
- Priority-based scheduling (High/Normal)
- Crash recovery with automatic retry
- Tracking import execution results
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ HTTP Request │
│ POST /api/import │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ ImportQueueService │
│ - Creates ImportJob with Pending status │
│ - Persists to Marten for durability │
│ - High-priority jobs also pushed to in-memory channel │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ ImportProcessingBackgroundService │
│ - Polls for pending jobs (high-priority channel first, then Marten) │
│ - Claims jobs with optimistic concurrency │
│ - Processes up to MaxConcurrentImports (default: 15) in parallel │
│ - Sends heartbeats during processing │
│ - Recovers stale/crashed jobs on startup and periodically │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ ImportExecutor │
│ - Resolves importer by type name │
│ - Calls IImporter<T> or IAsyncImporter<T> │
│ - Groups commands by AggregateRootId │
│ - Executes each group transactionally │
│ - Persists ImportExecutionResult │
└─────────────────────────────────────────────────────────────────────────┘
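The pipeline's job-selection step can be modeled compactly. The following TypeScript sketch is an illustration, not the service's C# code: selectJobsToStart, QueuedJob, and the assumed store ordering (High before Normal, then oldest first) are hypothetical, and claiming with optimistic concurrency is omitted.

```typescript
// Hypothetical model of how a worker picks jobs; not the actual C# implementation.
type Priority = "High" | "Normal";

interface QueuedJob {
  id: string;
  priority: Priority;
  createdAt: number; // epoch milliseconds
}

// Assumed store ordering: High before Normal, then oldest first.
function byPriorityThenAge(a: QueuedJob, b: QueuedJob): number {
  if (a.priority !== b.priority) return a.priority === "High" ? -1 : 1;
  return a.createdAt - b.createdAt;
}

// Returns the jobs to start now: the in-memory high-priority channel is drained first,
// then pending jobs from the durable store fill any remaining concurrency slots.
function selectJobsToStart(
  highPriorityChannel: QueuedJob[],
  pendingInStore: QueuedJob[],
  runningCount: number,
  maxConcurrentImports = 15,
): QueuedJob[] {
  const freeSlots = Math.max(0, maxConcurrentImports - runningCount);
  const candidates = [...highPriorityChannel, ...[...pendingInStore].sort(byPriorityThenAge)];
  const selected: QueuedJob[] = [];
  const seen = new Set<string>();
  for (const job of candidates) {
    if (selected.length >= freeSlots) break;
    // High-priority jobs are persisted too, so the same job can appear in both sources.
    if (seen.has(job.id)) continue;
    seen.add(job.id);
    selected.push(job);
  }
  return selected;
}
```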
Import Job Lifecycle
┌─────────┐ ┌────────────┐ ┌───────────┐
│ Pending │ ──► │ Processing │ ──► │ Completed │
└─────────┘ └────────────┘ └───────────┘
│
│ (failure or crash)
▼
┌────────────┐
│ Failed │
└────────────┘
│
│ (if RetryAttempt < MaxRetryAttempts)
▼
┌─────────┐
│ Pending │ (retry)
└─────────┘
Job Statuses:
- Pending - Waiting to be picked up by a worker
- Processing - Currently being executed
- Completed - Successfully finished
- Failed - Execution failed (may be retried if under max attempts)
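The lifecycle above reduces to a small transition table. In this TypeScript sketch, canTransition and afterFailure are hypothetical helpers that encode the diagram, with MaxRetryAttempts defaulting to 3 as in the configuration.

```typescript
// Hypothetical encoding of the job lifecycle diagram.
type ImportJobStatus = "Pending" | "Processing" | "Completed" | "Failed";

const allowedTransitions: Record<ImportJobStatus, ImportJobStatus[]> = {
  Pending: ["Processing"],
  Processing: ["Completed", "Failed"],
  Completed: [], // terminal
  Failed: ["Pending"], // only through a retry
};

function canTransition(from: ImportJobStatus, to: ImportJobStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// A failed job goes back to Pending while attempts remain, otherwise it stays Failed.
function afterFailure(retryAttempt: number, maxRetryAttempts = 3): ImportJobStatus {
  return retryAttempt < maxRetryAttempts ? "Pending" : "Failed";
}
```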
Core Components
1. Import Interfaces
// Non-streaming importer - builds the full command list, which is then executed
public interface IImporter<TInput>
{
Task<IEnumerable<IDomainCommand>> ExecuteAsync(TInput input);
}
// Asynchronous (streaming) importer - yields commands one at a time for large datasets
public interface IAsyncImporter<TInput>
{
IAsyncEnumerable<IDomainCommand> ExecuteAsync(TInput input);
}
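A caller can treat both importer shapes as one command stream. This TypeScript analogue is illustrative: the kind discriminator stands in for the C# interface type check, DomainCommand stands in for IDomainCommand, and commandsFrom is a hypothetical name.

```typescript
// Stand-in for IDomainCommand.
interface DomainCommand {
  aggregateRootId: string;
}

// Analogue of IImporter<TInput>: the whole command list is produced before anything runs.
interface Importer<TInput> {
  kind: "buffered";
  execute(input: TInput): Promise<Iterable<DomainCommand>>;
}

// Analogue of IAsyncImporter<TInput>: commands are produced lazily.
interface AsyncImporter<TInput> {
  kind: "streaming";
  execute(input: TInput): AsyncIterable<DomainCommand>;
}

// Lets a caller consume either shape the same way, one command at a time.
async function* commandsFrom<TInput>(
  importer: Importer<TInput> | AsyncImporter<TInput>,
  input: TInput,
): AsyncGenerator<DomainCommand> {
  if (importer.kind === "buffered") {
    // Buffered: every command is in memory before the first one is yielded.
    yield* await importer.execute(input);
  } else {
    // Streaming: commands are pulled from the importer as they are produced.
    yield* importer.execute(input);
  }
}
```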
2. Import Job Model
public sealed class ImportJobContract
{
public Guid Id { get; set; }
public Guid? RequestId { get; set; } // Client-provided tracking ID
public string ImporterType { get; set; } // e.g., "FoxSportsParticipant"
public JsonObject ImportData { get; set; } // Full payload for audit/replay
public ImportPriority Priority { get; set; } // High or Normal
public ImportJobStatus Status { get; set; } // Pending, Processing, Completed, Failed
public string? WorkerId { get; set; } // Worker that claimed the job
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset? StartedAt { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
public string? ErrorMessage { get; set; }
public string? ErrorCode { get; set; }
public Guid? ResultId { get; set; } // Link to ImportExecutionResult
public int RetryAttempt { get; set; } // Current attempt number
public DateTimeOffset? LastHeartbeatAt { get; set; } // For stale detection
}
3. HTTP API
POST /api/import
{
"requestId": "optional-client-id",
"importerType": "ModelOutputImport",
"priority": "Normal", // "High" or "Normal" (default)
"importData": { ... }
}
Response (job queued):
{
"jobId": "d4350138-d0a1-4d93-90fa-2e808eb634db",
"status": "Pending",
"message": "Import job queued successfully"
}
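Queueing an import from a TypeScript client might look like the sketch below. The request and response shapes follow the JSON above; queueImport, FetchLike, and the base URL are assumptions, and authentication is omitted. The fetch function is injected so the sketch can be exercised without a running server.

```typescript
type ImportPriority = "High" | "Normal";

interface QueueImportRequest {
  requestId?: string;
  importerType: string; // must match a registered [Importer] name
  priority?: ImportPriority;
  importData: Record<string, unknown>;
}

interface QueueImportResponse {
  jobId: string;
  status: string;
  message: string;
}

// Minimal subset of fetch used here, so a fake can be injected in tests.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

async function queueImport(
  baseUrl: string,
  request: QueueImportRequest,
  fetchFn: FetchLike,
): Promise<QueueImportResponse> {
  const response = await fetchFn(`${baseUrl}/api/import`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    // Priority is sent only if set; the server defaults it to "Normal".
    body: JSON.stringify(request),
  });
  if (!response.ok) {
    throw new Error(`Queueing import failed with HTTP ${response.status}`);
  }
  return (await response.json()) as QueueImportResponse;
}
```

Against a real server you could pass the global fetch, for example queueImport("https://foundry.example.com", { importerType: "ModelOutputImport", importData: { round: 15 } }, fetch), where the host is a placeholder.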
Configuration
Configure import processing in appsettings.json:
{
"ImportProcessing": {
"MaxConcurrentImports": 15,
"PollingIntervalMs": 1000,
"HighPriorityChannelCapacity": 100,
"EnableSyncFallbackForHighPriority": true,
"StaleJobTimeoutMinutes": 30,
"HeartbeatIntervalMinutes": 5,
"MaxRetryAttempts": 3,
"StaleJobRecoveryIntervalMinutes": 5
}
}
| Setting | Default | Description |
|---|---|---|
| MaxConcurrentImports | 15 | Maximum number of import jobs processed in parallel |
| PollingIntervalMs | 1000 | How often to poll for new jobs, in milliseconds |
| HighPriorityChannelCapacity | 100 | Size of the in-memory high-priority queue |
| EnableSyncFallbackForHighPriority | true | Execute high-priority imports synchronously if there is no backlog |
| StaleJobTimeoutMinutes | 30 | Minutes without a heartbeat before a Processing job is considered crashed |
| HeartbeatIntervalMinutes | 5 | How often running jobs signal they are alive, in minutes |
| MaxRetryAttempts | 3 | Maximum retries before a job is permanently failed |
| StaleJobRecoveryIntervalMinutes | 5 | How often to check for stale jobs, in minutes |
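The settings interact: a running job must heartbeat comfortably inside the stale window, or recovery may reclaim it while it is still working. The TypeScript sketch below mirrors the defaults above (camelCased) and adds checks that are suggestions, not rules the service is documented to enforce; validate is a hypothetical helper.

```typescript
interface ImportProcessingOptions {
  maxConcurrentImports: number;
  pollingIntervalMs: number;
  highPriorityChannelCapacity: number;
  enableSyncFallbackForHighPriority: boolean;
  staleJobTimeoutMinutes: number;
  heartbeatIntervalMinutes: number;
  maxRetryAttempts: number;
  staleJobRecoveryIntervalMinutes: number;
}

// Documented defaults from the table above.
const defaults: ImportProcessingOptions = {
  maxConcurrentImports: 15,
  pollingIntervalMs: 1000,
  highPriorityChannelCapacity: 100,
  enableSyncFallbackForHighPriority: true,
  staleJobTimeoutMinutes: 30,
  heartbeatIntervalMinutes: 5,
  maxRetryAttempts: 3,
  staleJobRecoveryIntervalMinutes: 5,
};

// Suggested sanity checks; returns a list of problems (empty when the options look sane).
function validate(options: ImportProcessingOptions): string[] {
  const problems: string[] = [];
  if (options.maxConcurrentImports < 1) {
    problems.push("MaxConcurrentImports must be at least 1");
  }
  // A running job should heartbeat at least twice within the stale window.
  if (options.heartbeatIntervalMinutes * 2 > options.staleJobTimeoutMinutes) {
    problems.push("HeartbeatIntervalMinutes should be well below StaleJobTimeoutMinutes");
  }
  if (options.maxRetryAttempts < 0) {
    problems.push("MaxRetryAttempts cannot be negative");
  }
  return problems;
}
```

With the defaults, heartbeats arrive every 5 minutes against a 30-minute stale window, and because recovery runs every 5 minutes, a crashed job should be detected roughly 30 to 35 minutes after its last heartbeat.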
Crash Recovery
The import infrastructure automatically handles crashes and stuck jobs:
- Heartbeats: Running jobs send a heartbeat (recorded in LastHeartbeatAt) every HeartbeatIntervalMinutes
- Stale Detection: Processing jobs with no heartbeat for StaleJobTimeoutMinutes are considered crashed
- Automatic Retry: Crashed jobs are reset to Pending (up to MaxRetryAttempts)
- Permanent Failure: After max retries, jobs are marked Failed with MAX_RETRIES_EXCEEDED
Recovery runs:
- On service startup (catches jobs from previous crashes)
- Every StaleJobRecoveryIntervalMinutes during runtime
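One recovery pass can be sketched as follows. This TypeScript model is hypothetical: field names mirror ImportJobContract, and two details are assumptions not stated above, namely that RetryAttempt is incremented on reset and that StartedAt is used when a job crashed before its first heartbeat.

```typescript
// Hypothetical model of one stale-job recovery pass; field names mirror ImportJobContract.
type JobStatus = "Pending" | "Processing" | "Completed" | "Failed";

interface JobState {
  id: string;
  status: JobStatus;
  retryAttempt: number;
  startedAt: Date | null;
  lastHeartbeatAt: Date | null;
  errorCode?: string;
}

function recoverStaleJobs(
  jobs: JobState[],
  now: Date,
  staleJobTimeoutMinutes = 30,
  maxRetryAttempts = 3,
): JobState[] {
  const cutoff = now.getTime() - staleJobTimeoutMinutes * 60000;
  return jobs.map((job): JobState => {
    if (job.status !== "Processing") return job;
    // Assumption: fall back to StartedAt when the job crashed before its first heartbeat.
    const lastSignOfLife = job.lastHeartbeatAt ?? job.startedAt;
    if (lastSignOfLife === null || lastSignOfLife.getTime() >= cutoff) return job;
    if (job.retryAttempt < maxRetryAttempts) {
      // Assumption: RetryAttempt is incremented when the job is reset for another try.
      return { ...job, status: "Pending", retryAttempt: job.retryAttempt + 1, lastHeartbeatAt: null };
    }
    return { ...job, status: "Failed", errorCode: "MAX_RETRIES_EXCEEDED" };
  });
}
```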
Creating an Importer
Step 1: Define Input Contract
The input contract must have a [DataContract] attribute with a Name that matches the [Importer] attribute on the importer class. This name is used by the SDK and API for type identification.
[DataContract(Name = "MyDataImport", Namespace = LbsNamespace.CoreImports)]
public sealed class MyImportContract
{
[DataMember]
public string Category { get; set; }
[DataMember]
public List<MyDataItem> Items { get; set; } = [];
}
public sealed class MyDataItem
{
public string ExternalId { get; set; }
public string Name { get; set; }
public decimal Value { get; set; }
}
Step 2: Implement Importer
Every importer must have an [Importer] attribute that explicitly declares its registered name. This name must match the [DataContract(Name = "...")] on the input contract for SDK compatibility.
[Importer("MyDataImport")] // Required - must match DataContract name
[RequiresModule(ModuleDefinition.Core)] // Optional - for module-based activation
public sealed class MyDataImporter : IImporter<MyImportContract>
{
private readonly IDocumentSession documentSession;
private readonly ILogger<MyDataImporter> logger;
public MyDataImporter(IDocumentSession documentSession, ILogger<MyDataImporter> logger)
{
this.documentSession = documentSession;
this.logger = logger;
}
public async Task<IEnumerable<IDomainCommand>> ExecuteAsync(MyImportContract input)
{
var commands = new List<IDomainCommand>();
// Cache resolved IDs so repeated ExternalIds in one payload map to the same aggregate
var resolvedIds = new Dictionary<string, MyEntityId>();
this.logger.LogInformation("Processing {ItemCount} items for category {Category}",
input.Items.Count, input.Category);
foreach (var item in input.Items)
{
// Look up existing entity or create new aggregate ID (once per ExternalId)
if (!resolvedIds.TryGetValue(item.ExternalId, out var aggregateId))
{
aggregateId = await this.ResolveAggregateId(item.ExternalId);
resolvedIds[item.ExternalId] = aggregateId;
}
// Create domain command
var command = new UpdateMyEntityCommand
{
AggregateRootId = aggregateId,
Name = item.Name,
Value = item.Value,
Category = input.Category
};
commands.Add(command);
}
return commands;
}
private async Task<MyEntityId> ResolveAggregateId(string externalId)
{
var existing = await this.documentSession.Query<MyEntityContract>()
.Where(e => e.ExternalId == externalId)
.FirstOrDefaultAsync();
return existing?.Id ?? MyEntityId.NewId();
}
}
Important: The [Importer] attribute is required. Importers without this attribute will not be registered and will cause "Unknown importer type" errors at runtime.
Step 3: Registration (Automatic)
Importers are automatically discovered and registered at startup if they:
1. Have the [Importer("...")] attribute
2. Implement IImporter<T> or IAsyncImporter<T>
3. Are in an assembly passed to AddImportInfrastructure()
The registration stores a bidirectional mapping (name ↔ type) at startup, so no reflection occurs at runtime.
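A bidirectional name/type map built once at startup can be sketched like this. In the C# infrastructure the keys are CLR types found via the [Importer] attribute; in this TypeScript sketch class constructors stand in for them, GetImporterName<T>() becomes a method taking the constructor, and rejecting duplicate names is an assumption.

```typescript
// Hypothetical sketch of a startup-built importer registry; not the C# IImporterTypeRegistry.
type ImporterCtor = new (...args: never[]) => unknown;

class ImporterTypeRegistry {
  private readonly nameToType = new Map<string, ImporterCtor>();
  private readonly typeToName = new Map<ImporterCtor, string>();

  // Called once per discovered importer during startup.
  register(name: string, type: ImporterCtor): void {
    if (this.nameToType.has(name)) {
      // Assumption: duplicate names are rejected rather than silently overwritten.
      throw new Error(`Duplicate importer name "${name}"`);
    }
    this.nameToType.set(name, type);
    this.typeToName.set(type, name);
  }

  // Both lookups are plain Map reads; nothing is inspected at call time.
  getImporterName(type: ImporterCtor): string | undefined {
    return this.typeToName.get(type);
  }

  getImporterType(name: string): ImporterCtor | undefined {
    return this.nameToType.get(name);
  }
}
```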
Step 4: Using Importer Names in Code
When calling an importer from code (e.g., in a timer service), use IImporterTypeRegistry for type-safe name lookup:
public class MyTimerService
{
private readonly IImporterTypeRegistry importerTypeRegistry;
private readonly IImportExecutor importExecutor;
public MyTimerService(
IImporterTypeRegistry importerTypeRegistry,
IImportExecutor importExecutor)
{
this.importerTypeRegistry = importerTypeRegistry;
this.importExecutor = importExecutor;
}
public async Task RunImportAsync()
{
var importerName = this.importerTypeRegistry.GetImporterName<MyDataImporter>();
// Returns "MyDataImport" - fast dictionary lookup, no reflection
var data = new MyImportContract { Category = "test", Items = [...] };
var result = await this.importExecutor.ExecuteImportAsync(importerName!, data);
}
}
Do NOT use magic strings - always use the registry to get importer names:
// Bad - magic string; a typo or renamed importer only fails at runtime
await executor.ExecuteImportAsync("MyDataImport", data);
// Good - the importer class is checked at compile time; its name comes from the registry
await executor.ExecuteImportAsync(registry.GetImporterName<MyDataImporter>()!, data);
Async Importer Example
For large datasets, use async importers to stream commands:
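[Importer("LargeDatasetImport")] // Illustrative name - must match LargeDatasetContract's DataContract name
public sealed class LargeDatasetImporter : IAsyncImporter<LargeDatasetContract>
{
public async IAsyncEnumerable<IDomainCommand> ExecuteAsync(LargeDatasetContract input)
{
await foreach (var batch in this.ProcessInBatches(input.DataSource))
{
foreach (var item in batch)
{
var command = this.CreateCommandFromItem(item);
yield return command;
}
}
}
private async IAsyncEnumerable<IEnumerable<DataItem>> ProcessInBatches(string dataSource)
{
// Stream data in batches to avoid memory issues
// Implementation depends on your data source; yield each batch as it is read
await Task.CompletedTask; // Placeholder so this async iterator compiles
yield break;
}
private IDomainCommand CreateCommandFromItem(DataItem item)
{
// Map one source item to a domain command (implementation depends on your domain)
throw new NotImplementedException();
}
}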
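The batching idea translates directly to TypeScript async generators. In this sketch fetchPage, DataItem, UpsertCommand, and the page size of 500 are hypothetical; the point is that only one page is held in memory at a time while commands are yielded one by one.

```typescript
interface DataItem {
  externalId: string;
  name: string;
}

interface UpsertCommand {
  aggregateRootId: string;
  name: string;
}

// Reads `limit` items starting at `offset`; returns fewer (or none) at the end of the source.
type FetchPage = (offset: number, limit: number) => Promise<DataItem[]>;

async function* processInBatches(fetchPage: FetchPage, batchSize = 500): AsyncGenerator<DataItem[]> {
  for (let offset = 0; ; offset += batchSize) {
    const batch = await fetchPage(offset, batchSize);
    if (batch.length === 0) return;
    yield batch;
    // A short page means the source is exhausted.
    if (batch.length < batchSize) return;
  }
}

async function* largeDatasetImport(fetchPage: FetchPage): AsyncGenerator<UpsertCommand> {
  for await (const batch of processInBatches(fetchPage)) {
    for (const item of batch) {
      // Simplification: the external ID is used directly as the aggregate ID.
      yield { aggregateRootId: item.externalId, name: item.name };
    }
  }
}
```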
Transactional Behavior
Automatic Batching by Aggregate
Commands are automatically grouped by AggregateRootId, and each group is executed in its own transaction:
// Commands for the same aggregate share a transaction; different aggregates get separate ones
var commands = new IDomainCommand[]
{
new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player1") }, // Transaction 1
new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player1") }, // Transaction 1
new UpdateTeamCommand { AggregateRootId = TeamId.From("team1") }, // Transaction 2
new UpdatePlayerCommand { AggregateRootId = PlayerId.From("player2") } // Transaction 3
};
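The grouping can be sketched as a single pass over the commands. This TypeScript version keeps aggregates in first-seen order and commands in their original order within each group, which matches the transaction numbering in the example above; whether the real executor orders groups this way is an assumption, and groupByAggregate is a hypothetical name.

```typescript
interface Command {
  aggregateRootId: string;
  type: string;
}

// Each returned group corresponds to one transaction.
function groupByAggregate(commands: Iterable<Command>): Command[][] {
  const groups = new Map<string, Command[]>();
  for (const command of commands) {
    const group = groups.get(command.aggregateRootId);
    if (group) {
      group.push(command);
    } else {
      groups.set(command.aggregateRootId, [command]);
    }
  }
  // Map iteration follows insertion order, i.e. first appearance of each aggregate.
  return Array.from(groups.values());
}
```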
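Failure Handling
- If any aggregate transaction fails, the import as a whole is reported as failed
- The failing aggregate's transaction is rolled back
- Aggregate transactions that committed before the failure remain committed (there is no import-wide rollback)
- Error details (ErrorMessage, ErrorCode, and per-aggregate AggregateResults) are recorded in the ImportExecutionResult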
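The failure rules can be modeled as a sequential loop over aggregate groups. This TypeScript sketch assumes the executor stops at the first failing group, which the text above does not state explicitly; executeGroups, GroupOutcome, and runInTransaction are hypothetical stand-ins.

```typescript
interface GroupOutcome {
  aggregateRootId: string;
  success: boolean;
  error?: string;
}

async function executeGroups<T extends { aggregateRootId: string }>(
  groups: T[][], // assumed non-empty, as produced by grouping by aggregate
  runInTransaction: (group: T[]) => Promise<void>, // commits on success, rolls back if it throws
): Promise<{ success: boolean; results: GroupOutcome[] }> {
  const results: GroupOutcome[] = [];
  for (const group of groups) {
    const aggregateRootId = group[0].aggregateRootId;
    try {
      await runInTransaction(group);
      results.push({ aggregateRootId, success: true });
    } catch (err) {
      results.push({ aggregateRootId, success: false, error: String(err) });
      // Earlier groups stay committed; the import as a whole is reported as failed.
      return { success: false, results };
    }
  }
  return { success: true, results };
}
```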
Querying Import Jobs
Available Queries
// Get all jobs for an importer type
var query = new AllImportJobsQuery
{
ImporterType = "FoxSportsParticipant",
Status = "Processing", // Optional filter
Priority = "High" // Optional filter
};
// Get specific job status
var statusQuery = new ImportJobStatusQuery
{
JobId = Guid.Parse("d4350138-d0a1-4d93-90fa-2e808eb634db")
};
// Get import results
var resultsQuery = new AllImportResultsQuery
{
ImporterType = "FoxSportsParticipant",
Success = true // Optional filter
};
Via HTTP API
POST /api/readmodel
{
"queryType": "AllImportJobs",
"importerType": "FoxSportsParticipant",
"status": "Completed"
}
Result Tracking
Import results are automatically persisted:
public sealed class ImportExecutionResultContract
{
public Guid ImportId { get; set; }
public string ImporterType { get; set; }
public bool Success { get; set; }
public int TotalCommands { get; set; }
public int SuccessfulCommands { get; set; }
public int FailedCommands { get; set; }
public string? ErrorMessage { get; set; }
public string? ErrorCode { get; set; }
public DateTimeOffset StartedAt { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
public TimeSpan? Duration { get; set; }
public List<AggregateImportResultContract> AggregateResults { get; set; }
}
Best Practices
1. Importer Design
- Keep importers focused on single business scenarios
- Use dependency injection for services and repositories
- Include comprehensive logging for debugging
- Handle errors gracefully and continue processing when possible
2. Command Generation
- Set each command's AggregateRootId to the aggregate it changes, since that ID decides which transaction the command runs in
- Include all necessary data in commands to avoid dependencies
- Use existing aggregate IDs when updating, create new ones when creating
3. Performance Considerations
- Use async importers for large datasets (>1000 items)
- Consider batching external API calls within importers
- Monitor memory usage with large imports
- Use appropriate logging levels to avoid performance impact
- Imports running longer than StaleJobTimeoutMinutes (30 minutes by default) are not treated as crashed, because heartbeats keep their LastHeartbeatAt current
4. Error Handling
- Log warnings for skipped items (e.g., missing external references)
- Log errors for processing failures but continue with other items
- Provide meaningful error messages for debugging
- Automatic retry handles transient failures (up to MaxRetryAttempts retries, 3 by default)
5. Priority Selection
- Use High priority for user-initiated imports that need quick feedback
- Use Normal priority for scheduled/background imports
- High-priority imports may execute synchronously if there is no backlog (see EnableSyncFallbackForHighPriority)
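One possible reading of how an incoming job is routed is sketched below in TypeScript. It is hypothetical: routeJob and its outcomes are illustrative names, "backlog" is assumed to mean jobs already waiting in the high-priority channel, and the handling of a full channel (fall back to store polling) is inferred from the architecture rather than documented.

```typescript
// Hypothetical routing of a newly queued job; names and the meaning of "backlog" are assumptions.
type Route = "ExecuteSynchronously" | "PersistAndPushToChannel" | "PersistOnly";

interface RoutingOptions {
  enableSyncFallbackForHighPriority: boolean;
  highPriorityChannelCapacity: number;
}

function routeJob(
  priority: "High" | "Normal",
  highPriorityWaiting: number, // assumed definition of backlog: jobs already in the channel
  options: RoutingOptions = { enableSyncFallbackForHighPriority: true, highPriorityChannelCapacity: 100 },
): Route {
  // Normal jobs are only persisted; workers find them by polling the store.
  if (priority === "Normal") return "PersistOnly";
  if (options.enableSyncFallbackForHighPriority && highPriorityWaiting === 0) {
    return "ExecuteSynchronously";
  }
  // Assumption: when the channel is full the job is still persisted and picked up by polling.
  return highPriorityWaiting < options.highPriorityChannelCapacity ? "PersistAndPushToChannel" : "PersistOnly";
}
```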
6. Testing
- Create unit tests for importer logic with mock dependencies
- Test both success and failure scenarios
- Verify command generation matches expected business rules
- Test transaction rollback behavior
Querying Available Importers
You can query the system to get a list of all registered importers:
Via API
Response:
{
"results": [
{ "importerType": "FoxSportsImport", "importDataContract": "FoxSportsImport" },
{ "importerType": "SuperCoachImport", "importDataContract": "SuperCoachImport" },
{ "importerType": "ModelOutputImport", "importDataContract": "ModelOutputImport" }
],
"totalCount": 3
}
Via SDK
import { coreQueries } from '@luckboxstudios/foundry-sdk';
// `client` is an already-configured Foundry SDK client (construction not shown)
const importers = await client.query(coreQueries.availableImporters());
// Returns: ImporterInfo[] with { importerType, importDataContract, description }
Note: This query requires the Admin role.
Example: Complete Importer
Input Contract
[DataContract(Name = "ModelOutputImport", Namespace = LbsNamespace.SuperCoachImports)]
public sealed class ModelOutputImporterContract
{
[DataMember]
public int Round { get; set; }
[DataMember]
public List<PlayerProjection> PlayerProjections { get; set; } = [];
}
Importer Implementation
[Importer("ModelOutputImport")] // Must match DataContract name
[RequiresModule(ModuleDefinition.SuperCoach)]
public sealed class ModelOutputImporter : IImporter<ModelOutputImporterContract>
{
private readonly IDocumentSession documentSession;
private readonly ILogger<ModelOutputImporter> logger;
public ModelOutputImporter(
IDocumentSession documentSession,
ILogger<ModelOutputImporter> logger)
{
this.documentSession = documentSession;
this.logger = logger;
}
public async Task<IEnumerable<IDomainCommand>> ExecuteAsync(ModelOutputImporterContract input)
{
var commands = new List<IDomainCommand>();
foreach (var projection in input.PlayerProjections)
{
var participant = await this.FindParticipantByFoxFeedId(projection.FoxFeedId);
if (participant == null)
{
this.logger.LogWarning("Participant not found for FoxFeedId {FoxFeedId}", projection.FoxFeedId);
continue;
}
var command = new UpdatePlayerProjectionCommand
{
AggregateRootId = ParticipantId.From(participant.Id),
Round = input.Round,
ExpectedMinutes = projection.ExpectedMinutes,
ExpectedPoints = projection.ExpectedPoints
};
commands.Add(command);
}
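return commands;
}
// Illustrative lookup - ParticipantContract and its FoxFeedId property are assumed names
private async Task<ParticipantContract?> FindParticipantByFoxFeedId(string foxFeedId)
{
return await this.documentSession.Query<ParticipantContract>()
.Where(p => p.FoxFeedId == foxFeedId)
.FirstOrDefaultAsync();
}
}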
Calling from a Timer Service
[RequiresModule(ModuleDefinition.SuperCoach)]
public sealed class ModelOutputTimerService
{
private readonly IImporterTypeRegistry importerTypeRegistry;
private readonly IServiceProvider serviceProvider;
public ModelOutputTimerService(
IImporterTypeRegistry importerTypeRegistry,
IServiceProvider serviceProvider)
{
this.importerTypeRegistry = importerTypeRegistry;
this.serviceProvider = serviceProvider;
}
[Timer(maxConcurrency: 1, intervalInSeconds: TimerInterval.Hourly)]
public async Task ProcessModelOutputAsync(CancellationToken cancellationToken)
{
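using var scope = this.serviceProvider.CreateScope();
// Resolve the executor from a fresh scope so each timer run gets its own scoped services
var importExecutor = scope.ServiceProvider.GetRequiredService<IImportExecutor>();
var data = new ModelOutputImporterContract
{
Round = 15, // Hardcoded for illustration
PlayerProjections = await this.FetchProjectionsAsync() // Data-fetching helper not shown
};
// Type-safe importer name lookup - no magic strings; fail fast if the importer is not registered
var importerName = this.importerTypeRegistry.GetImporterName<ModelOutputImporter>()
?? throw new InvalidOperationException("ModelOutputImporter is not registered");
var result = await importExecutor.ExecuteImportAsync(importerName, data);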
if (!result.Success)
{
throw new InvalidOperationException($"Import failed: {result.ErrorMessage}");
}
}
}
Usage via HTTP API:
POST /api/import
{
"importerType": "ModelOutputImport",
"priority": "High",
"importData": {
"round": 15,
"playerProjections": [
{
"foxFeedId": "12345",
"expectedMinutes": 65.5,
"expectedPoints": 45.2
}
]
}
}
Response:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "Pending",
"message": "Import job queued successfully"
}
This infrastructure provides a robust, scalable solution for importing external data while maintaining data consistency through the event sourcing pattern.