Enterprise Architecture (Outbox + Kafka + SQL Server)

🏗 Final Solution Structure

KafkaEnterpriseDemo/
├── docker-compose.yml
├── Kafka.Shared/
│   ├── Events/
│   │   └── OrderCreatedEvent.cs
│   └── Contracts/
│       └── KafkaTopics.cs
├── Kafka.ProducerApi/
│   ├── Controllers/
│   │   └── OrdersController.cs
│   ├── Application/
│   │   ├── Services/
│   │   │   └── OrderService.cs
│   │   └── Interfaces/
│   │       └── IOrderService.cs
│   ├── Infrastructure/
│   │   ├── Data/
│   │   │   ├── AppDbContext.cs
│   │   │   └── Migrations/
│   │   ├── Outbox/
│   │   │   ├── OutboxMessage.cs
│   │   │   ├── OutboxStatus.cs
│   │   │   └── OutboxRelayWorker.cs
│   │   └── Kafka/
│   │       ├── IKafkaProducer.cs
│   │       └── KafkaProducer.cs
│   ├── appsettings.json
│   └── Program.cs
└── Kafka.ConsumerWorker/
    ├── Workers/
    │   └── OrderConsumerWorker.cs
    ├── Services/
    │   └── DlqProducer.cs
    ├── appsettings.json
    └── Program.cs


🐳 Step 1: Docker Compose (Kafka + Schema Registry + SQL Server)

docker-compose.yml

# Local development stack: ZooKeeper, a single Kafka broker, Schema Registry and SQL Server.
version: '3.8'

services:
  zookeeper:
    # Image tags are pinned: this layout depends on ZooKeeper mode, which newer
    # Confluent Platform majors (Kafka 4.x based) no longer provide, so `latest`
    # is not a stable choice here.
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners are required because the broker hands its advertised address
      # back to every client:
      #   PLAINTEXT      (kafka:29092)    -> other containers on the compose network
      #   PLAINTEXT_HOST (localhost:9092) -> applications running on the host machine
      # Advertising only localhost:9092 makes in-network clients (schema-registry)
      # reconnect to their own localhost and fail.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Uses the in-network listener, not the host-facing one.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    container_name: kafka_sqlserver
    ports:
      - "1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      # Demo-only credential; must match the connection string in appsettings.json.
      SA_PASSWORD: "YourStrong@Password123"

Run:

docker-compose up -d


🧱 Step 2: Install Required Packages (Producer API)

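Inside the Kafka.ProducerApi project, install the following packages (also add Swashbuckle.AspNetCore if your project template did not include it, since Program.cs calls AddSwaggerGen):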

dotnet add package Confluent.Kafka

dotnet add package Microsoft.EntityFrameworkCore.SqlServer

dotnet add package Microsoft.EntityFrameworkCore.Design


🧾 Step 3: appsettings.json (Producer API)

Kafka.ProducerApi/appsettings.json

{

  "ConnectionStrings": {

    "DefaultConnection": "Server=localhost,1433;Database=KafkaOutboxDb;User Id=sa;Password=YourStrong@Password123;TrustServerCertificate=True"

  },

  "Kafka": {

    "BootstrapServers": "localhost:9092",

    "OrdersTopic": "orders-topic"

  },

  "Logging": {

    "LogLevel": {

      "Default": "Information"

    }

  }

}


🗃 Step 4: Create EF Core DbContext

Infrastructure/Data/AppDbContext.cs

using Kafka.ProducerApi.Infrastructure.Outbox;

using Microsoft.EntityFrameworkCore;

 

/// <summary>
/// EF Core context for the producer API. Exposes the outbox table that the
/// order service writes to and the relay worker polls.
/// </summary>
public class AppDbContext : DbContext
{
    /// <summary>Creates the context with the options supplied by dependency injection.</summary>
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    /// <summary>Outbox rows waiting to be (or already) published to Kafka.</summary>
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    /// <summary>Configures the <see cref="OutboxMessage"/> mapping.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        modelBuilder.Entity<OutboxMessage>(entity =>
        {
            entity.HasKey(x => x.Id);

            entity.Property(x => x.Topic).IsRequired().HasMaxLength(200);
            entity.Property(x => x.Payload).IsRequired();
            entity.Property(x => x.Status).IsRequired();
            entity.Property(x => x.CreatedAtUtc).IsRequired();

            // The relay worker filters on Status and orders by CreatedAtUtc on every
            // poll; without this index that query scans the whole table as it grows.
            entity.HasIndex(x => new { x.Status, x.CreatedAtUtc });
        });
    }
}


📦 Step 5: Outbox Entity + Status

Infrastructure/Outbox/OutboxStatus.cs

/// <summary>
/// Lifecycle state of an outbox row. The numeric values are written explicitly
/// because the enum is persisted with the row.
/// </summary>
public enum OutboxStatus

{

    /// <summary>Not yet published; the relay worker selects rows in this state.</summary>
    Pending = 0,

    /// <summary>Set by the relay worker after the message was produced successfully.</summary>
    Processed = 1,

    /// <summary>Set by the relay worker once the retry count reaches its limit.</summary>
    Failed = 2

}

Infrastructure/Outbox/OutboxMessage.cs

/// <summary>
/// A single row of the outbox table: one serialized event waiting to be
/// published to a Kafka topic, together with its delivery bookkeeping.
/// </summary>
public class OutboxMessage
{
    /// <summary>Primary key; a fresh GUID is assigned when the object is constructed.</summary>
    public Guid Id { get; set; } = Guid.NewGuid();

    /// <summary>Name of the Kafka topic the payload is published to.</summary>
    public string Topic { get; set; } = "";

    /// <summary>Serialized event body sent as the Kafka message value.</summary>
    public string Payload { get; set; } = "";

    /// <summary>Current delivery state; new rows start as <see cref="OutboxStatus.Pending"/>.</summary>
    public OutboxStatus Status { get; set; } = OutboxStatus.Pending;

    /// <summary>Number of failed publish attempts so far (starts at zero).</summary>
    public int RetryCount { get; set; }

    /// <summary>Message of the most recent publish exception, or null when none is recorded.</summary>
    public string? LastError { get; set; }

    /// <summary>UTC time at which the object was constructed.</summary>
    public DateTime CreatedAtUtc { get; set; } = DateTime.UtcNow;

    /// <summary>UTC time of the successful publish, or null while unpublished.</summary>
    public DateTime? ProcessedAtUtc { get; set; }
}


🧠 Step 6: Kafka Producer (Singleton)

Infrastructure/Kafka/IKafkaProducer.cs

/// <summary>
/// Abstraction over the Kafka producer used by the outbox relay worker.
/// </summary>
public interface IKafkaProducer

{

    /// <summary>Publishes <paramref name="message"/> as the value of a record on <paramref name="topic"/>.</summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <returns>A task that completes when the produce call has finished.</returns>
    Task ProduceAsync(string topic, string message);

}

Infrastructure/Kafka/KafkaProducer.cs

using Confluent.Kafka;

 

/// <summary>
/// Kafka producer wrapper intended to be registered as a singleton.
/// Owns one underlying <see cref="IProducer{TKey,TValue}"/> for the lifetime of
/// the application and releases it when the container disposes this instance.
/// </summary>
public class KafkaProducer : IKafkaProducer, IDisposable
{
    // Upper bound for draining in-flight messages during shutdown.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<Null, string> _producer;
    private bool _disposed;

    /// <summary>Builds the producer from the <c>Kafka:BootstrapServers</c> setting.</summary>
    /// <param name="configuration">Application configuration.</param>
    /// <exception cref="InvalidOperationException">
    /// The <c>Kafka:BootstrapServers</c> setting is missing or empty.
    /// </exception>
    public KafkaProducer(IConfiguration configuration)
    {
        var bootstrapServers = configuration["Kafka:BootstrapServers"];
        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            // Fail fast: a producer without brokers never delivers and only
            // surfaces the problem much later as timeouts.
            throw new InvalidOperationException(
                "Configuration value 'Kafka:BootstrapServers' is missing or empty.");
        }

        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = Acks.All,
            EnableIdempotence = true
        };

        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>Publishes <paramref name="message"/> to <paramref name="topic"/> without a key.</summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <returns>A task that completes once the underlying produce call has completed.</returns>
    public async Task ProduceAsync(string topic, string message)
    {
        var record = new Message<Null, string> { Value = message };
        await _producer.ProduceAsync(topic, record);
    }

    /// <summary>Flushes outstanding messages and releases the underlying producer.</summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _producer.Flush(FlushTimeout);
        _producer.Dispose();
    }
}


🧾 Step 7: Business Event Contract

Kafka.Shared/Events/OrderCreatedEvent.cs

/// <summary>
/// Event contract describing a newly created order. Instances are serialized
/// to JSON and stored as the outbox payload.
/// </summary>
public class OrderCreatedEvent
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Name of the customer who placed the order; empty by default.</summary>
    public string CustomerName { get; set; } = "";

    /// <summary>Order amount.</summary>
    public decimal Amount { get; set; }

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAtUtc { get; set; }
}


🏦 Step 8: Order Service (Writes Outbox)

Application/Interfaces/IOrderService.cs

/// <summary>
/// Application service for creating orders.
/// </summary>
public interface IOrderService

{

    /// <summary>Creates an order for the given customer and amount.</summary>
    /// <param name="customerName">Name of the customer placing the order.</param>
    /// <param name="amount">Order amount.</param>
    /// <returns>The identifier of the created order.</returns>
    Task<Guid> CreateOrderAsync(string customerName, decimal amount);

}

Application/Services/OrderService.cs

using System.Text.Json;

 

/// <summary>
/// Creates orders by recording an <see cref="OrderCreatedEvent"/> in the outbox
/// table. Publishing to Kafka is done later by the relay worker, so this
/// service only touches the database.
/// </summary>
public class OrderService : IOrderService
{
    private const string OrdersTopicKey = "Kafka:OrdersTopic";

    private readonly AppDbContext _db;
    private readonly IConfiguration _configuration;

    /// <summary>Creates the service.</summary>
    /// <param name="db">Database context holding the outbox table.</param>
    /// <param name="configuration">Application configuration (source of the topic name).</param>
    public OrderService(AppDbContext db, IConfiguration configuration)
    {
        _db = db;
        _configuration = configuration;
    }

    /// <summary>
    /// Generates a new order id, serializes an <see cref="OrderCreatedEvent"/> and
    /// stores it as a pending outbox message.
    /// </summary>
    /// <param name="customerName">Name of the customer placing the order.</param>
    /// <param name="amount">Order amount.</param>
    /// <returns>The identifier generated for the order.</returns>
    /// <exception cref="InvalidOperationException">
    /// The <c>Kafka:OrdersTopic</c> setting is missing or empty.
    /// </exception>
    public async Task<Guid> CreateOrderAsync(string customerName, decimal amount)
    {
        // Resolve the topic first so a configuration mistake fails with a clear
        // message instead of a database error on the required Topic column.
        var topic = _configuration[OrdersTopicKey];
        if (string.IsNullOrWhiteSpace(topic))
        {
            throw new InvalidOperationException(
                $"Configuration value '{OrdersTopicKey}' is missing or empty.");
        }

        // In real app: save Order table too (not included here)
        var orderId = Guid.NewGuid();

        var evt = new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerName = customerName,
            Amount = amount,
            CreatedAtUtc = DateTime.UtcNow
        };

        var outbox = new OutboxMessage
        {
            Topic = topic,
            Payload = JsonSerializer.Serialize(evt),
            Status = OutboxStatus.Pending
        };

        _db.OutboxMessages.Add(outbox);
        await _db.SaveChangesAsync();

        return orderId;
    }
}


🔁 Step 9: Outbox Relay Worker (Publishes to Kafka)

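This is where the real work happens: a background worker polls the outbox table and publishes each pending message to Kafka.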

Infrastructure/Outbox/OutboxRelayWorker.cs

using Microsoft.EntityFrameworkCore;

 

/// <summary>
/// Background worker that periodically reads pending outbox rows and publishes
/// them to Kafka.
/// </summary>
/// <remarks>
/// Delivery is at-least-once: a message is marked processed only after the
/// publish call returns, so a crash between publishing and saving leads to a
/// republish. State is saved after every message to keep that window to a
/// single message rather than a whole batch.
/// </remarks>
public class OutboxRelayWorker : BackgroundService
{
    // A message is marked Failed once it has failed this many times.
    private const int MaxRetryCount = 5;

    // Maximum number of outbox rows handled per poll.
    private const int BatchSize = 20;

    // Pause between polls.
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2);

    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxRelayWorker> _logger;

    /// <summary>Creates the worker.</summary>
    /// <param name="serviceProvider">Root provider used to create one scope per poll.</param>
    /// <param name="logger">Logger for worker diagnostics.</param>
    public OutboxRelayWorker(IServiceProvider serviceProvider, ILogger<OutboxRelayWorker> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>Polls the outbox until the host requests shutdown.</summary>
    /// <param name="stoppingToken">Signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Outbox Relay Worker started...");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await RelayPendingBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown, not a failure worth an error log.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox Relay Worker loop failed.");
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    // Loads one batch of pending messages, publishes each and persists its outcome.
    private async Task RelayPendingBatchAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();

        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var kafka = scope.ServiceProvider.GetRequiredService<IKafkaProducer>();

        var pendingMessages = await db.OutboxMessages
            .Where(x => x.Status == OutboxStatus.Pending && x.RetryCount < MaxRetryCount)
            .OrderBy(x => x.CreatedAtUtc)
            .Take(BatchSize)
            .ToListAsync(stoppingToken);

        foreach (var msg in pendingMessages)
        {
            // Stop starting new publishes once shutdown has been requested;
            // the remaining rows stay Pending for the next run.
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            try
            {
                await kafka.ProduceAsync(msg.Topic, msg.Payload);

                msg.Status = OutboxStatus.Processed;
                msg.ProcessedAtUtc = DateTime.UtcNow;
                msg.LastError = null;
            }
            catch (Exception ex)
            {
                msg.RetryCount++;
                msg.LastError = ex.Message;

                if (msg.RetryCount >= MaxRetryCount)
                {
                    msg.Status = OutboxStatus.Failed;
                }

                _logger.LogWarning(
                    ex,
                    "Publishing outbox message {OutboxMessageId} failed (attempt {RetryCount}).",
                    msg.Id,
                    msg.RetryCount);
            }

            // Deliberately not cancellable: the message has already been sent (or
            // its failure counted), so this state must reach the database even
            // while the host is shutting down.
            await db.SaveChangesAsync(CancellationToken.None);
        }
    }
}


🌐 Step 10: OrdersController (Calls Service)

Controllers/OrdersController.cs

using Microsoft.AspNetCore.Mvc;

 

/// <summary>
/// HTTP entry point for orders, served under <c>api/orders</c>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IOrderService _orderService;

    /// <summary>Creates the controller with the order service it delegates to.</summary>
    public OrdersController(IOrderService orderService) => _orderService = orderService;

    /// <summary>
    /// Handles POST requests: passes the request data to the order service and
    /// returns the new order id together with a confirmation message.
    /// </summary>
    /// <param name="request">Request body carrying the customer name and amount.</param>
    /// <returns>A 200 response containing <c>OrderId</c> and <c>Message</c>.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        Guid orderId = await _orderService.CreateOrderAsync(request.CustomerName, request.Amount);

        var responseBody = new
        {
            OrderId = orderId,
            Message = "Order saved + Outbox message created!"
        };

        return Ok(responseBody);
    }
}

 

/// <summary>Request body of the create-order endpoint: customer name and order amount.</summary>
public record OrderRequest(string CustomerName, decimal Amount);


✅ Step 11: Program.cs (DI Setup)

Kafka.ProducerApi/Program.cs

using Microsoft.EntityFrameworkCore;

 

// Composition root of the producer API: registers services, then starts the web host.
var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;

// MVC controllers and Swagger document generation.
services.AddControllers();
services.AddSwaggerGen();

// EF Core context backed by SQL Server; the connection string is read when the options are built.
services.AddDbContext<AppDbContext>(
    options => options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));

// One Kafka producer shared by the whole application.
services.AddSingleton<IKafkaProducer, KafkaProducer>();

// Per-request business service that writes to the outbox.
services.AddScoped<IOrderService, OrderService>();

// Background worker that relays outbox rows to Kafka.
services.AddHostedService<OutboxRelayWorker>();

var app = builder.Build();

// Swagger JSON endpoint and UI.
app.UseSwagger();
app.UseSwaggerUI();

app.MapControllers();

app.Run();


🛠 Step 12: EF Core Migration

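Run the following inside the Kafka.ProducerApi project (the `dotnet ef` command requires the EF Core CLI tool; install it once with `dotnet tool install --global dotnet-ef` if it is missing):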

dotnet ef migrations add InitOutbox

dotnet ef database update


✅ What You Achieved (Enterprise Level)

Now your system is:

✅ Reliable (the outbox prevents message loss; delivery is at-least-once, so consumers should handle duplicates)
✅ Scalable (Kafka handles partitions)
✅ Production style (worker relay pattern)
✅ Database-safe (no dual write inconsistency)
✅ Retry supported
✅ Ready for DLQ + Schema Registry

 

Comments

Popular posts from this blog

Debouncing & Throttling in RxJS: Optimizing API Calls and User Interactions

Promises in Angular

Comprehensive Guide to C# and .NET Core OOP Concepts and Language Features