Final Enterprise Project Structure (Kafka + .NET 8)

We will build two separate projects, a producer Web API and a consumer Worker Service, which is the usual split in a microservices setup:

KafkaDotNetDemo/

├── docker-compose.yml

├── Kafka.ProducerApi/

   ├── Controllers/

      └── OrdersController.cs

   ├── Services/

      ├── IKafkaProducer.cs

      └── KafkaProducer.cs

   ├── appsettings.json

   ├── Program.cs

   └── Kafka.ProducerApi.csproj

└── Kafka.ConsumerWorker/

    ├── Workers/

       └── OrderConsumerWorker.cs

    ├── Services/

       ├── IDlqProducer.cs

       └── DlqProducer.cs

    ├── appsettings.json

    ├── Program.cs

    └── Kafka.ConsumerWorker.csproj


🐳 Step 1: docker-compose.yml (Kafka + Zookeeper + Schema Registry)

Create this file in the root folder (KafkaDotNetDemo/):

docker-compose.yml

# Single-broker Kafka stack for local development: ZooKeeper + Kafka + Schema Registry.
#
# Image tags are pinned so the stack is reproducible; `latest` can silently move
# to a release that no longer supports the ZooKeeper-based configuration below.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - zookeeper
    ports:
      # Host-facing listener used by the .NET apps (BootstrapServers = localhost:9092).
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners are required because clients reconnect to whatever address
      # the broker advertises:
      #   PLAINTEXT      -> kafka:29092     for containers on the compose network
      #   PLAINTEXT_HOST -> localhost:9092  for applications running on the host
      # Advertising only localhost:9092 would send schema-registry to its own
      # loopback interface, where no broker is listening.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so the internal offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Must target the in-network listener, not the host-facing one.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Run:

docker-compose up -d


🚀 Step 2: Create Producer API (.NET 8 Web API)

Create project

dotnet new webapi -n Kafka.ProducerApi

cd Kafka.ProducerApi

dotnet add package Confluent.Kafka


✅ Producer API appsettings.json

Kafka.ProducerApi/appsettings.json

{

  "Kafka": {

    "BootstrapServers": "localhost:9092",

    "Topic": "orders-topic"

  },

  "Logging": {

    "LogLevel": {

      "Default": "Information"

    }

  }

}


✅ Kafka Producer Interface

Services/IKafkaProducer.cs

using Confluent.Kafka;

 

/// <summary>
/// Abstraction for publishing a string message to a Kafka topic.
/// </summary>
public interface IKafkaProducer

{

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task ProduceAsync(string topic, string message);

}


✅ Kafka Producer Implementation (Singleton Ready)

Services/KafkaProducer.cs

using Confluent.Kafka;

 

/// <summary>
/// <see cref="IKafkaProducer"/> backed by a single Confluent.Kafka producer instance.
/// Intended to be registered as a singleton; the DI container disposes it on shutdown.
/// </summary>
public class KafkaProducer : IKafkaProducer, IDisposable
{
    // Upper bound on how long shutdown waits for queued messages to be delivered.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<Null, string> _producer;
    private bool _disposed;

    /// <summary>
    /// Builds the underlying producer from the <c>Kafka:BootstrapServers</c> setting.
    /// </summary>
    /// <param name="configuration">Application configuration.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Kafka:BootstrapServers</c> is missing or empty.
    /// </exception>
    public KafkaProducer(IConfiguration configuration)
    {
        var bootstrapServers = configuration["Kafka:BootstrapServers"];
        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            // Fail at startup with a clear message instead of an opaque client error later.
            throw new InvalidOperationException(
                "Configuration value 'Kafka:BootstrapServers' is missing or empty.");
        }

