Enterprise Kafka + .NET Core implementation

Enterprise Architecture (What we are building)

Order API (Producer)

  • Creates Order in SQL
  • Writes Outbox event in same DB transaction
  • Background job publishes Outbox to Kafka

Inventory Worker (Consumer)

  • Consumes order-created
  • Processes message
  • Uses idempotency table
  • On failure → retries
  • After retries → sends to order-created-dlt

1️ Docker Compose (Kafka + Zookeeper + SQL Server)

📌 docker-compose.yml

version: "3.8"

 

services:

  zookeeper:

    image: confluentinc/cp-zookeeper:7.5.0

    environment:

      ZOOKEEPER_CLIENT_PORT: 2181

      ZOOKEEPER_TICK_TIME: 2000

    ports:

      - "2181:2181"

 

  kafka:

    image: confluentinc/cp-kafka:7.5.0

    depends_on:

      - zookeeper

    ports:

      - "9092:9092"

    environment:

      KAFKA_BROKER_ID: 1

      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT

      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

 

  sqlserver:

    image: mcr.microsoft.com/mssql/server:2022-latest

    environment:

      SA_PASSWORD: "YourStrong@Passw0rd"

      ACCEPT_EULA: "Y"

    ports:

      - "1433:1433"


2️ SQL Server + EF Core Entities

✅ Order Entity

public class Order

{

    public Guid Id { get; set; }

    public string CustomerName { get; set; } = default!;

    public decimal TotalAmount { get; set; }

    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

}


✅ OutboxMessage Entity

public class OutboxMessage

{

    public Guid Id { get; set; }

    public string EventType { get; set; } = default!;

    public string Payload { get; set; } = default!;

    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

 

    public DateTime? ProcessedAt { get; set; }

    public int RetryCount { get; set; }

    public string Status { get; set; } = "Pending";

    // Pending, Processed, Failed

}


✅ ProcessedMessage Entity (Idempotency)

public class ProcessedMessage

{

    public Guid Id { get; set; } // MessageId

    public DateTime ProcessedAt { get; set; } = DateTime.UtcNow;

}


3️ EF Core DbContext

public class AppDbContext : DbContext

{

    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

 

    public DbSet<Order> Orders => Set<Order>();

    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    public DbSet<ProcessedMessage> ProcessedMessages => Set<ProcessedMessage>();

 

    protected override void OnModelCreating(ModelBuilder modelBuilder)

    {

        modelBuilder.Entity<Order>().HasKey(x => x.Id);

        modelBuilder.Entity<OutboxMessage>().HasKey(x => x.Id);

        modelBuilder.Entity<ProcessedMessage>().HasKey(x => x.Id);

    }

}


4️ Kafka Event DTO (OrderCreated)

public record OrderCreatedEvent(

    Guid EventId,

    Guid OrderId,

    string CustomerName,

    decimal TotalAmount,

    DateTime CreatedAt,

    string CorrelationId

);


5️ API Controller (Create Order)

📌 This does NOT publish to Kafka directly
It writes to Outbox safely.

[ApiController]

[Route("api/orders")]

public class OrdersController : ControllerBase

{

    private readonly AppDbContext _db;

 

    public OrdersController(AppDbContext db)

    {

        _db = db;

    }

 

    [HttpPost]

    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)

    {

        var correlationId = HttpContext.TraceIdentifier;

 

        using var trx = await _db.Database.BeginTransactionAsync();

 

        var order = new Order

        {

            Id = Guid.NewGuid(),

            CustomerName = request.CustomerName,

            TotalAmount = request.TotalAmount

        };

 

        _db.Orders.Add(order);

 

        var evt = new OrderCreatedEvent(

            EventId: Guid.NewGuid(),

            OrderId: order.Id,

            CustomerName: order.CustomerName,

            TotalAmount: order.TotalAmount,

            CreatedAt: order.CreatedAt,

            CorrelationId: correlationId

        );

 

        _db.OutboxMessages.Add(new OutboxMessage

        {

            Id = Guid.NewGuid(),

            EventType = "OrderCreated",

            Payload = System.Text.Json.JsonSerializer.Serialize(evt),

            Status = "Pending"

        });

 

        await _db.SaveChangesAsync();

        await trx.CommitAsync();

 

        return Ok(new { order.Id, Message = "Order created. Event stored in outbox." });

    }

}

 

