Real-Time Notification System - Technical Design

Document Information


Implementation note (current)

This document is the original draft design. The system shipped with different component names; the design body below is retained for context, but treat this note as the source of truth for what exists in code today.

  • WebSocket entry point: WebSocketHandler.cs (src/Services/LBS.NotificationService/Endpoints/).
  • Routing: NotificationRouter.cs (src/Services/LBS.NotificationService/Services/).
  • Payload shaping: PayloadStrategyService.cs (src/Services/LBS.NotificationService/Services/), implementing IPayloadStrategy.
  • Entity-type resolution is done via IEntityTypeResolver (src/Core/LBS.Notification.Abstractions/Interfaces/). There is no IChannelNameProvider type in the codebase; the interface described later in this draft was not implemented.
  • Redis channel naming lives in RedisPubSubProvider.cs (src/Core/LBS.Notification.Redis/). The default ChannelPrefix is "foundry" (see RedisOptions.cs), producing channels of the form foundry:<channel>.
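
The naming rule in the last bullet can be sketched as a prefix joined to a channel name with a colon. This is a minimal illustration only: `buildChannelName` and `entityChannel` are hypothetical helpers, and the `notifications:{EntityType}:{EntityId}` layout is an assumption borrowed from the draft diagram later in this document. The shipped logic lives in RedisPubSubProvider.cs and may differ.

```typescript
// Hypothetical helper mirroring the "foundry:<channel>" form described above.
const DEFAULT_CHANNEL_PREFIX = "foundry";

function buildChannelName(channel: string, prefix: string = DEFAULT_CHANNEL_PREFIX): string {
  if (channel.length === 0) {
    throw new Error("channel must not be empty");
  }
  return `${prefix}:${channel}`;
}

// Assumed per-entity channel layout, for illustration only.
function entityChannel(entityType: string, entityId: string): string {
  return buildChannelName(`notifications:${entityType}:${entityId}`);
}
```

Under these assumptions, `entityChannel("Team", "team-123")` yields `foundry:notifications:Team:team-123`.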

Table of Contents

  1. Architecture Overview
  2. Component Design
  3. Database Integration
  4. WebSocket Protocol
  5. Security & Authorization
  6. Message Routing & Payload Strategy
  7. Client SDK
  8. Deployment & Scaling
  9. Monitoring & Observability
  10. Testing Strategy

Architecture Overview

High-Level Flow

┌─────────────────────────────────────────────────────────────────────────┐
│                          PostgreSQL Databases                            │
│  (Main, Analytics, Reporting, Archive, Ballr)                           │
│                                                                           │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │ Marten Projection Tables (mt_doc_*)                               │  │
│  │                                                                    │  │
│  │  INSERT/UPDATE/DELETE → notify_document_change() trigger          │  │
│  └────────────────────────────────┬─────────────────────────────────┘  │
└────────────────────────────────────┼────────────────────────────────────┘
                                     │ pg_notify('foundry:db_changes', json)
┌─────────────────────────────────────────────────────────────────────────┐
│                     LBS.NotificationService                              │
│                                                                           │
│  ┌──────────────────┐    ┌──────────────────┐    ┌─────────────────┐  │
│  │ PostgreSQL       │    │ Subscription     │    │ WebSocket       │  │
│  │ Listener         │───▶│ Manager          │◀───│ Connection      │  │
│  │ (LISTEN/NOTIFY)  │    │                  │    │ Manager         │  │
│  └──────────────────┘    └────────┬─────────┘    └─────────────────┘  │
│                                    │                                     │
│                                    ↓                                     │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │              Redis Pub/Sub (IPubSubProvider)                    │   │
│  │                                                                  │   │
│  │  Channels:                                                       │   │
│  │    - foundry:notifications:{EntityType}:{EntityId}               │   │
│  └────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                      Multiple Service Instances                          │
│              (Horizontal Scaling via Redis Backplane)                    │
│                                                                           │
│  Instance 1         Instance 2         Instance 3                        │
│  WebSocket 1-N      WebSocket N-M      WebSocket M-P                     │
└─────────────────────────────────────────────────────────────────────────┘
                                     ↓ WebSocket (wss://)
┌─────────────────────────────────────────────────────────────────────────┐
│                         React Clients (ballr.live)                       │
│                                                                           │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  LBS.Notification.Client (TypeScript SDK)                         │  │
│  │   - Connection management                                         │  │
│  │   - Auto-reconnection                                             │  │
│  │   - Subscribe/Unsubscribe                                         │  │
│  │   - Message routing to React components                           │  │
│  └──────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
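
The backplane fan-out in the diagram above can be sketched as an in-memory simulation. It assumes each service instance tracks only its own local WebSocket connections and relies on the pub/sub layer to see messages published elsewhere. `InMemoryBackplane` and `ServiceInstance` are hypothetical stand-ins for Redis Pub/Sub and a service replica; no real Redis calls are made.

```typescript
type Handler = (channel: string, message: string) => void;

// Hypothetical stand-in for Redis Pub/Sub: every subscriber of a channel sees every publish.
class InMemoryBackplane {
  private handlers = new Map<string, Set<Handler>>();

  subscribe(channel: string, handler: Handler): void {
    const set = this.handlers.get(channel) ?? new Set<Handler>();
    set.add(handler);
    this.handlers.set(channel, set);
  }

  publish(channel: string, message: string): void {
    this.handlers.get(channel)?.forEach((h) => h(channel, message));
  }
}

// Hypothetical service replica: knows only about its own local connections.
class ServiceInstance {
  readonly delivered: string[] = [];
  private local = new Map<string, Set<string>>(); // channel -> local connection ids

  constructor(readonly name: string, private backplane: InMemoryBackplane) {}

  addSubscriber(connectionId: string, channel: string): void {
    let connections = this.local.get(channel);
    if (!connections) {
      connections = new Set<string>();
      this.local.set(channel, connections);
      // First local subscriber: start listening on the backplane for this channel.
      this.backplane.subscribe(channel, (ch, msg) => this.deliver(ch, msg));
    }
    connections.add(connectionId);
  }

  // Record a delivery for every local connection subscribed to the channel.
  private deliver(channel: string, message: string): void {
    this.local.get(channel)?.forEach((id) => {
      this.delivered.push(`${this.name}/${id}: ${message}`);
    });
  }
}
```

A single publish reaches subscribers on every instance, while each instance touches only the connections it owns. That property is what lets replicas be added without sharing connection state.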

Component Design

Project Structure

src/
├── Core/
│   ├── LBS.Notification.Abstractions/
│   │   ├── Contracts/
│   │   │   ├── INotificationMessage.cs
│   │   │   ├── ISubscriptionRequest.cs
│   │   │   └── INotificationPayload.cs
│   │   ├── IPubSubProvider.cs
│   │   ├── ISubscriptionManager.cs
│   │   ├── IConnectionManager.cs
│   │   ├── IPayloadStrategy.cs
│   │   └── IEntityTypeResolver.cs
│   │
│   └── LBS.Notification.Redis/
│       ├── RedisPubSubProvider.cs
│       ├── RedisBackplaneOptions.cs
│       └── RedisServiceExtensions.cs
├── Services/
│   └── LBS.NotificationService/
│       ├── Program.cs
│       ├── WebSocketEndpoint.cs
│       ├── ConnectionManager.cs
│       ├── SubscriptionManager.cs
│       ├── PostgresNotificationListener.cs
│       ├── PayloadStrategyResolver.cs
│       ├── EntityTypeMapper.cs
│       └── AuthorizationValidator.cs
└── ClientSDK/
    └── LBS.Notification.Client/  (TypeScript)
        ├── src/
        │   ├── NotificationClient.ts
        │   ├── types.ts
        │   ├── reconnection.ts
        │   └── hooks/
        │       ├── useNotification.ts
        │       └── useEntitySubscription.ts
        └── package.json

Core Abstractions

LBS.Notification.Abstractions

IPubSubProvider.cs

namespace LBS.Notification.Abstractions;

/// <summary>
/// Abstraction for pub/sub messaging to enable swappable implementations (Redis, NATS, etc.)
/// </summary>
public interface IPubSubProvider
{
    /// <summary>
    /// Publish a notification to a specific channel
    /// </summary>
    Task PublishAsync(string channel, INotificationMessage message, CancellationToken cancellationToken = default);

    /// <summary>
    /// Subscribe to notifications on a specific channel
    /// </summary>
    Task SubscribeAsync(string channel, Func<INotificationMessage, Task> handler, CancellationToken cancellationToken = default);

    /// <summary>
    /// Unsubscribe from a specific channel
    /// </summary>
    Task UnsubscribeAsync(string channel, CancellationToken cancellationToken = default);
}

/// <summary>
/// Factory for creating pub/sub channels based on entity type and ID
/// </summary>
public interface IChannelNameProvider
{
    string GetChannelName(string entityType, string entityId);
    string GetDatabaseChangeChannel();
}

INotificationMessage.cs

namespace LBS.Notification.Abstractions.Contracts;

public interface INotificationMessage
{
    string Type { get; }
    string Id { get; }
    string ChangeType { get; }
    string PayloadType { get; }
    object? Data { get; }
    DateTimeOffset Timestamp { get; }
}

public static class PayloadType
{
    public const string Minimal = "minimal";
    public const string Full = "full";

    public static readonly string[] All = { Minimal, Full };
    public static bool IsValid(string payloadType) => All.Contains(payloadType);
}

public static class ChangeType
{
    public const string Created = "created";
    public const string Updated = "updated";
    public const string Deleted = "deleted";

    public static readonly string[] All = { Created, Updated, Deleted };
    public static bool IsValid(string changeType) => All.Contains(changeType);
}

public record NotificationMessage : INotificationMessage
{
    public string Type { get; init; } = string.Empty;
    public string Id { get; init; } = string.Empty;
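    // Fully qualified: inside this record the simple names ChangeType and PayloadType
    // bind to the string properties, not to the static classes above
    public string ChangeType { get; init; } = global::LBS.Notification.Abstractions.Contracts.ChangeType.Updated;
    public string PayloadType { get; init; } = global::LBS.Notification.Abstractions.Contracts.PayloadType.Minimal;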
    public object? Data { get; init; }
    public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
}

ISubscriptionRequest.cs

namespace LBS.Notification.Abstractions.Contracts;

/// <summary>
/// Client request to subscribe to entity changes
/// </summary>
public interface ISubscriptionRequest
{
    string Type { get; }
    string Id { get; }
    string? SubscriptionId { get; }
}

public record SubscriptionRequest : ISubscriptionRequest
{
    public string Type { get; init; } = string.Empty;
    public string Id { get; init; } = string.Empty;
    public string? SubscriptionId { get; init; }
}

/// <summary>
/// Client request to unsubscribe from entity changes
/// </summary>
public interface IUnsubscribeRequest
{
    string? SubscriptionId { get; }
    string? Type { get; }
    string? Id { get; }
}

public record UnsubscribeRequest : IUnsubscribeRequest
{
    public string? SubscriptionId { get; init; }
    public string? Type { get; init; }
    public string? Id { get; init; }
}

ISubscriptionManager.cs

using System.Security.Claims;
using LBS.Notification.Abstractions.Contracts;

namespace LBS.Notification.Abstractions;

/// <summary>
/// Manages client subscriptions to entity changes
/// </summary>
public interface ISubscriptionManager
{
    /// <summary>
    /// Add a subscription for a connection
    /// </summary>
    Task<SubscriptionResult> SubscribeAsync(
        string connectionId,
        ISubscriptionRequest request,
        ClaimsPrincipal user,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Remove a specific subscription
    /// </summary>
    Task UnsubscribeAsync(string connectionId, IUnsubscribeRequest request, CancellationToken cancellationToken = default);

    /// <summary>
    /// Remove all subscriptions for a connection
    /// </summary>
    Task UnsubscribeAllAsync(string connectionId, CancellationToken cancellationToken = default);

    /// <summary>
    /// Get all connections subscribed to a specific entity
    /// </summary>
    IEnumerable<string> GetSubscribers(string entityType, string entityId);

    /// <summary>
    /// Get subscription count for a connection
    /// </summary>
    int GetSubscriptionCount(string connectionId);
}

public record SubscriptionResult
{
    public bool Success { get; init; }
    public string? SubscriptionId { get; init; }
    public string? ErrorMessage { get; init; }

    public static SubscriptionResult Succeeded(string subscriptionId) =>
        new() { Success = true, SubscriptionId = subscriptionId };

    public static SubscriptionResult Failed(string errorMessage) =>
        new() { Success = false, ErrorMessage = errorMessage };
}
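
To make the bookkeeping behind this interface concrete, here is a minimal in-memory sketch of a two-way index between connections and entities. It is an illustration, not the shipped SubscriptionManager: `InMemorySubscriptionRegistry`, its subscription ID format, and the per-connection limit parameter are all assumptions, and authorization and pub/sub wiring are omitted.

```typescript
interface SubscribeResult {
  success: boolean;
  subscriptionId?: string;
  errorMessage?: string;
}

// Hypothetical in-memory registry; authorization and pub/sub wiring are omitted.
class InMemorySubscriptionRegistry {
  private byConnection = new Map<string, Set<string>>(); // connectionId -> entity keys
  private byEntity = new Map<string, Set<string>>(); // entity key -> connectionIds

  constructor(private maxPerConnection: number = 100) {}

  subscribe(connectionId: string, entityType: string, entityId: string): SubscribeResult {
    const key = `${entityType}:${entityId}`;
    const keys = this.byConnection.get(connectionId) ?? new Set<string>();
    // Re-subscribing to an existing key never counts against the limit.
    if (!keys.has(key) && keys.size >= this.maxPerConnection) {
      return { success: false, errorMessage: "Subscription limit reached" };
    }
    keys.add(key);
    this.byConnection.set(connectionId, keys);
    const connections = this.byEntity.get(key) ?? new Set<string>();
    connections.add(connectionId);
    this.byEntity.set(key, connections);
    return { success: true, subscriptionId: `${connectionId}:${key}` };
  }

  unsubscribeAll(connectionId: string): void {
    this.byConnection.get(connectionId)?.forEach((key) => {
      const connections = this.byEntity.get(key);
      connections?.delete(connectionId);
      // Drop empty entity entries so the index does not grow without bound.
      if (connections && connections.size === 0) this.byEntity.delete(key);
    });
    this.byConnection.delete(connectionId);
  }

  getSubscribers(entityType: string, entityId: string): string[] {
    const connections = this.byEntity.get(`${entityType}:${entityId}`);
    return connections ? Array.from(connections) : [];
  }

  getSubscriptionCount(connectionId: string): number {
    return this.byConnection.get(connectionId)?.size ?? 0;
  }
}
```

Keeping both directions indexed makes `getSubscribers` a single lookup on the notification path and lets `unsubscribeAll` clean up a dropped connection without scanning every entity.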

IPayloadStrategy.cs

namespace LBS.Notification.Abstractions;

/// <summary>
/// Determines whether to send full payload or minimal notification based on entity type
/// </summary>
public interface IPayloadStrategy
{
    /// <summary>
    /// Determine payload type for an entity
    /// </summary>
    string DeterminePayloadType(string entityType);

    /// <summary>
    /// Fetch full entity data for full payload notifications
    /// </summary>
    Task<object?> FetchEntityDataAsync(string entityType, string entityId, CancellationToken cancellationToken = default);
}

/// <summary>
/// Configuration for payload strategy
/// </summary>
public class PayloadStrategyOptions
{
    /// <summary>
    /// Entity types that should use full payload (e.g., "RugbyGameState", "Fixture")
    /// </summary>
    public HashSet<string> FullPayloadEntityTypes { get; set; } = new();

    /// <summary>
    /// Default payload type for unlisted entities
    /// </summary>
    public string DefaultPayloadType { get; set; } = PayloadType.Minimal;
}

IEntityTypeResolver.cs

namespace LBS.Notification.Abstractions;

/// <summary>
/// Maps between database table names and domain entity types
/// </summary>
public interface IEntityTypeResolver
{
    /// <summary>
    /// Get entity type from Marten table name (e.g., "mt_doc_team" → "Team")
    /// </summary>
    string? ResolveEntityType(string tableName);

    /// <summary>
    /// Get table name from entity type (e.g., "Team" → "mt_doc_team")
    /// </summary>
    string? ResolveTableName(string entityType);

    /// <summary>
    /// Get CLR type from entity type string
    /// </summary>
    Type? ResolveClrType(string entityType);
}
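
The two-way mapping described by this interface can be sketched with a registry built from a fixed list of entity type names. The sketch assumes a table name is the `mt_doc_` prefix plus the lowercased type name. Because lowercasing discards the original casing, the reverse direction needs a lookup table rather than string manipulation. `EntityTypeRegistry` is a hypothetical name, and CLR type resolution has no TypeScript analogue, so it is left out.

```typescript
const TABLE_PREFIX = "mt_doc_";

// Hypothetical resolver built from a fixed list of known entity type names.
class EntityTypeRegistry {
  private tableToEntity = new Map<string, string>();

  constructor(entityTypes: string[]) {
    for (const entityType of entityTypes) {
      this.tableToEntity.set(EntityTypeRegistry.tableNameFor(entityType), entityType);
    }
  }

  // Assumption: the table name is the prefix plus the lowercased type name.
  static tableNameFor(entityType: string): string {
    return TABLE_PREFIX + entityType.toLowerCase();
  }

  // "mt_doc_team" -> "Team"; null for tables that were never registered.
  resolveEntityType(tableName: string): string | null {
    return this.tableToEntity.get(tableName) ?? null;
  }

  // "Team" -> "mt_doc_team"; null for entity types that were never registered.
  resolveTableName(entityType: string): string | null {
    const tableName = EntityTypeRegistry.tableNameFor(entityType);
    return this.tableToEntity.has(tableName) ? tableName : null;
  }
}
```

Returning `null` for unknown tables lets a listener silently skip changes on tables that are not exposed as subscribable entities.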


Database Integration

PostgreSQL Trigger Function

Migration: Create Notification Trigger Function

-- Migration: 202510_CreateNotificationTrigger.sql

-- Create notification function
CREATE OR REPLACE FUNCTION notify_document_change()
RETURNS TRIGGER AS $$
DECLARE
    notification json;
    entity_id text;
BEGIN
    -- Extract the document ID from the 'id' column.
    -- NEW is not populated for DELETE and OLD is not populated for INSERT.
    IF TG_OP = 'DELETE' THEN
        entity_id := OLD.id::text;
    ELSE
        entity_id := NEW.id::text;
    END IF;

    -- Build notification payload
    notification := json_build_object(
        'table', TG_TABLE_NAME,
        'id', entity_id,
        'operation', TG_OP,
        'timestamp', CURRENT_TIMESTAMP
    );

    -- Send notification
    PERFORM pg_notify('foundry:db_changes', notification::text);

    -- The return value of an AFTER ROW trigger is ignored
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create trigger installation helper function
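CREATE OR REPLACE FUNCTION install_notification_trigger_on_table(table_name text)
RETURNS void AS $$
BEGIN
    -- Drop any previous copy first so the migration can be re-run safely
    EXECUTE format('DROP TRIGGER IF EXISTS notify_change_trigger ON %I', table_name);
    EXECUTE format('
        CREATE TRIGGER notify_change_trigger
        AFTER INSERT OR UPDATE OR DELETE ON %I
        FOR EACH ROW
        EXECUTE FUNCTION notify_document_change();
    ', table_name);
END;
$$ LANGUAGE plpgsql;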

-- Install on all existing Marten projection tables
DO $$
DECLARE
    table_record RECORD;
BEGIN
    FOR table_record IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'public'
        AND tablename LIKE 'mt_doc_%'
    LOOP
        PERFORM install_notification_trigger_on_table(table_record.tablename);
    END LOOP;
END $$;
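
On the receiving side, the JSON text built by `notify_document_change()` has to be parsed and its `operation` field (the trigger's `TG_OP`) mapped to the `created` / `updated` / `deleted` change types used elsewhere in this design. The sketch below shows one way to do that defensively. `parseDbChange` and `DbChange` are hypothetical names, and the choice to return `null` for malformed payloads is an assumption.

```typescript
type ChangeType = "created" | "updated" | "deleted";

interface DbChange {
  table: string;
  id: string;
  changeType: ChangeType;
  timestamp: string;
}

// TG_OP values emitted by the row-level trigger, mapped to protocol change types.
const OPERATION_TO_CHANGE_TYPE = new Map<string, ChangeType>([
  ["INSERT", "created"],
  ["UPDATE", "updated"],
  ["DELETE", "deleted"],
]);

// Hypothetical parser for the JSON text sent via pg_notify.
// Returns null for malformed or unexpected payloads instead of throwing.
function parseDbChange(payload: string): DbChange | null {
  let raw: unknown;
  try {
    raw = JSON.parse(payload);
  } catch {
    return null;
  }
  if (typeof raw !== "object" || raw === null) return null;

  const { table, id, operation, timestamp } = raw as Record<string, unknown>;
  if (
    typeof table !== "string" ||
    typeof id !== "string" ||
    typeof operation !== "string" ||
    typeof timestamp !== "string"
  ) {
    return null;
  }

  const changeType = OPERATION_TO_CHANGE_TYPE.get(operation);
  if (changeType === undefined) return null;
  return { table, id, changeType, timestamp };
}
```

The `table` field would still need to be resolved to an entity type before routing to subscribers.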

Marten Integration Module

NotificationMartenModule.cs

namespace LBS.NotificationService.Data;

public class NotificationMartenModule : IConfigureMarten
{
    public void Configure(IServiceProvider services, StoreOptions options)
    {
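        // Ensure the notification trigger is installed when Marten creates new projection tables.
        // NOTE: OnDocumentCreated is a draft-level placeholder for such a hook; confirm the
        // actual extension point against the Marten version in use.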
        options.Events.OnDocumentCreated += (store, mapping) =>
        {
            if (mapping.TableName.Value.StartsWith("mt_doc_"))
            {
                this.InstallNotificationTrigger(store, mapping.TableName.Value);
            }
        };
    }

    private void InstallNotificationTrigger(IDocumentStore store, string tableName)
    {
        using var connection = store.Storage.Database.CreateConnection();
        connection.Open();

        // Pass the table name as a parameter instead of interpolating it into the SQL text
        using var command = connection.CreateCommand();
        command.CommandText = "SELECT install_notification_trigger_on_table(@tableName)";
        command.Parameters.AddWithValue("tableName", tableName);
        command.ExecuteNonQuery();
    }
}


WebSocket Protocol

Message Types

Client → Server Messages

Subscribe

{
  "type": "subscribe",
  "data": {
    "entityType": "Team",
    "entityId": "team-123"
  }
}

Unsubscribe

{
  "type": "unsubscribe",
  "data": {
    "subscriptionId": "sub-abc-123"
  }
}

Ping (Heartbeat)

{
  "type": "ping"
}

Server → Client Messages

Subscription Confirmed

{
  "type": "subscribed",
  "data": {
    "subscriptionId": "sub-abc-123",
    "entityType": "Team",
    "entityId": "team-123"
  }
}

Subscription Error

{
  "type": "subscription_error",
  "data": {
    "entityType": "Team",
    "entityId": "team-123",
    "error": "Unauthorized: User does not have permission to view this entity"
  }
}

Notification - Minimal (Refetch)

{
  "type": "notification",
  "data": {
    "entityType": "Team",
    "entityId": "team-123",
    "changeType": "updated",
    "payloadType": "minimal",
    "timestamp": "2025-10-08T10:30:00Z"
  }
}

Notification - Full Payload

{
  "type": "notification",
  "data": {
    "entityType": "RugbyGameState",
    "entityId": "match-456",
    "changeType": "updated",
    "payloadType": "full",
    "timestamp": "2025-10-08T10:30:15Z",
    "payload": {
      "id": "match-456",
      "homeScore": 21,
      "awayScore": 14,
      "currentPeriod": "Second Half",
      "gameTime": "62:30",
      "lastScorer": "player-789"
    }
  }
}

Pong (Heartbeat Response)

{
  "type": "pong"
}
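
The server-to-client messages above map naturally onto a discriminated union keyed by `type`. The sketch below is one possible client-side typing, not part of the shipped SDK: `ServerMessage`, `EntityRef`, and `describeServerMessage` are hypothetical names. The function only returns a description of the action a client would take, which keeps the example free of I/O.

```typescript
interface EntityRef {
  entityType: string;
  entityId: string;
}

// One variant per server-to-client message shape listed above.
type ServerMessage =
  | { type: "subscribed"; data: EntityRef & { subscriptionId: string } }
  | { type: "subscription_error"; data: EntityRef & { error: string } }
  | {
      type: "notification";
      data: EntityRef & {
        changeType: "created" | "updated" | "deleted";
        payloadType: "minimal" | "full";
        timestamp: string;
        payload?: unknown;
      };
    }
  | { type: "pong" };

// Hypothetical dispatcher: describes what a client would do with each message.
function describeServerMessage(message: ServerMessage): string {
  switch (message.type) {
    case "subscribed":
      return `subscribed ${message.data.subscriptionId}`;
    case "subscription_error":
      return `error: ${message.data.error}`;
    case "notification":
      // Minimal notifications carry no data, so the client refetches the entity.
      return message.data.payloadType === "full"
        ? `apply payload for ${message.data.entityType}:${message.data.entityId}`
        : `refetch ${message.data.entityType}:${message.data.entityId}`;
    case "pong":
      return "pong";
  }
}
```

Because the switch is exhaustive over `type`, adding a new message variant to the union produces a compile-time error here until the new case is handled.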

WebSocket Endpoint Implementation

WebSocketEndpoint.cs

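using System.Net.WebSockets;
using System.Security.Claims;
using System.Text;
using LBS.Notification.Abstractions;

namespace LBS.NotificationService;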

public class WebSocketEndpoint
{
    private readonly IConnectionManager connectionManager;
    private readonly ISubscriptionManager subscriptionManager;
    private readonly ILogger<WebSocketEndpoint> logger;

    public WebSocketEndpoint(
        IConnectionManager connectionManager,
        ISubscriptionManager subscriptionManager,
        ILogger<WebSocketEndpoint> logger)
    {
        this.connectionManager = connectionManager;
        this.subscriptionManager = subscriptionManager;
        this.logger = logger;
    }

    public async Task HandleWebSocketAsync(HttpContext context)
    {
        if (!context.WebSockets.IsWebSocketRequest)
        {
            context.Response.StatusCode = StatusCodes.Status400BadRequest;
            return;
        }

        var webSocket = await context.WebSockets.AcceptWebSocketAsync();
        var connectionId = Guid.NewGuid().ToString();
        var user = context.User;

        await this.connectionManager.AddConnectionAsync(connectionId, webSocket, user);

        try
        {
            await this.HandleMessagesAsync(connectionId, webSocket, user);
        }
        finally
        {
            await this.subscriptionManager.UnsubscribeAllAsync(connectionId);
            await this.connectionManager.RemoveConnectionAsync(connectionId);
        }
    }

    private async Task HandleMessagesAsync(string connectionId, WebSocket webSocket, ClaimsPrincipal user)
    {
        var buffer = new byte[1024 * 4];
        using var messageStream = new MemoryStream();

        while (webSocket.State == WebSocketState.Open)
        {
            var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

            if (result.MessageType == WebSocketMessageType.Close)
            {
                await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed by client", CancellationToken.None);
                break;
            }

            if (result.MessageType != WebSocketMessageType.Text)
            {
                continue;
            }

            // A message can span several frames or exceed the buffer size,
            // so accumulate bytes until EndOfMessage before decoding
            messageStream.Write(buffer, 0, result.Count);
            if (!result.EndOfMessage)
            {
                continue;
            }

            var message = Encoding.UTF8.GetString(messageStream.GetBuffer(), 0, (int)messageStream.Length);
            messageStream.SetLength(0);
            await this.ProcessClientMessageAsync(connectionId, message, user);
        }
    }

    private async Task ProcessClientMessageAsync(string connectionId, string message, ClaimsPrincipal user)
    {
        // Parse and route client messages (subscribe, unsubscribe, ping)
        // Implementation details in full service code
    }
}


Security & Authorization

Authorization Validation

AuthorizationValidator.cs

namespace LBS.NotificationService;

/// <summary>
/// Validates subscription requests against existing query authorization model
/// </summary>
public class AuthorizationValidator
{
    private readonly IQueryService queryService;
    private readonly IEntityTypeResolver entityTypeResolver;
    private readonly ILogger<AuthorizationValidator> logger;

    public AuthorizationValidator(
        IQueryService queryService,
        IEntityTypeResolver entityTypeResolver,
        ILogger<AuthorizationValidator> logger)
    {
        this.queryService = queryService;
        this.entityTypeResolver = entityTypeResolver;
        this.logger = logger;
    }

    /// <summary>
    /// Validate if user has permission to subscribe to an entity
    /// Uses existing ISecureQuery / IPublicQuery authorization logic
    /// </summary>
    public async Task<AuthorizationResult> ValidateSubscriptionAsync(
        string entityType,
        string entityId,
        ClaimsPrincipal user,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Resolve entity CLR type
            var clrType = this.entityTypeResolver.ResolveClrType(entityType);
            if (clrType == null)
            {
                return AuthorizationResult.Failed($"Unknown entity type: {entityType}");
            }

            // Create a minimal query to test authorization
            // This leverages existing ISecureQuery / IPublicQuery permission checks
            var testQuery = this.CreateAuthorizationTestQuery(entityType, entityId, user);

            // Attempt to execute the query; a SecurityAccessDeniedException means the user lacks permission
            await this.queryService.ExecuteAsync(testQuery, cancellationToken);

            return AuthorizationResult.Succeeded();
        }
        catch (SecurityAccessDeniedException ex)
        {
            this.logger.LogWarning(ex, "Authorization failed for {EntityType}:{EntityId}", entityType, entityId);
            return AuthorizationResult.Failed("Unauthorized to view this entity");
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, "Authorization validation error for {EntityType}:{EntityId}", entityType, entityId);
            return AuthorizationResult.Failed("Authorization validation failed");
        }
    }

    private IQuery CreateAuthorizationTestQuery(string entityType, string entityId, ClaimsPrincipal user)
    {
        // Create a minimal query that will trigger existing authorization checks
        // Implementation depends on your query infrastructure
        throw new NotImplementedException();
    }
}

public record AuthorizationResult
{
    public bool Success { get; init; }
    public string? ErrorMessage { get; init; }

    public static AuthorizationResult Succeeded() => new() { Success = true };
    public static AuthorizationResult Failed(string errorMessage) => new() { Success = false, ErrorMessage = errorMessage };
}


Message Routing & Payload Strategy

Payload Strategy Resolver

PayloadStrategyResolver.cs

namespace LBS.NotificationService;

public class PayloadStrategyResolver : IPayloadStrategy
{
    private readonly PayloadStrategyOptions options;
    private readonly IQueryService queryService;
    private readonly ILogger<PayloadStrategyResolver> logger;

    public PayloadStrategyResolver(
        IOptions<PayloadStrategyOptions> options,
        IQueryService queryService,
        ILogger<PayloadStrategyResolver> logger)
    {
        this.options = options.Value;
        this.queryService = queryService;
        this.logger = logger;
    }

    public string DeterminePayloadType(string entityType)
    {
        // Fall back to the configured default rather than hard-coding Minimal
        return this.options.FullPayloadEntityTypes.Contains(entityType)
            ? PayloadType.Full
            : this.options.DefaultPayloadType;
    }

    public async Task<object?> FetchEntityDataAsync(
        string entityType,
        string entityId,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Use system context to fetch data (already authorized at subscription time)
            using (UserContext.RunAsSystem())
            {
                var query = this.CreateFetchQuery(entityType, entityId);
                return await this.queryService.ExecuteAsync(query, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, "Failed to fetch entity data for {EntityType}:{EntityId}", entityType, entityId);
            return null;
        }
    }

    private IQuery CreateFetchQuery(string entityType, string entityId)
    {
        // Create appropriate query based on entity type
        // Implementation depends on your query infrastructure
        throw new NotImplementedException();
    }
}

Configuration

appsettings.json

{
  "PayloadStrategy": {
    "FullPayloadEntityTypes": [
      "RugbyGameState",
      "Fixture",
      "NrlSuperCoachPlayerStats"
    ],
    "DefaultPayloadType": "minimal"
  },
  "Subscription": {
    "MaxSubscriptionsPerConnection": 100,
    "SubscriptionTimeoutSeconds": 300
  },
  "WebSocket": {
    "PingIntervalSeconds": 30,
    "ConnectionTimeoutSeconds": 3600
  },
  "Redis": {
    "ConnectionString": "localhost:6379",
    "ChannelPrefix": "foundry:notifications"
  }
}
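
As a rough sketch of how the `PayloadStrategy` section is meant to be read, the helper below picks `full` for listed entity types and otherwise falls back to `DefaultPayloadType`. `PayloadStrategyConfig` and `determinePayloadType` are hypothetical names that mirror the JSON keys. The exact, case-sensitive match is an assumption.

```typescript
type PayloadType = "minimal" | "full";

// Mirrors the "PayloadStrategy" section of appsettings.json shown above.
interface PayloadStrategyConfig {
  FullPayloadEntityTypes: string[];
  DefaultPayloadType: PayloadType;
}

// Hypothetical helper: listed types get the full payload, everything else the default.
// Assumption: entity type names are matched exactly (case-sensitive).
function determinePayloadType(config: PayloadStrategyConfig, entityType: string): PayloadType {
  return config.FullPayloadEntityTypes.indexOf(entityType) !== -1
    ? "full"
    : config.DefaultPayloadType;
}

const examplePayloadConfig: PayloadStrategyConfig = {
  FullPayloadEntityTypes: ["RugbyGameState", "Fixture", "NrlSuperCoachPlayerStats"],
  DefaultPayloadType: "minimal",
};
```

With the example configuration, `RugbyGameState` resolves to `full` and `Team` resolves to `minimal`, which matches the two notification shapes in the WebSocket protocol section.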


Client SDK

TypeScript Client Implementation

NotificationClient.ts

// LBS.Notification.Client/src/NotificationClient.ts

export interface SubscriptionOptions {
  entityType: string;
  entityId: string;
  onNotification: (notification: Notification) => void;
  onError?: (error: Error) => void;
}

export interface Notification {
  entityType: string;
  entityId: string;
  changeType: 'created' | 'updated' | 'deleted';
  payloadType: 'minimal' | 'full';
  timestamp: string;
  payload?: any;
}

export class NotificationClient {
  private ws: WebSocket | null = null;
  private subscriptions: Map<string, SubscriptionOptions> = new Map();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectDelay = 1000;

  constructor(
    private url: string,
    private authToken?: string
  ) {}

  async connect(): Promise<void> {
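    // Encode the token so characters such as '+', '/' or '=' survive the query string
    const wsUrl = this.authToken
      ? `${this.url}?token=${encodeURIComponent(this.authToken)}`
      : this.url;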

    this.ws = new WebSocket(wsUrl);

    this.ws.onopen = () => {
      console.log('NotificationClient connected');
      this.reconnectAttempts = 0;
      this.resubscribeAll();
    };

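    this.ws.onmessage = (event) => {
      try {
        this.handleMessage(JSON.parse(event.data));
      } catch (error) {
        // Ignore malformed frames rather than letting the handler throw
        console.error('Failed to parse notification message:', error);
      }
    };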

    this.ws.onclose = () => {
      console.log('NotificationClient disconnected');
      this.handleReconnect();
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  subscribe(options: SubscriptionOptions): string {
    const subscriptionId = `${options.entityType}:${options.entityId}`;
    this.subscriptions.set(subscriptionId, options);

    if (this.ws?.readyState === WebSocket.OPEN) {
      this.sendSubscribe(options.entityType, options.entityId);
    }

    return subscriptionId;
  }

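  unsubscribe(subscriptionId: string): void {
    const options = this.subscriptions.get(subscriptionId);
    if (options && this.ws?.readyState === WebSocket.OPEN) {
      // The local key is not the server-issued subscriptionId, so identify
      // the subscription by entity type and ID instead
      this.ws.send(JSON.stringify({
        type: 'unsubscribe',
        data: { entityType: options.entityType, entityId: options.entityId }
      }));
    }
    this.subscriptions.delete(subscriptionId);
  }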

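  disconnect(): void {
    if (this.ws) {
      // Detach the close handler first so an intentional close does not trigger auto-reconnect
      this.ws.onclose = null;
      this.ws.close();
      this.ws = null;
    }
    this.subscriptions.clear();
  }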

  private sendSubscribe(entityType: string, entityId: string): void {
    this.ws?.send(JSON.stringify({
      type: 'subscribe',
      data: { entityType, entityId }
    }));
  }

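  private handleMessage(message: any): void {
    if (message.type !== 'notification' && message.type !== 'subscription_error') {
      return;
    }
    const subscriptionId = `${message.data.entityType}:${message.data.entityId}`;
    const subscription = this.subscriptions.get(subscriptionId);
    if (message.type === 'notification') {
      subscription?.onNotification(message.data);
    } else {
      // Surface server-side rejections through the optional onError callback
      subscription?.onError?.(new Error(message.data.error));
    }
  }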

  private handleReconnect(): void {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

      setTimeout(() => {
        console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
        this.connect();
      }, delay);
    }
  }

  private resubscribeAll(): void {
    this.subscriptions.forEach((options) => {
      this.sendSubscribe(options.entityType, options.entityId);
    });
  }
}
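
The reconnect timing in `handleReconnect()` follows `reconnectDelay * 2^(attempt - 1)`. The sketch below reproduces that formula as standalone functions so the resulting schedule can be inspected. `reconnectDelayMs` and `reconnectSchedule` are hypothetical names. `cappedReconnectDelayMs` is a variant that is not in the client above, shown only as one possible way to bound the wait.

```typescript
// Same formula as handleReconnect(): baseDelay * 2^(attempt - 1), with attempt starting at 1.
function reconnectDelayMs(attempt: number, baseDelayMs: number = 1000): number {
  return baseDelayMs * Math.pow(2, attempt - 1);
}

// Delays for attempts 1..maxAttempts, in milliseconds.
function reconnectSchedule(maxAttempts: number = 10, baseDelayMs: number = 1000): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    delays.push(reconnectDelayMs(attempt, baseDelayMs));
  }
  return delays;
}

// Hypothetical variant (not in the client above): cap each delay at maxDelayMs.
function cappedReconnectDelayMs(
  attempt: number,
  baseDelayMs: number = 1000,
  maxDelayMs: number = 30000
): number {
  return Math.min(reconnectDelayMs(attempt, baseDelayMs), maxDelayMs);
}
```

With the client's defaults (1000 ms base, 10 attempts) the delays run from 1000 ms up to 512000 ms and add up to 1023000 ms, roughly 17 minutes before the client gives up. Whether the later multi-minute waits are acceptable depends on the use case, which is the motivation for the capped variant.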

React Hook

useEntitySubscription.ts

// LBS.Notification.Client/src/hooks/useEntitySubscription.ts

import { useEffect, useRef } from 'react';
import { NotificationClient, Notification } from '../NotificationClient';

export function useEntitySubscription(
  client: NotificationClient,
  entityType: string,
  entityId: string,
  onNotification: (notification: Notification) => void
) {
  // Keep the latest callback in a ref so an inline (non-memoized) handler
  // does not force an unsubscribe/resubscribe on every render
  const callbackRef = useRef(onNotification);

  useEffect(() => {
    callbackRef.current = onNotification;
  }, [onNotification]);

  useEffect(() => {
    const subscriptionId = client.subscribe({
      entityType,
      entityId,
      onNotification: (notification) => callbackRef.current(notification)
    });

    return () => {
      client.unsubscribe(subscriptionId);
    };
  }, [client, entityType, entityId]);
}


Deployment & Scaling

Aspire Integration

AspireHost Program.cs

var builder = DistributedApplication.CreateBuilder(args);

var redis = builder.AddRedis("redis");

var notificationService = builder.AddProject<Projects.LBS_NotificationService>("notification-service")
    .WithReference(redis)
    .WithReplicas(3); // Horizontal scaling

var ballrApi = builder.AddProject<Projects.Ballr_WebApi>("ballr-api")
    .WithReference(redis)
    .WithReference(notificationService);

builder.Build().Run();

Azure Container Apps Configuration

notification-service.bicep

resource notificationService 'Microsoft.App/containerApps@2023-05-01' = {
  name: 'lbs-notification-service'
  location: location
  properties: {
    managedEnvironmentId: containerAppEnvironment.id
    configuration: {
      ingress: {
        external: true
        targetPort: 8080
        transport: 'http'
        allowInsecure: false
      }
      dapr: {
        enabled: false
      }
      secrets: [
        {
          name: 'redis-connection'
          value: redisConnectionString
        }
      ]
    }
    template: {
      containers: [
        {
          name: 'notification-service'
          image: 'lbsfoundry/notification-service:latest'
          resources: {
            cpu: json('0.5')
            memory: '1.0Gi'
          }
          env: [
            {
              name: 'Redis__ConnectionString'
              secretRef: 'redis-connection'
            }
            {
              name: 'ASPNETCORE_ENVIRONMENT'
              value: 'Production'
            }
          ]
        }
      ]
      scale: {
        minReplicas: 2
        maxReplicas: 10
        rules: [
          {
            name: 'http-scaling'
            http: {
              metadata: {
                concurrentRequests: '1000'
              }
            }
          }
        ]
      }
    }
  }
}


Monitoring & Observability

OpenTelemetry Metrics

Key Metrics

namespace LBS.NotificationService.Observability;

public class NotificationMetrics
{
    private readonly Meter meter;
    private readonly Counter<long> connectionsCounter;
    private readonly Counter<long> subscriptionsCounter;
    private readonly Counter<long> notificationsSentCounter;
    private readonly Histogram<double> notificationLatency;

    public NotificationMetrics(IMeterFactory meterFactory)
    {
        this.meter = meterFactory.Create("LBS.NotificationService");

        this.connectionsCounter = this.meter.CreateCounter<long>(
            "notification.connections",
            description: "Total WebSocket connections");

        this.subscriptionsCounter = this.meter.CreateCounter<long>(
            "notification.subscriptions",
            description: "Total entity subscriptions");

        this.notificationsSentCounter = this.meter.CreateCounter<long>(
            "notification.sent",
            description: "Total notifications sent to clients");

        this.notificationLatency = this.meter.CreateHistogram<double>(
            "notification.latency",
            unit: "ms",
            description: "Notification delivery latency (PostgreSQL NOTIFY → Client)");
    }

    public void RecordConnection() => this.connectionsCounter.Add(1);
    public void RecordSubscription(string entityType) =>
        this.subscriptionsCounter.Add(1, new KeyValuePair<string, object?>("entity_type", entityType));
    public void RecordNotificationSent(string payloadType) =>
        this.notificationsSentCounter.Add(1, new KeyValuePair<string, object?>("payload_type", payloadType));
    public void RecordLatency(double milliseconds) =>
        this.notificationLatency.Record(milliseconds);
}

Logging

Structured Logging Examples

this.logger.LogInformation(
    "Client subscribed to {EntityType}:{EntityId} (ConnectionId: {ConnectionId})",
    entityType, entityId, connectionId);

this.logger.LogWarning(
    "Subscription limit reached for connection {ConnectionId} ({Count}/{Max})",
    connectionId, currentCount, maxSubscriptions);

this.logger.LogError(
    ex,
    "Failed to deliver notification to {ConnectionId} for {EntityType}:{EntityId}",
    connectionId, entityType, entityId);


Testing Strategy

Unit Tests

[Test]
public void PayloadStrategyResolver_DeterminePayloadType_ReturnsFullForRugbyGameState()
{
    // Arrange
    var options = Options.Create(new PayloadStrategyOptions
    {
        FullPayloadEntityTypes = new HashSet<string> { "RugbyGameState" }
    });
    var resolver = new PayloadStrategyResolver(options, null!, null!);

    // Act
    var payloadType = resolver.DeterminePayloadType("RugbyGameState");

    // Assert
    Assert.AreEqual(PayloadType.Full, payloadType);
}

Integration Tests

[Test]
public async Task WebSocketEndpoint_Subscribe_SendsNotificationOnEntityChange()
{
    // Arrange
    var client = new WebSocketTestClient(TestServer.CreateHandler());
    await client.ConnectAsync();

    // Act
    await client.SubscribeAsync("Team", "team-123");

    // Simulate database change
    await TriggerDatabaseChange("mt_doc_team", "team-123");

    // Assert
    var notification = await client.ReceiveNotificationAsync(timeout: TimeSpan.FromSeconds(5));
    Assert.AreEqual("Team", notification.EntityType);
    Assert.AreEqual("team-123", notification.EntityId);
}

Load Tests

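// Using NBomber or similar.
// NotificationClient and SubscriptionOptions here stand for a hypothetical .NET test client
// mirroring the TypeScript SDK; this design does not define one.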
var scenario = Scenario.Create("websocket_subscriptions", async context =>
{
    var client = new NotificationClient("wss://localhost:8080/ws", authToken);
    await client.Connect();

    client.Subscribe(new SubscriptionOptions
    {
        EntityType = "RugbyGameState",
        EntityId = "match-456",
        OnNotification = (notification) => { /* measure latency */ }
    });

    return Response.Ok();
})
.WithLoadSimulations(
    Simulation.RampingConstant(copies: 10_000, during: TimeSpan.FromMinutes(5))
);

Next Steps

  1. Review and approve this technical design
  2. Create LBS.Notification.Abstractions project with core interfaces
  3. Implement LBS.Notification.Redis pub/sub provider
  4. Create LBS.NotificationService with WebSocket endpoint
  5. Implement PostgreSQL trigger and Marten integration
  6. Build TypeScript client SDK
  7. Add Aspire integration and deployment configs
  8. Write comprehensive tests (unit, integration, load)
  9. Deploy to Azure Container Apps (staging)
  10. Gradual production rollout