        var kafkaConfig = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = Acks.All,
            EnableIdempotence = true
        };

        _producer = new ProducerBuilder<Null, string>(kafkaConfig).Build();
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> and completes
    /// once the underlying client reports the delivery result.
    /// </summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <exception cref="ArgumentException">Thrown when <paramref name="topic"/> is null or blank.</exception>
    public async Task ProduceAsync(string topic, string message)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(topic);

        await _producer.ProduceAsync(topic, new Message<Null, string>
        {
            Value = message
        });
    }

    /// <summary>
    /// Flushes any queued messages (bounded by <see cref="FlushTimeout"/>) and releases
    /// the underlying producer. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _producer.Flush(FlushTimeout);
        _producer.Dispose();
        GC.SuppressFinalize(this);
    }
}


✅ OrdersController (API Endpoint)

Controllers/OrdersController.cs

using Microsoft.AspNetCore.Mvc;

using System.Text.Json;

 

/// <summary>
/// HTTP entry point that turns an incoming order request into a JSON message
/// published to the Kafka topic named by the <c>Kafka:Topic</c> setting.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IKafkaProducer _producer;
    private readonly string _topic;

    /// <summary>
    /// Creates the controller and resolves the destination topic once.
    /// </summary>
    /// <param name="producer">Producer used to publish order messages.</param>
    /// <param name="configuration">Application configuration; must contain <c>Kafka:Topic</c>.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Kafka:Topic</c> is missing or empty.
    /// </exception>
    public OrdersController(IKafkaProducer producer, IConfiguration configuration)
    {
        _producer = producer;

        var topic = configuration["Kafka:Topic"];
        if (string.IsNullOrWhiteSpace(topic))
        {
            // Surface the misconfiguration explicitly rather than passing null to the producer.
            throw new InvalidOperationException(
                "Configuration value 'Kafka:Topic' is missing or empty.");
        }

        _topic = topic;
    }

    /// <summary>
    /// Assigns a new order id, serializes the order to JSON and publishes it.
    /// </summary>
    /// <param name="request">Order details bound from the request body.</param>
    /// <returns>
    /// 200 OK with a confirmation message and the generated <c>OrderId</c>, so the
    /// caller can correlate the request with the published message.
    /// </returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        var orderId = Guid.NewGuid();

        var payload = JsonSerializer.Serialize(new
        {
            OrderId = orderId,
            request.CustomerName,
            request.Amount,
            CreatedAt = DateTime.UtcNow
        });

        await _producer.ProduceAsync(_topic, payload);

        return Ok(new
        {
            Message = "Order published to Kafka successfully!",
            OrderId = orderId
        });
    }
}

 

/// <summary>
/// Request body accepted by <c>OrdersController.CreateOrder</c>; both values are
/// copied into the JSON payload that is published to Kafka.
/// </summary>
/// <param name="CustomerName">Name of the customer placing the order.</param>
/// <param name="Amount">Order amount. NOTE(review): currency/unit is not specified here; confirm with callers.</param>
public record OrderRequest(string CustomerName, decimal Amount);


✅ Producer Program.cs (DI + Singleton)

Program.cs

// Composition root for the Producer API: registers MVC controllers, Swagger and
// the Kafka producer, then starts the web host.
var webBuilder = WebApplication.CreateBuilder(args);
var services = webBuilder.Services;

services.AddControllers();
services.AddEndpointsApiExplorer();
services.AddSwaggerGen();

// A single producer instance is shared by all requests (singleton lifetime).
services.AddSingleton<IKafkaProducer, KafkaProducer>();

var webApp = webBuilder.Build();

// Swagger JSON endpoint and interactive UI.
webApp.UseSwagger();
webApp.UseSwaggerUI();

webApp.MapControllers();

webApp.Run();


🧾 Step 3: Create Consumer Worker (.NET 8 Worker Service)

Create project

cd ..

dotnet new worker -n Kafka.ConsumerWorker

cd Kafka.ConsumerWorker

dotnet add package Confluent.Kafka


✅ Consumer Worker appsettings.json

Kafka.ConsumerWorker/appsettings.json