public record CreateOrderRequest(string CustomerName, decimal TotalAmount);


6️ Kafka Producer Setup (Confluent.Kafka)

📌 KafkaProducerFactory.cs

using Confluent.Kafka;

 

public class KafkaProducerFactory

{

    private readonly ProducerConfig _config;

 

    public KafkaProducerFactory(IConfiguration configuration)

    {

        _config = new ProducerConfig

        {

            BootstrapServers = configuration["Kafka:BootstrapServers"],

            Acks = Acks.All,

            EnableIdempotence = true,

            MessageSendMaxRetries = 3,

            RetryBackoffMs = 200

        };

    }

 

    public IProducer<string, string> CreateProducer()

    {

        return new ProducerBuilder<string, string>(_config).Build();

    }

}


7️ Outbox Publisher Background Service

📌 OutboxPublisherWorker.cs

This runs every few seconds:

  • Reads pending outbox messages
  • Publishes to Kafka
  • Marks processed

using Confluent.Kafka;

using Microsoft.EntityFrameworkCore;

 

public class OutboxPublisherWorker : BackgroundService

{

    private readonly IServiceScopeFactory _scopeFactory;

    private readonly KafkaProducerFactory _producerFactory;

    private readonly ILogger<OutboxPublisherWorker> _logger;

 

    public OutboxPublisherWorker(

        IServiceScopeFactory scopeFactory,

        KafkaProducerFactory producerFactory,

        ILogger<OutboxPublisherWorker> logger)

    {

        _scopeFactory = scopeFactory;

        _producerFactory = producerFactory;

        _logger = logger;

    }

 

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        using var producer = _producerFactory.CreateProducer();

 

        while (!stoppingToken.IsCancellationRequested)

        {

            try

            {

                using var scope = _scopeFactory.CreateScope();

                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

 

                var messages = await db.OutboxMessages

                    .Where(x => x.Status == "Pending" && x.RetryCount < 5)

                    .OrderBy(x => x.CreatedAt)

                    .Take(10)

                    .ToListAsync(stoppingToken);

 

                foreach (var msg in messages)

                {

                    try

                    {

                        var topic = msg.EventType switch

                        {

                            "OrderCreated" => "order-created",

                            _ => "unknown-topic"

                        };

 

                        var kafkaMessage = new Message<string, string>

                        {

                            Key = msg.Id.ToString(),

                            Value = msg.Payload

                        };

 

                        await producer.ProduceAsync(topic, kafkaMessage, stoppingToken);

 

                        msg.Status = "Processed";

                        msg.ProcessedAt = DateTime.UtcNow;

                    }

                    catch (Exception ex)

                    {

                        msg.RetryCount++;

                        _logger.LogError(ex, "Failed publishing outbox message {Id}", msg.Id);

 

                        if (msg.RetryCount >= 5)

                            msg.Status = "Failed";

                    }

                }

 

                await db.SaveChangesAsync(stoppingToken);

            }

            catch (Exception ex)

            {

                _logger.LogError(ex, "Outbox worker crashed loop.");

            }

 

            await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);

        }

    }

}


8️ Kafka Consumer Worker (OrderCreated Consumer)

📌 OrderCreatedConsumerWorker.cs

using Confluent.Kafka;

using Microsoft.EntityFrameworkCore;

using System.Text.Json;

 

public class OrderCreatedConsumerWorker : BackgroundService

{

    private readonly IServiceScopeFactory _scopeFactory;

    private readonly IConfiguration _configuration;

    private readonly ILogger<OrderCreatedConsumerWorker> _logger;

 

    public OrderCreatedConsumerWorker(

        IServiceScopeFactory scopeFactory,

        IConfiguration configuration,

        ILogger<OrderCreatedConsumerWorker> logger)

    {

        _scopeFactory = scopeFactory;

        _configuration = configuration;

        _logger = logger;

    }

 

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        var config = new ConsumerConfig

        {

            BootstrapServers = _configuration["Kafka:BootstrapServers"],

            GroupId = "inventory-service",

            AutoOffsetReset = AutoOffsetReset.Earliest,

            EnableAutoCommit = false

        };

 

        using var consumer = new ConsumerBuilder<string, string>(config).Build();

 

        consumer.Subscribe("order-created");

 

        while (!stoppingToken.IsCancellationRequested)

