Real-Time Notification System - Technical Design¶
Document Information¶
- Status: Draft
- Date: October 2025
- Related ADR: real-time-notification-system.md
- Author: LBS Development Team
Implementation note (current)¶
This document is the original draft design. The system shipped with different component names; the design body below is retained for context, but treat this note as the source of truth for what exists in code today.
- WebSocket entry point:
WebSocketHandler.cs(src/Services/LBS.NotificationService/Endpoints/). - Routing:
NotificationRouter.cs(src/Services/LBS.NotificationService/Services/). - Payload shaping:
PayloadStrategyService.cs(src/Services/LBS.NotificationService/Services/), implementingIPayloadStrategy. - Entity-type resolution is done via
IEntityTypeResolver(src/Core/LBS.Notification.Abstractions/Interfaces/). There is noIChannelNameProvidertype in the codebase; the interface described later in this draft was not implemented. - Redis channel naming lives in
RedisPubSubProvider.cs(src/Core/LBS.Notification.Redis/). The defaultChannelPrefixis"foundry"(seeRedisOptions.cs), producing channels of the formfoundry:<channel>.
Table of Contents¶
- Architecture Overview
- Component Design
- Database Integration
- WebSocket Protocol
- Security & Authorization
- Message Routing & Payload Strategy
- Client SDK
- Deployment & Scaling
- Monitoring & Observability
- Testing Strategy
Architecture Overview¶
High-Level Flow¶
┌─────────────────────────────────────────────────────────────────────────┐
│ PostgreSQL Databases │
│ (Main, Analytics, Reporting, Archive, Ballr) │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Marten Projection Tables (mt_doc_*) │ │
│ │ │ │
│ │ INSERT/UPDATE/DELETE → notify_document_change() trigger │ │
│ └────────────────────────────────┬─────────────────────────────────┘ │
└────────────────────────────────────┼────────────────────────────────────┘
│ pg_notify('foundry:db_changes', json)
↓
┌─────────────────────────────────────────────────────────────────────────┐
│ LBS.NotificationService │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │
│ │ PostgreSQL │ │ Subscription │ │ WebSocket │ │
│ │ Listener │───▶│ Manager │◀───│ Connection │ │
│ │ (LISTEN/NOTIFY) │ │ │ │ Manager │ │
│ └──────────────────┘ └────────┬─────────┘ └─────────────────┘ │
│ │ │
│ ↓ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Redis Pub/Sub (IPubSubProvider) │ │
│ │ │ │
│ │ Channels: │ │
│ │ - foundry:notifications:{EntityType}:{EntityId} │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────────────────┐
│ Multiple Service Instances │
│ (Horizontal Scaling via Redis Backplane) │
│ │
│ Instance 1 Instance 2 Instance 3 │
│ WebSocket 1-N WebSocket N-M WebSocket M-P │
└─────────────────────────────────────────────────────────────────────────┘
│
↓ WebSocket (wss://)
┌─────────────────────────────────────────────────────────────────────────┐
│ React Clients (ballr.live) │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ LBS.Notification.Client (TypeScript SDK) │ │
│ │ - Connection management │ │
│ │ - Auto-reconnection │ │
│ │ - Subscribe/Unsubscribe │ │
│ │ - Message routing to React components │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Component Design¶
Project Structure¶
src/
├── Core/
│ ├── LBS.Notification.Abstractions/
│ │ ├── Contracts/
│ │ │ ├── INotificationMessage.cs
│ │ │ ├── ISubscriptionRequest.cs
│ │ │ └── INotificationPayload.cs
│ │ ├── IPubSubProvider.cs
│ │ ├── ISubscriptionManager.cs
│ │ ├── IConnectionManager.cs
│ │ ├── IPayloadStrategy.cs
│ │ └── IEntityTypeResolver.cs
│ │
│ └── LBS.Notification.Redis/
│ ├── RedisPubSubProvider.cs
│ ├── RedisBackplaneOptions.cs
│ └── RedisServiceExtensions.cs
│
├── Services/
│ └── LBS.NotificationService/
│ ├── Program.cs
│ ├── WebSocketEndpoint.cs
│ ├── ConnectionManager.cs
│ ├── SubscriptionManager.cs
│ ├── PostgresNotificationListener.cs
│ ├── PayloadStrategyResolver.cs
│ ├── EntityTypeMapper.cs
│ └── AuthorizationValidator.cs
│
└── ClientSDK/
└── LBS.Notification.Client/ (TypeScript)
├── src/
│ ├── NotificationClient.ts
│ ├── types.ts
│ ├── reconnection.ts
│ └── hooks/
│ ├── useNotification.ts
│ └── useEntitySubscription.ts
└── package.json
Core Abstractions¶
LBS.Notification.Abstractions¶
IPubSubProvider.cs
namespace LBS.Notification.Abstractions;
/// <summary>
/// Abstraction for pub/sub messaging to enable swappable implementations (Redis, NATS, etc.)
/// </summary>
public interface IPubSubProvider
{
/// <summary>
/// Publish a notification to a specific channel
/// </summary>
Task PublishAsync(string channel, INotificationMessage message, CancellationToken cancellationToken = default);
/// <summary>
/// Subscribe to notifications on a specific channel
/// </summary>
Task SubscribeAsync(string channel, Func<INotificationMessage, Task> handler, CancellationToken cancellationToken = default);
/// <summary>
/// Unsubscribe from a specific channel
/// </summary>
Task UnsubscribeAsync(string channel, CancellationToken cancellationToken = default);
}
/// <summary>
/// Factory for creating pub/sub channels based on entity type and ID
/// </summary>
public interface IChannelNameProvider
{
string GetChannelName(string entityType, string entityId);
string GetDatabaseChangeChannel();
}
INotificationMessage.cs
namespace LBS.Notification.Abstractions.Contracts;
public interface INotificationMessage
{
string Type { get; }
string Id { get; }
string ChangeType { get; }
PayloadType PayloadType { get; }
object? Data { get; }
DateTimeOffset Timestamp { get; }
}
public static class PayloadType
{
public const string Minimal = "minimal";
public const string Full = "full";
public static readonly string[] All = { Minimal, Full };
public static bool IsValid(string payloadType) => All.Contains(payloadType);
}
public static class ChangeType
{
public const string Created = "created";
public const string Updated = "updated";
public const string Deleted = "deleted";
public static readonly string[] All = { Created, Updated, Deleted };
public static bool IsValid(string changeType) => All.Contains(changeType);
}
public record NotificationMessage : INotificationMessage
{
public string Type { get; init; } = string.Empty;
public string Id { get; init; } = string.Empty;
public string ChangeType { get; init; } = ChangeType.Updated;
public string PayloadType { get; init; } = PayloadType.Minimal;
public object? Data { get; init; }
public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
}
ISubscriptionRequest.cs
namespace LBS.Notification.Abstractions.Contracts;
/// <summary>
/// Client request to subscribe to entity changes
/// </summary>
public interface ISubscriptionRequest
{
string Type { get; }
string Id { get; }
string? SubscriptionId { get; }
}
public record SubscriptionRequest : ISubscriptionRequest
{
public string Type { get; init; } = string.Empty;
public string Id { get; init; } = string.Empty;
public string? SubscriptionId { get; init; }
}
/// <summary>
/// Client request to unsubscribe from entity changes
/// </summary>
public interface IUnsubscribeRequest
{
string? SubscriptionId { get; }
string? Type { get; }
string? Id { get; }
}
public record UnsubscribeRequest : IUnsubscribeRequest
{
public string? SubscriptionId { get; init; }
public string? Type { get; init; }
public string? Id { get; init; }
}
ISubscriptionManager.cs
namespace LBS.Notification.Abstractions;
/// <summary>
/// Manages client subscriptions to entity changes
/// </summary>
public interface ISubscriptionManager
{
/// <summary>
/// Add a subscription for a connection
/// </summary>
Task<SubscriptionResult> SubscribeAsync(
string connectionId,
ISubscriptionRequest request,
ClaimsPrincipal user,
CancellationToken cancellationToken = default);
/// <summary>
/// Remove a specific subscription
/// </summary>
Task UnsubscribeAsync(string connectionId, IUnsubscribeRequest request, CancellationToken cancellationToken = default);
/// <summary>
/// Remove all subscriptions for a connection
/// </summary>
Task UnsubscribeAllAsync(string connectionId, CancellationToken cancellationToken = default);
/// <summary>
/// Get all connections subscribed to a specific entity
/// </summary>
IEnumerable<string> GetSubscribers(string entityType, string entityId);
/// <summary>
/// Get subscription count for a connection
/// </summary>
int GetSubscriptionCount(string connectionId);
}
public record SubscriptionResult
{
public bool Success { get; init; }
public string? SubscriptionId { get; init; }
public string? ErrorMessage { get; init; }
public static SubscriptionResult Succeeded(string subscriptionId) =>
new() { Success = true, SubscriptionId = subscriptionId };
public static SubscriptionResult Failed(string errorMessage) =>
new() { Success = false, ErrorMessage = errorMessage };
}
IPayloadStrategy.cs
namespace LBS.Notification.Abstractions;
/// <summary>
/// Determines whether to send full payload or minimal notification based on entity type
/// </summary>
public interface IPayloadStrategy
{
/// <summary>
/// Determine payload type for an entity
/// </summary>
string DeterminePayloadType(string entityType);
/// <summary>
/// Fetch full entity data for full payload notifications
/// </summary>
Task<object?> FetchEntityDataAsync(string entityType, string entityId, CancellationToken cancellationToken = default);
}
/// <summary>
/// Configuration for payload strategy
/// </summary>
public class PayloadStrategyOptions
{
/// <summary>
/// Entity types that should use full payload (e.g., "RugbyGameState", "Fixture")
/// </summary>
public HashSet<string> FullPayloadEntityTypes { get; set; } = new();
/// <summary>
/// Default payload type for unlisted entities
/// </summary>
public string DefaultPayloadType { get; set; } = PayloadType.Minimal;
}
IEntityTypeResolver.cs
namespace LBS.Notification.Abstractions;
/// <summary>
/// Maps between database table names and domain entity types
/// </summary>
public interface IEntityTypeResolver
{
/// <summary>
/// Get entity type from Marten table name (e.g., "mt_doc_team" → "Team")
/// </summary>
string? ResolveEntityType(string tableName);
/// <summary>
/// Get table name from entity type (e.g., "Team" → "mt_doc_team")
/// </summary>
string? ResolveTableName(string entityType);
/// <summary>
/// Get CLR type from entity type string
/// </summary>
Type? ResolveClrType(string entityType);
}
Database Integration¶
PostgreSQL Trigger Function¶
Migration: Create Notification Trigger Function
-- Migration: 202510_CreateNotificationTrigger.sql
-- Create notification function
CREATE OR REPLACE FUNCTION notify_document_change()
RETURNS TRIGGER AS $$
DECLARE
notification json;
entity_id text;
BEGIN
-- Extract the document ID from the 'id' column
entity_id := COALESCE(NEW.id::text, OLD.id::text);
-- Build notification payload
notification := json_build_object(
'table', TG_TABLE_NAME,
'id', entity_id,
'operation', TG_OP,
'timestamp', CURRENT_TIMESTAMP
);
-- Send notification
PERFORM pg_notify('foundry:db_changes', notification::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger installation helper function
CREATE OR REPLACE FUNCTION install_notification_trigger_on_table(table_name text)
RETURNS void AS $$
BEGIN
EXECUTE format('
CREATE TRIGGER notify_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON %I
FOR EACH ROW
EXECUTE FUNCTION notify_document_change();
', table_name);
END;
$$ LANGUAGE plpgsql;
-- Install on all existing Marten projection tables
DO $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename LIKE 'mt_doc_%'
LOOP
PERFORM install_notification_trigger_on_table(table_record.tablename);
END LOOP;
END $$;
Marten Integration Module¶
NotificationMartenModule.cs
namespace LBS.NotificationService.Data;
public class NotificationMartenModule : IConfigureMarten
{
public void Configure(IServiceProvider services, StoreOptions options)
{
// Ensure notification trigger is installed when Marten creates new projection tables
options.Events.OnDocumentCreated += (store, mapping) =>
{
if (mapping.TableName.Value.StartsWith("mt_doc_"))
{
this.InstallNotificationTrigger(store, mapping.TableName.Value);
}
};
}
private void InstallNotificationTrigger(IDocumentStore store, string tableName)
{
using var connection = store.Storage.Database.CreateConnection();
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = $"SELECT install_notification_trigger_on_table('{tableName}')";
command.ExecuteNonQuery();
}
}
WebSocket Protocol¶
Message Types¶
Client → Server Messages¶
Subscribe
Unsubscribe
Ping (Heartbeat)
Server → Client Messages¶
Subscription Confirmed
{
"type": "subscribed",
"data": {
"subscriptionId": "sub-abc-123",
"entityType": "Team",
"entityId": "team-123"
}
}
Subscription Error
{
"type": "subscription_error",
"data": {
"entityType": "Team",
"entityId": "team-123",
"error": "Unauthorized: User does not have permission to view this entity"
}
}
Notification - Minimal (Refetch)
{
"type": "notification",
"data": {
"entityType": "Team",
"entityId": "team-123",
"changeType": "updated",
"payloadType": "minimal",
"timestamp": "2025-10-08T10:30:00Z"
}
}
Notification - Full Payload
{
"type": "notification",
"data": {
"entityType": "RugbyGameState",
"entityId": "match-456",
"changeType": "updated",
"payloadType": "full",
"timestamp": "2025-10-08T10:30:15Z",
"payload": {
"id": "match-456",
"homeScore": 21,
"awayScore": 14,
"currentPeriod": "Second Half",
"gameTime": "62:30",
"lastScorer": "player-789"
}
}
}
Pong (Heartbeat Response)
WebSocket Endpoint Implementation¶
WebSocketEndpoint.cs
namespace LBS.NotificationService;
public class WebSocketEndpoint
{
private readonly IConnectionManager connectionManager;
private readonly ISubscriptionManager subscriptionManager;
private readonly ILogger<WebSocketEndpoint> logger;
public WebSocketEndpoint(
IConnectionManager connectionManager,
ISubscriptionManager subscriptionManager,
ILogger<WebSocketEndpoint> logger)
{
this.connectionManager = connectionManager;
this.subscriptionManager = subscriptionManager;
this.logger = logger;
}
public async Task HandleWebSocketAsync(HttpContext context)
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
return;
}
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = Guid.NewGuid().ToString();
var user = context.User;
await this.connectionManager.AddConnectionAsync(connectionId, webSocket, user);
try
{
await this.HandleMessagesAsync(connectionId, webSocket, user);
}
finally
{
await this.subscriptionManager.UnsubscribeAllAsync(connectionId);
await this.connectionManager.RemoveConnectionAsync(connectionId);
}
}
private async Task HandleMessagesAsync(string connectionId, WebSocket webSocket, ClaimsPrincipal user)
{
var buffer = new byte[1024 * 4];
while (webSocket.State == WebSocketState.Open)
{
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed by client", CancellationToken.None);
break;
}
if (result.MessageType == WebSocketMessageType.Text)
{
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
await this.ProcessClientMessageAsync(connectionId, message, user);
}
}
}
private async Task ProcessClientMessageAsync(string connectionId, string message, ClaimsPrincipal user)
{
// Parse and route client messages (subscribe, unsubscribe, ping)
// Implementation details in full service code
}
}
Security & Authorization¶
Authorization Validation¶
AuthorizationValidator.cs
namespace LBS.NotificationService;
/// <summary>
/// Validates subscription requests against existing query authorization model
/// </summary>
public class AuthorizationValidator
{
private readonly IQueryService queryService;
private readonly IEntityTypeResolver entityTypeResolver;
private readonly ILogger<AuthorizationValidator> logger;
public AuthorizationValidator(
IQueryService queryService,
IEntityTypeResolver entityTypeResolver,
ILogger<AuthorizationValidator> logger)
{
this.queryService = queryService;
this.entityTypeResolver = entityTypeResolver;
this.logger = logger;
}
/// <summary>
/// Validate if user has permission to subscribe to an entity
/// Uses existing ISecureQuery / IPublicQuery authorization logic
/// </summary>
public async Task<AuthorizationResult> ValidateSubscriptionAsync(
string entityType,
string entityId,
ClaimsPrincipal user,
CancellationToken cancellationToken = default)
{
try
{
// Resolve entity CLR type
var clrType = this.entityTypeResolver.ResolveClrType(entityType);
if (clrType == null)
{
return AuthorizationResult.Failed($"Unknown entity type: {entityType}");
}
// Create a minimal query to test authorization
// This leverages existing ISecureQuery / IPublicQuery permission checks
var testQuery = this.CreateAuthorizationTestQuery(entityType, entityId, user);
// Attempt to execute query - if it throws SecurityException, user lacks permission
var result = await this.queryService.ExecuteAsync(testQuery, cancellationToken);
return AuthorizationResult.Succeeded();
}
catch (SecurityAccessDeniedException ex)
{
this.logger.LogWarning(ex, "Authorization failed for {EntityType}:{EntityId}", entityType, entityId);
return AuthorizationResult.Failed("Unauthorized to view this entity");
}
catch (Exception ex)
{
this.logger.LogError(ex, "Authorization validation error for {EntityType}:{EntityId}", entityType, entityId);
return AuthorizationResult.Failed("Authorization validation failed");
}
}
private IQuery CreateAuthorizationTestQuery(string entityType, string entityId, ClaimsPrincipal user)
{
// Create a minimal query that will trigger existing authorization checks
// Implementation depends on your query infrastructure
throw new NotImplementedException();
}
}
public record AuthorizationResult
{
public bool Success { get; init; }
public string? ErrorMessage { get; init; }
public static AuthorizationResult Succeeded() => new() { Success = true };
public static AuthorizationResult Failed(string errorMessage) => new() { Success = false, ErrorMessage = errorMessage };
}
Message Routing & Payload Strategy¶
Payload Strategy Resolver¶
PayloadStrategyResolver.cs
namespace LBS.NotificationService;
public class PayloadStrategyResolver : IPayloadStrategy
{
private readonly PayloadStrategyOptions options;
private readonly IQueryService queryService;
private readonly ILogger<PayloadStrategyResolver> logger;
public PayloadStrategyResolver(
IOptions<PayloadStrategyOptions> options,
IQueryService queryService,
ILogger<PayloadStrategyResolver> logger)
{
this.options = options.Value;
this.queryService = queryService;
this.logger = logger;
}
public string DeterminePayloadType(string entityType)
{
return this.options.FullPayloadEntityTypes.Contains(entityType)
? PayloadType.Full
: PayloadType.Minimal;
}
public async Task<object?> FetchEntityDataAsync(
string entityType,
string entityId,
CancellationToken cancellationToken = default)
{
try
{
// Use system context to fetch data (already authorized at subscription time)
using (UserContext.RunAsSystem())
{
var query = this.CreateFetchQuery(entityType, entityId);
return await this.queryService.ExecuteAsync(query, cancellationToken);
}
}
catch (Exception ex)
{
this.logger.LogError(ex, "Failed to fetch entity data for {EntityType}:{EntityId}", entityType, entityId);
return null;
}
}
private IQuery CreateFetchQuery(string entityType, string entityId)
{
// Create appropriate query based on entity type
// Implementation depends on your query infrastructure
throw new NotImplementedException();
}
}
Configuration¶
appsettings.json
{
"PayloadStrategy": {
"FullPayloadEntityTypes": [
"RugbyGameState",
"Fixture",
"NrlSuperCoachPlayerStats"
],
"DefaultPayloadType": "minimal"
},
"Subscription": {
"MaxSubscriptionsPerConnection": 100,
"SubscriptionTimeoutSeconds": 300
},
"WebSocket": {
"PingIntervalSeconds": 30,
"ConnectionTimeoutSeconds": 3600
},
"Redis": {
"ConnectionString": "localhost:6379",
"ChannelPrefix": "foundry:notifications"
}
}
Client SDK¶
TypeScript Client Implementation¶
NotificationClient.ts
// LBS.Notification.Client/src/NotificationClient.ts
export interface SubscriptionOptions {
entityType: string;
entityId: string;
onNotification: (notification: Notification) => void;
onError?: (error: Error) => void;
}
export interface Notification {
entityType: string;
entityId: string;
changeType: 'created' | 'updated' | 'deleted';
payloadType: 'minimal' | 'full';
timestamp: string;
payload?: any;
}
export class NotificationClient {
private ws: WebSocket | null = null;
private subscriptions: Map<string, SubscriptionOptions> = new Map();
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private reconnectDelay = 1000;
constructor(
private url: string,
private authToken?: string
) {}
async connect(): Promise<void> {
const wsUrl = this.authToken
? `${this.url}?token=${this.authToken}`
: this.url;
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('NotificationClient connected');
this.reconnectAttempts = 0;
this.resubscribeAll();
};
this.ws.onmessage = (event) => {
this.handleMessage(JSON.parse(event.data));
};
this.ws.onclose = () => {
console.log('NotificationClient disconnected');
this.handleReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
subscribe(options: SubscriptionOptions): string {
const subscriptionId = `${options.entityType}:${options.entityId}`;
this.subscriptions.set(subscriptionId, options);
if (this.ws?.readyState === WebSocket.OPEN) {
this.sendSubscribe(options.entityType, options.entityId);
}
return subscriptionId;
}
unsubscribe(subscriptionId: string): void {
const options = this.subscriptions.get(subscriptionId);
if (options && this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
type: 'unsubscribe',
data: { subscriptionId }
}));
}
this.subscriptions.delete(subscriptionId);
}
disconnect(): void {
this.ws?.close();
this.subscriptions.clear();
}
private sendSubscribe(entityType: string, entityId: string): void {
this.ws?.send(JSON.stringify({
type: 'subscribe',
data: { entityType, entityId }
}));
}
private handleMessage(message: any): void {
if (message.type === 'notification') {
const subscriptionId = `${message.data.entityType}:${message.data.entityId}`;
const subscription = this.subscriptions.get(subscriptionId);
subscription?.onNotification(message.data);
}
}
private handleReconnect(): void {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
setTimeout(() => {
console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
this.connect();
}, delay);
}
}
private resubscribeAll(): void {
this.subscriptions.forEach((options) => {
this.sendSubscribe(options.entityType, options.entityId);
});
}
}
React Hook¶
useEntitySubscription.ts
// LBS.Notification.Client/src/hooks/useEntitySubscription.ts
import { useEffect, useRef } from 'react';
import { NotificationClient, Notification } from '../NotificationClient';
export function useEntitySubscription(
client: NotificationClient,
entityType: string,
entityId: string,
onNotification: (notification: Notification) => void
) {
const subscriptionIdRef = useRef<string | null>(null);
useEffect(() => {
subscriptionIdRef.current = client.subscribe({
entityType,
entityId,
onNotification
});
return () => {
if (subscriptionIdRef.current) {
client.unsubscribe(subscriptionIdRef.current);
}
};
}, [client, entityType, entityId, onNotification]);
}
Deployment & Scaling¶
Aspire Integration¶
AspireHost Program.cs
var builder = DistributedApplication.CreateBuilder(args);
var redis = builder.AddRedis("redis");
var notificationService = builder.AddProject<Projects.LBS_NotificationService>("notification-service")
.WithReference(redis)
.WithReplicas(3); // Horizontal scaling
var ballrApi = builder.AddProject<Projects.Ballr_WebApi>("ballr-api")
.WithReference(redis)
.WithReference(notificationService);
builder.Build().Run();
Azure Container Apps Configuration¶
notification-service.bicep
resource notificationService 'Microsoft.App/containerApps@2023-05-01' = {
name: 'lbs-notification-service'
location: location
properties: {
managedEnvironmentId: containerAppEnvironment.id
configuration: {
ingress: {
external: true
targetPort: 8080
transport: 'http'
allowInsecure: false
}
dapr: {
enabled: false
}
secrets: [
{
name: 'redis-connection'
value: redisConnectionString
}
]
}
template: {
containers: [
{
name: 'notification-service'
image: 'lbsfoundry/notification-service:latest'
resources: {
cpu: json('0.5')
memory: '1.0Gi'
}
env: [
{
name: 'Redis__ConnectionString'
secretRef: 'redis-connection'
}
{
name: 'ASPNETCORE_ENVIRONMENT'
value: 'Production'
}
]
}
]
scale: {
minReplicas: 2
maxReplicas: 10
rules: [
{
name: 'http-scaling'
http: {
metadata: {
concurrentRequests: '1000'
}
}
}
]
}
}
}
}
Monitoring & Observability¶
OpenTelemetry Metrics¶
Key Metrics
namespace LBS.NotificationService.Observability;
public class NotificationMetrics
{
private readonly Meter meter;
private readonly Counter<long> connectionsCounter;
private readonly Counter<long> subscriptionsCounter;
private readonly Counter<long> notificationsSentCounter;
private readonly Histogram<double> notificationLatency;
public NotificationMetrics(IMeterFactory meterFactory)
{
this.meter = meterFactory.Create("LBS.NotificationService");
this.connectionsCounter = this.meter.CreateCounter<long>(
"notification.connections",
description: "Total WebSocket connections");
this.subscriptionsCounter = this.meter.CreateCounter<long>(
"notification.subscriptions",
description: "Total entity subscriptions");
this.notificationsSentCounter = this.meter.CreateCounter<long>(
"notification.sent",
description: "Total notifications sent to clients");
this.notificationLatency = this.meter.CreateHistogram<double>(
"notification.latency",
unit: "ms",
description: "Notification delivery latency (PostgreSQL NOTIFY → Client)");
}
public void RecordConnection() => this.connectionsCounter.Add(1);
public void RecordSubscription(string entityType) =>
this.subscriptionsCounter.Add(1, new KeyValuePair<string, object?>("entity_type", entityType));
public void RecordNotificationSent(string payloadType) =>
this.notificationsSentCounter.Add(1, new KeyValuePair<string, object?>("payload_type", payloadType));
public void RecordLatency(double milliseconds) =>
this.notificationLatency.Record(milliseconds);
}
Logging¶
Structured Logging Examples
this.logger.LogInformation(
"Client subscribed to {EntityType}:{EntityId} (ConnectionId: {ConnectionId})",
entityType, entityId, connectionId);
this.logger.LogWarning(
"Subscription limit reached for connection {ConnectionId} ({Count}/{Max})",
connectionId, currentCount, maxSubscriptions);
this.logger.LogError(
ex,
"Failed to deliver notification to {ConnectionId} for {EntityType}:{EntityId}",
connectionId, entityType, entityId);
Testing Strategy¶
Unit Tests¶
[Test]
public async Task PayloadStrategyResolver_DeterminePayloadType_ReturnsFullForRugbyGameState()
{
// Arrange
var options = Options.Create(new PayloadStrategyOptions
{
FullPayloadEntityTypes = new HashSet<string> { "RugbyGameState" }
});
var resolver = new PayloadStrategyResolver(options, null!, null!);
// Act
var payloadType = resolver.DeterminePayloadType("RugbyGameState");
// Assert
Assert.AreEqual(PayloadType.Full, payloadType);
}
Integration Tests¶
[Test]
public async Task WebSocketEndpoint_Subscribe_SendsNotificationOnEntityChange()
{
// Arrange
var client = new WebSocketTestClient(TestServer.CreateHandler());
await client.ConnectAsync();
// Act
await client.SubscribeAsync("Team", "team-123");
// Simulate database change
await TriggerDatabaseChange("mt_doc_team", "team-123");
// Assert
var notification = await client.ReceiveNotificationAsync(timeout: TimeSpan.FromSeconds(5));
Assert.AreEqual("Team", notification.EntityType);
Assert.AreEqual("team-123", notification.EntityId);
}
Load Tests¶
// Using NBomber or similar
var scenario = Scenario.Create("websocket_subscriptions", async context =>
{
var client = new NotificationClient("wss://localhost:8080/ws", authToken);
await client.Connect();
client.Subscribe(new SubscriptionOptions
{
EntityType = "RugbyGameState",
EntityId = "match-456",
OnNotification = (notification) => { /* measure latency */ }
});
return Response.Ok();
})
.WithLoadSimulations(
Simulation.RampingConstant(copies: 10_000, during: TimeSpan.FromMinutes(5))
);
Next Steps¶
- Review and approve this technical design
- Create
LBS.Notification.Abstractionsproject with core interfaces - Implement
LBS.Notification.Redispub/sub provider - Create
LBS.NotificationServicewith WebSocket endpoint - Implement PostgreSQL trigger and Marten integration
- Build TypeScript client SDK
- Add Aspire integration and deployment configs
- Write comprehensive tests (unit, integration, load)
- Deploy to Azure Container Apps (staging)
- Gradual production rollout