{

  "Kafka": {

    "BootstrapServers": "localhost:9092",

    "Topic": "orders-topic",

    "DlqTopic": "orders-topic-dlq",

    "GroupId": "orders-consumer-group"

  },

  "Logging": {

    "LogLevel": {

      "Default": "Information"

    }

  }

}


Step 4: Dead Letter Queue Producer (DLQ)

✅ DLQ Interface

Services/IDlqProducer.cs

/// <summary>
/// Abstraction for publishing a message to a dead-letter (DLQ) topic.
/// </summary>
public interface IDlqProducer

{

    /// <summary>
    /// Publishes <paramref name="message"/> to the dead-letter topic <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Name of the dead-letter topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task SendToDlqAsync(string topic, string message);

}


✅ DLQ Producer Implementation

Services/DlqProducer.cs

using Confluent.Kafka;

 

/// <summary>
/// <see cref="IDlqProducer"/> backed by a single Confluent.Kafka producer instance.
/// Intended to be registered as a singleton; the DI container disposes it on shutdown.
/// </summary>
public class DlqProducer : IDlqProducer, IDisposable
{
    // Upper bound on how long shutdown waits for queued messages to be delivered.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<Null, string> _producer;
    private bool _disposed;

    /// <summary>
    /// Builds the underlying producer from the <c>Kafka:BootstrapServers</c> setting.
    /// </summary>
    /// <param name="configuration">Application configuration.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Kafka:BootstrapServers</c> is missing or empty.
    /// </exception>
    public DlqProducer(IConfiguration configuration)
    {
        var bootstrapServers = configuration["Kafka:BootstrapServers"];
        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            // Fail at startup with a clear message instead of an opaque client error later.
            throw new InvalidOperationException(
                "Configuration value 'Kafka:BootstrapServers' is missing or empty.");
        }

        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers
        };

        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to the dead-letter topic and completes once
    /// the underlying client reports the delivery result.
    /// </summary>
    /// <param name="topic">Name of the dead-letter topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <exception cref="ArgumentException">Thrown when <paramref name="topic"/> is null or blank.</exception>
    public async Task SendToDlqAsync(string topic, string message)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(topic);

        await _producer.ProduceAsync(topic, new Message<Null, string>
        {
            Value = message
        });
    }

    /// <summary>
    /// Flushes any queued messages (bounded by <see cref="FlushTimeout"/>) and releases
    /// the underlying producer. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _producer.Flush(FlushTimeout);
        _producer.Dispose();
        GC.SuppressFinalize(this);
    }
}


👂 Step 5: Kafka Consumer Background Worker (With DLQ)

Workers/OrderConsumerWorker.cs

using Confluent.Kafka;

using Microsoft.Extensions.Hosting;

 

/// <summary>
/// Background worker that consumes order messages from Kafka, processes them, and
/// forwards messages that fail processing to a dead-letter topic.
/// Offsets are committed manually, giving at-least-once delivery.
/// </summary>
public class OrderConsumerWorker : BackgroundService
{
    private readonly IConfiguration _configuration;
    private readonly ILogger<OrderConsumerWorker> _logger;
    private readonly IDlqProducer _dlqProducer;

    /// <summary>Creates the worker.</summary>
    /// <param name="configuration">Source of the <c>Kafka:*</c> settings.</param>
    /// <param name="logger">Logger for consume/processing diagnostics.</param>
    /// <param name="dlqProducer">Producer used to dead-letter failed messages.</param>
    public OrderConsumerWorker(
        IConfiguration configuration,
        ILogger<OrderConsumerWorker> logger,
        IDlqProducer dlqProducer)
    {
        _configuration = configuration;
        _logger = logger;
        _dlqProducer = dlqProducer;
    }

