Final Enterprise Project Structure (Kafka + .NET 8)
We will build two projects (a common pattern in microservices):
KafkaDotNetDemo/
│
├── docker-compose.yml
│
├── Kafka.ProducerApi/
│   ├── Controllers/
│   │   └── OrdersController.cs
│   ├── Services/
│   │   ├── IKafkaProducer.cs
│   │   └── KafkaProducer.cs
│   ├── appsettings.json
│   ├── Program.cs
│   └── Kafka.ProducerApi.csproj
│
└── Kafka.ConsumerWorker/
    ├── Workers/
    │   └── OrderConsumerWorker.cs
    ├── Services/
    │   ├── IDlqProducer.cs
    │   └── DlqProducer.cs
    ├── appsettings.json
    ├── Program.cs
    └── Kafka.ConsumerWorker.csproj
🐳 Step 1: docker-compose.yml (Kafka +
Zookeeper + Schema Registry)
Create this file in root folder:
✅
docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: an internal one for containers on the compose network
      # (kafka:29092) and a host-facing one (localhost:9092). Advertising only
      # localhost:9092 would break schema-registry, which connects from inside
      # the Docker network where "localhost" resolves to its own container.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so the internal offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Use the internal listener — this connection stays inside the network.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Run:
docker-compose up -d
🚀 Step 2: Create Producer API (.NET 8 Web API)
Create the project:

dotnet new webapi -n Kafka.ProducerApi
cd Kafka.ProducerApi
dotnet add package Confluent.Kafka
✅
Producer API appsettings.json
Kafka.ProducerApi/appsettings.json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "orders-topic"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  }
}
✅
Kafka Producer Interface
Services/IKafkaProducer.cs
using
Confluent.Kafka;
/// <summary>
/// Abstraction over a Kafka message producer so callers (e.g. controllers)
/// can be unit-tested without a real broker.
/// </summary>
public interface IKafkaProducer
{
    /// <summary>Publishes <paramref name="message"/> to <paramref name="topic"/>.</summary>
    Task ProduceAsync(string topic, string message);
}
✅
Kafka Producer Implementation (Singleton Ready)
Services/KafkaProducer.cs
using
Confluent.Kafka;
/// <summary>
/// Singleton Kafka producer. Registered as a singleton so the process keeps a
/// single <see cref="IProducer{TKey,TValue}"/> (producers are thread-safe and
/// expensive to create). Implements <see cref="IDisposable"/> so the DI
/// container flushes buffered messages on shutdown — the original version
/// never disposed the producer, so in-flight messages could be lost.
/// </summary>
public class KafkaProducer : IKafkaProducer, IDisposable
{
    private readonly IProducer<Null, string> _producer;

    public KafkaProducer(IConfiguration configuration)
    {
        var kafkaConfig = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            Acks = Acks.All,          // wait for all in-sync replicas to ack
            EnableIdempotence = true  // prevents duplicates on retry
        };
        _producer = new ProducerBuilder<Null, string>(kafkaConfig).Build();
    }

    /// <summary>Publishes <paramref name="message"/> (keyless) to <paramref name="topic"/>.</summary>
    public async Task ProduceAsync(string topic, string message)
    {
        await _producer.ProduceAsync(topic, new Message<Null, string>
        {
            Value = message
        });
    }

    /// <summary>Flushes any buffered messages, then releases the native client.</summary>
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
✅
OrdersController (API Endpoint)
Controllers/OrdersController.cs
using
Microsoft.AspNetCore.Mvc;
using
System.Text.Json;
/// <summary>
/// Accepts order requests over HTTP and publishes them to Kafka.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IKafkaProducer _producer;
    private readonly IConfiguration _configuration;

    public OrdersController(IKafkaProducer producer, IConfiguration configuration)
    {
        _producer = producer;
        _configuration = configuration;
    }

    /// <summary>
    /// Serializes the order (with a generated id and UTC timestamp) and
    /// publishes it to the topic configured at <c>Kafka:Topic</c>.
    /// </summary>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        var topic = _configuration["Kafka:Topic"];

        // Fail with a clear 500 instead of the null-forgiving "topic!" the
        // original used, which would surface as an opaque client error at
        // produce time when the setting is missing.
        if (string.IsNullOrWhiteSpace(topic))
            return Problem("Kafka:Topic is not configured.");

        var payload = JsonSerializer.Serialize(new
        {
            OrderId = Guid.NewGuid(),
            request.CustomerName,
            request.Amount,
            CreatedAt = DateTime.UtcNow
        });

        await _producer.ProduceAsync(topic, payload);

        return Ok(new { Message = "Order published to Kafka successfully!" });
    }
}
/// <summary>Incoming order payload for <c>POST api/orders</c>.</summary>
public record OrderRequest(string CustomerName, decimal Amount);
✅
Producer Program.cs (DI + Singleton)
Program.cs
var builder = WebApplication.CreateBuilder(args);

// One IProducer per process: register the Kafka producer as a singleton.
builder.Services.AddSingleton<IKafkaProducer, KafkaProducer>();

// MVC controllers + Swagger/OpenAPI plumbing.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

app.UseSwagger();
app.UseSwaggerUI();
app.MapControllers();

app.Run();
🧾 Step 3: Create Consumer Worker (.NET 8
Worker Service)
Create the project:

cd ..
dotnet new worker -n Kafka.ConsumerWorker
cd Kafka.ConsumerWorker
dotnet add package Confluent.Kafka
✅
Consumer Worker appsettings.json
Kafka.ConsumerWorker/appsettings.json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "orders-topic",
    "DlqTopic": "orders-topic-dlq",
    "GroupId": "orders-consumer-group"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  }
}
☠ Step 4: Dead Letter Queue Producer
(DLQ)
✅
DLQ Interface
Services/IDlqProducer.cs
/// <summary>
/// Producer for the dead-letter queue: messages that fail processing are
/// forwarded here instead of blocking the main consumer.
/// </summary>
public interface IDlqProducer
{
    /// <summary>Publishes <paramref name="message"/> to the DLQ <paramref name="topic"/>.</summary>
    Task SendToDlqAsync(string topic, string message);
}
✅
DLQ Producer Implementation
Services/DlqProducer.cs
using
Confluent.Kafka;
/// <summary>
/// Singleton producer for the dead-letter topic. Implements
/// <see cref="IDisposable"/> so the DI container flushes and releases the
/// native client on shutdown — the original never disposed the producer, so
/// buffered DLQ messages could be silently dropped.
/// </summary>
public class DlqProducer : IDlqProducer, IDisposable
{
    private readonly IProducer<Null, string> _producer;

    public DlqProducer(IConfiguration configuration)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>Publishes <paramref name="message"/> (keyless) to <paramref name="topic"/>.</summary>
    public async Task SendToDlqAsync(string topic, string message)
    {
        await _producer.ProduceAsync(topic, new Message<Null, string>
        {
            Value = message
        });
    }

    /// <summary>Flushes pending DLQ messages, then releases the native client.</summary>
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
👂 Step 5: Kafka Consumer Background Worker
(With DLQ)
Workers/OrderConsumerWorker.cs
using
Confluent.Kafka;
using
Microsoft.Extensions.Hosting;
/// <summary>
/// Background worker that consumes orders from Kafka with manual offset
/// commits and dead-letter-queue handling for poison messages.
/// </summary>
public class OrderConsumerWorker : BackgroundService
{
    private readonly IConfiguration _configuration;
    private readonly ILogger<OrderConsumerWorker> _logger;
    private readonly IDlqProducer _dlqProducer;

    public OrderConsumerWorker(
        IConfiguration configuration,
        ILogger<OrderConsumerWorker> logger,
        IDlqProducer dlqProducer)
    {
        _configuration = configuration;
        _logger = logger;
        _dlqProducer = dlqProducer;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // consumer.Consume() blocks its thread. Running the loop inline (as
        // the original did) stalls host startup and any other hosted
        // services, so push it onto a worker task instead.
        return Task.Run(() => ConsumeLoopAsync(stoppingToken), stoppingToken);
    }

    /// <summary>Main consume/process/commit loop; runs until shutdown.</summary>
    private async Task ConsumeLoopAsync(CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = _configuration["Kafka:BootstrapServers"],
            GroupId = _configuration["Kafka:GroupId"],
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Manual commits: an offset is committed only after the message
            // is processed (or routed to the DLQ), giving at-least-once.
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        var topic = _configuration["Kafka:Topic"];
        var dlqTopic = _configuration["Kafka:DlqTopic"];

        consumer.Subscribe(topic);
        _logger.LogInformation("Kafka Consumer started...");

        while (!stoppingToken.IsCancellationRequested)
        {
            ConsumeResult<Ignore, string>? result = null;
            try
            {
                result = consumer.Consume(stoppingToken);
                _logger.LogInformation("Received: {msg}", result.Message.Value);

                ProcessMessage(result.Message.Value);

                // ✅ Commit only after success
                consumer.Commit(result);
            }
            catch (OperationCanceledException)
            {
                // Host shutdown — not an error, and definitely not a DLQ
                // case (the original's generic catch would have DLQ'd it).
                break;
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Kafka consume error");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Processing failed, sending to DLQ...");

                if (result is not null)
                {
                    // Forward the ORIGINAL message (the poison pill) — the
                    // original code sent ex.Message, losing the payload —
                    // and await it instead of blocking with .Wait().
                    await _dlqProducer.SendToDlqAsync(dlqTopic!, result.Message.Value);

                    // Commit so the poison message is not re-consumed forever.
                    consumer.Commit(result);
                }
            }
        }

        consumer.Close();
    }

    private void ProcessMessage(string message)
    {
        // TODO: Your actual logic here
        // throw new Exception("Fake error"); // Uncomment to test DLQ
    }
}
✅
Worker Program.cs
Kafka.ConsumerWorker/Program.cs
using Kafka.ConsumerWorker;

var builder = Host.CreateApplicationBuilder(args);

// DLQ producer (singleton) and the consuming background worker.
builder.Services.AddSingleton<IDlqProducer, DlqProducer>();
builder.Services.AddHostedService<OrderConsumerWorker>();

builder.Build().Run();
🧠 Step 6: Run Everything
Start Kafka:

docker-compose up -d

Run the Producer API:

cd Kafka.ProducerApi
dotnet run

Run the Consumer Worker:

cd ../Kafka.ConsumerWorker
dotnet run
✅ Testing
Call API:
POST:
https://localhost:xxxx/api/orders
Body:
{
"customerName": "Sai",
"amount": 5000
}
Then your Worker will print:
Received:
{ ...json... }
Comments
Post a Comment