        {

            try

            {

                var result = consumer.Consume(stoppingToken);

 

                if (result?.Message?.Value == null)

                    continue;

 

                var evt = JsonSerializer.Deserialize<OrderCreatedEvent>(result.Message.Value);

 

                if (evt == null)

                    continue;

 

                var success = await ProcessWithRetry(evt, stoppingToken);

 

                if (success)

                {

                    consumer.Commit(result);

                }

                else

                {

                    // Send to DLT (dead letter topic)

                    await SendToDeadLetter(result.Message.Value, stoppingToken);

 

                    // Commit offset to avoid infinite loop

                    consumer.Commit(result);

                }

            }

            catch (ConsumeException ex)

            {

                _logger.LogError(ex, "Kafka consume error");

            }

            catch (Exception ex)

            {

                _logger.LogError(ex, "Consumer loop crashed");

            }

        }

    }

 

    private async Task<bool> ProcessWithRetry(OrderCreatedEvent evt, CancellationToken ct)

    {

        var maxRetry = 3;

 

        for (int attempt = 1; attempt <= maxRetry; attempt++)

        {

            try

            {

                using var scope = _scopeFactory.CreateScope();

                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

 

                // Idempotency check

                var alreadyProcessed = await db.ProcessedMessages

                    .AnyAsync(x => x.Id == evt.EventId, ct);

 

                if (alreadyProcessed)

                {

                    _logger.LogInformation("Event already processed: {EventId}", evt.EventId);

                    return true;

                }

 

                // ✅ Business logic here

                _logger.LogInformation("Processing order: {OrderId} attempt {Attempt}", evt.OrderId, attempt);

 

                // Save idempotency marker

                db.ProcessedMessages.Add(new ProcessedMessage

                {

                    Id = evt.EventId

                });

 

                await db.SaveChangesAsync(ct);

 

                return true;

            }

            catch (Exception ex)

            {

                _logger.LogError(ex, "Processing failed attempt {Attempt}", attempt);

 

                await Task.Delay(TimeSpan.FromSeconds(2), ct);

            }

        }

 

        return false;

    }

 

    private async Task SendToDeadLetter(string payload, CancellationToken ct)

    {

        var producerConfig = new ProducerConfig

        {

            BootstrapServers = _configuration["Kafka:BootstrapServers"]

        };

 

        using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

 

        await producer.ProduceAsync("order-created-dlt",

            new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = payload }, ct);

    }

}


9️ appsettings.json

{

  "Kafka": {

    "BootstrapServers": "localhost:9092"

  },

  "ConnectionStrings": {

    "DefaultConnection": "Server=localhost,1433;Database=KafkaDemoDb;User Id=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=True;"

  }

}


🔟 Program.cs (Register Everything)

var builder = WebApplication.CreateBuilder(args);

 

builder.Services.AddControllers();

 

builder.Services.AddDbContext<AppDbContext>(options =>

{

    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));

});

 

builder.Services.AddSingleton<KafkaProducerFactory>();

 

// Outbox publisher

builder.Services.AddHostedService<OutboxPublisherWorker>();

 

// Consumer service (usually separate microservice in real world)

builder.Services.AddHostedService<OrderCreatedConsumerWorker>();

 

var app = builder.Build();

app.MapControllers();

app.Run();


✅ What you achieved (Enterprise Ready)

You now have:

✅ EF Core + SQL Server
✅ Outbox Pattern (safe publish)
✅ Kafka Producer (Confluent.Kafka)
✅ Kafka Consumer (BackgroundService)
✅ Retry + Dead Letter Topic
✅ Idempotency (ProcessedMessages table)
✅ Commit offsets after success


🔥 Interview-ready Explanation (Use this)

We implemented Kafka using Confluent.Kafka in .NET Core.
To guarantee consistency between database and Kafka publishing, we used the Outbox pattern with EF Core and SQL Server.
Events are stored in an Outbox table within the same transaction as business data.
A background publisher service publishes events to Kafka and marks them as processed.
Consumers are hosted services with manual offset commit, retry logic, dead-letter topic support, and idempotency checks.

 

Comments

Popular posts from this blog

Debouncing & Throttling in RxJS: Optimizing API Calls and User Interactions

Promises in Angular

Comprehensive Guide to C# and .NET Core OOP Concepts and Language Features