    /// <summary>
    /// Starts the consume loop on a thread-pool thread.
    /// </summary>
    /// <remarks>
    /// <c>IConsumer.Consume</c> is a blocking call. Running the loop directly in
    /// <c>ExecuteAsync</c> would never yield back to the host, so host startup
    /// would not complete; offloading it keeps startup non-blocking.
    /// </remarks>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(() => RunConsumeLoopAsync(stoppingToken), stoppingToken);
    }

    // Consumes, processes and commits messages until shutdown is requested.
    private async Task RunConsumeLoopAsync(CancellationToken stoppingToken)
    {
        var topic = RequireSetting("Kafka:Topic");
        var dlqTopic = RequireSetting("Kafka:DlqTopic");

        var config = new ConsumerConfig
        {
            BootstrapServers = RequireSetting("Kafka:BootstrapServers"),
            GroupId = RequireSetting("Kafka:GroupId"),
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topic);

        _logger.LogInformation("Kafka Consumer started...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> result;
                try
                {
                    result = consumer.Consume(stoppingToken);
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError(ex, "Kafka consume error");

                    if (ex.Error.IsFatal)
                    {
                        // The client instance is unusable after a fatal error;
                        // retrying would only spin and flood the log.
                        throw;
                    }

                    continue;
                }

                _logger.LogInformation("Received: {msg}", result.Message.Value);

                try
                {
                    ProcessMessage(result.Message.Value);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Processing failed, sending to DLQ...");

                    // Forward the ORIGINAL payload so the poison message can be
                    // inspected or replayed. If this publish fails, the exception
                    // propagates before the commit below, so the message is
                    // redelivered after restart instead of being lost.
                    await _dlqProducer.SendToDlqAsync(dlqTopic, result.Message.Value);
                }

                try
                {
                    // Commit after success OR after the message is safely in the
                    // DLQ; otherwise a poison message is replayed on every restart.
                    consumer.Commit(result);
                }
                catch (KafkaException ex)
                {
                    // At-least-once: an uncommitted message is simply redelivered.
                    _logger.LogError(ex, "Offset commit failed; message may be redelivered");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: Consume was interrupted by the stopping token.
        }
        finally
        {
            // Leave the consumer group cleanly so partitions are reassigned promptly.
            consumer.Close();
        }
    }

    // Reads a required configuration value, failing with a descriptive error if absent.
    private string RequireSetting(string key)
    {
        var value = _configuration[key];
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Configuration value '{key}' is missing or empty.");
        }

        return value;
    }

    // Placeholder for the real message handling; an exception thrown here routes
    // the message to the DLQ.
    private void ProcessMessage(string message)
    {
        // TODO: Your actual logic here
        // throw new Exception("Fake error"); // Uncomment to test DLQ
    }
}


✅ Worker Program.cs

Kafka.ConsumerWorker/Program.cs

using Kafka.ConsumerWorker;

 

// Composition root for the consumer: registers the DLQ producer and the Kafka
// consumer worker, then runs the generic host until shutdown.
var hostBuilder = Host.CreateApplicationBuilder(args);
var services = hostBuilder.Services;

// Shared producer used to publish failed messages to the dead-letter topic.
services.AddSingleton<IDlqProducer, DlqProducer>();

// Long-running background service that consumes the orders topic.
services.AddHostedService<OrderConsumerWorker>();

hostBuilder.Build().Run();


🧠 Step 6: Run Everything

Start Kafka:

docker-compose up -d

Run Producer API:

cd Kafka.ProducerApi

dotnet run

Run Consumer Worker:

cd ../Kafka.ConsumerWorker

dotnet run


✅ Testing

Call API:

POST:

https://localhost:xxxx/api/orders

Body:

{

  "customerName": "Sai",

  "amount": 5000

}

Then your Worker will print:

Received: { ...json... }

 

Comments

Popular posts from this blog

Debouncing & Throttling in RxJS: Optimizing API Calls and User Interactions

Promises in Angular

Comprehensive Guide to C# and .NET Core OOP Concepts and Language Features