Enterprise Kafka + .NET Core implementation
Enterprise Architecture (What we are building)
Order
API (Producer)
- Creates Order in SQL
- Writes Outbox event in same DB transaction
- Background job publishes Outbox to Kafka
Inventory
Worker (Consumer)
- Consumes order-created
- Processes message
- Uses idempotency table
- On failure → retries
- After retries → sends to order-created-dlt
1️⃣ Docker Compose (Kafka + Zookeeper + SQL Server)
📌 docker-compose.yml
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # NOTE(review): advertising localhost:9092 works for clients on the host
      # machine only; other compose containers cannot reach this listener.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # Single-broker dev setup, so the internal offsets topic needs RF=1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
    ports:
      - "1433:1433"
2️⃣ SQL Server + EF Core Entities
✅
Order Entity
/// <summary>Domain entity representing a customer order persisted in SQL Server.</summary>
public class Order
{
    /// <summary>Primary key.</summary>
    public Guid Id { get; set; }

    /// <summary>Name of the customer who placed the order.</summary>
    public string CustomerName { get; set; } = default!;

    /// <summary>Total monetary value of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Creation timestamp, UTC.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
✅
OutboxMessage Entity
/// <summary>
/// Transactional-outbox row: an integration event stored in the same database
/// transaction as the business data, published to Kafka later by a background worker.
/// </summary>
public class OutboxMessage
{
    /// <summary>Primary key.</summary>
    public Guid Id { get; set; }

    /// <summary>Logical event name used for topic routing (e.g. "OrderCreated").</summary>
    public string EventType { get; set; } = default!;

    /// <summary>JSON-serialized event body.</summary>
    public string Payload { get; set; } = default!;

    /// <summary>When the row was written, UTC.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>When the message was successfully published; null while pending.</summary>
    public DateTime? ProcessedAt { get; set; }

    /// <summary>Number of failed publish attempts so far.</summary>
    public int RetryCount { get; set; }

    // Pending, Processed, Failed
    public string Status { get; set; } = "Pending";
}
✅
ProcessedMessage Entity (Idempotency)
/// <summary>
/// Consumer-side idempotency marker: one row per handled event id, so
/// re-delivered Kafka messages can be detected and skipped.
/// </summary>
public class ProcessedMessage
{
    public Guid Id { get; set; } // MessageId

    /// <summary>When the event was processed, UTC.</summary>
    public DateTime ProcessedAt { get; set; } = DateTime.UtcNow;
}
3️⃣ EF Core DbContext
/// <summary>
/// EF Core context exposing orders, the transactional outbox, and the
/// consumer-side idempotency table.
/// </summary>
public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
    public DbSet<ProcessedMessage> ProcessedMessages => Set<ProcessedMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // All three tables key on their Guid Id.
        modelBuilder.Entity<Order>().HasKey(x => x.Id);
        modelBuilder.Entity<OutboxMessage>().HasKey(x => x.Id);
        modelBuilder.Entity<ProcessedMessage>().HasKey(x => x.Id);
    }
}
4️⃣ Kafka Event DTO (OrderCreated)
/// <summary>
/// Integration event written to the outbox when an order is created and later
/// published to the "order-created" topic.
/// </summary>
/// <param name="EventId">Unique event id; drives consumer-side idempotency.</param>
/// <param name="OrderId">Id of the created order.</param>
/// <param name="CustomerName">Customer who placed the order.</param>
/// <param name="TotalAmount">Total order value.</param>
/// <param name="CreatedAt">Order creation time, UTC.</param>
/// <param name="CorrelationId">Request trace identifier for end-to-end correlation.</param>
public record OrderCreatedEvent(
    Guid EventId,
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId);
5️⃣ API Controller (Create Order)
📌 This endpoint does NOT publish to Kafka directly;
it writes the event to the Outbox table safely, inside the same database transaction as the order.
/// <summary>
/// Creates orders. Kafka is never called here: the OrderCreated event is stored
/// in the Outbox table in the same database transaction as the order (Outbox
/// pattern), and a background worker publishes it afterwards.
/// </summary>
[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;

    public OrdersController(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>Creates an order and stores its OrderCreated event in the outbox.</summary>
    /// <param name="request">Customer name and total amount.</param>
    /// <returns>200 OK with the new order id, or 400 for a blank customer name.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        // FIX: model binding alone does not reject empty strings.
        if (string.IsNullOrWhiteSpace(request.CustomerName))
            return BadRequest("CustomerName is required.");

        var correlationId = HttpContext.TraceIdentifier;

        // FIX: the EF Core transaction is IAsyncDisposable — dispose asynchronously.
        await using var trx = await _db.Database.BeginTransactionAsync();

        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount
        };
        _db.Orders.Add(order);

        var evt = new OrderCreatedEvent(
            EventId: Guid.NewGuid(),
            OrderId: order.Id,
            CustomerName: order.CustomerName,
            TotalAmount: order.TotalAmount,
            CreatedAt: order.CreatedAt,
            CorrelationId: correlationId
        );

        _db.OutboxMessages.Add(new OutboxMessage
        {
            Id = Guid.NewGuid(),
            EventType = "OrderCreated",
            Payload = System.Text.Json.JsonSerializer.Serialize(evt),
            Status = "Pending"
        });

        // Order + outbox row commit atomically: either both persist or neither does.
        await _db.SaveChangesAsync();
        await trx.CommitAsync();

        return Ok(new { order.Id, Message = "Order created. Event stored in outbox." });
    }
}
/// <summary>Request body for POST api/orders.</summary>
public record CreateOrderRequest(string CustomerName, decimal TotalAmount);
6️⃣ Kafka Producer Setup (Confluent.Kafka)
📌 KafkaProducerFactory.cs
using
Confluent.Kafka;
/// <summary>
/// Builds Confluent.Kafka producers pre-configured for reliable delivery:
/// acks=all, idempotence enabled, bounded retries with backoff.
/// </summary>
public class KafkaProducerFactory
{
    private readonly ProducerConfig _config;

    public KafkaProducerFactory(IConfiguration configuration)
    {
        _config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            Acks = Acks.All,          // wait for all in-sync replicas to ack
            EnableIdempotence = true, // broker-side duplicate suppression
            MessageSendMaxRetries = 3,
            RetryBackoffMs = 200
        };
    }

    /// <summary>Creates a new producer; the caller owns its disposal.</summary>
    public IProducer<string, string> CreateProducer() =>
        new ProducerBuilder<string, string>(_config).Build();
}
7️⃣ Outbox Publisher Background Service
📌 OutboxPublisherWorker.cs
This runs every few seconds:
- Reads pending outbox messages
- Publishes to Kafka
- Marks processed
using
Confluent.Kafka;
using
Microsoft.EntityFrameworkCore;
/// <summary>
/// Polls the Outbox table every 3 seconds and publishes pending messages to Kafka.
/// Rows are marked Processed on success; after 5 failed attempts they are marked
/// Failed and no longer retried.
/// </summary>
public class OutboxPublisherWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly KafkaProducerFactory _producerFactory;
    private readonly ILogger<OutboxPublisherWorker> _logger;

    public OutboxPublisherWorker(
        IServiceScopeFactory scopeFactory,
        KafkaProducerFactory producerFactory,
        ILogger<OutboxPublisherWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _producerFactory = producerFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // One producer for the worker's lifetime (producers are expensive to build).
        using var producer = _producerFactory.CreateProducer();

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Fresh scope per poll so the DbContext stays short-lived.
                using var scope = _scopeFactory.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

                var messages = await db.OutboxMessages
                    .Where(x => x.Status == "Pending" && x.RetryCount < 5)
                    .OrderBy(x => x.CreatedAt) // oldest first preserves event order
                    .Take(10)
                    .ToListAsync(stoppingToken);

                foreach (var msg in messages)
                {
                    try
                    {
                        var topic = msg.EventType switch
                        {
                            "OrderCreated" => "order-created",
                            _ => "unknown-topic"
                        };

                        var kafkaMessage = new Message<string, string>
                        {
                            // Outbox id as key keeps retries of one message on one partition.
                            Key = msg.Id.ToString(),
                            Value = msg.Payload
                        };

                        await producer.ProduceAsync(topic, kafkaMessage, stoppingToken);

                        msg.Status = "Processed";
                        msg.ProcessedAt = DateTime.UtcNow;
                    }
                    // FIX: shutdown mid-batch must not be counted as a publish failure.
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        msg.RetryCount++;
                        _logger.LogError(ex, "Failed publishing outbox message {Id}", msg.Id);
                        if (msg.RetryCount >= 5)
                            msg.Status = "Failed";
                    }
                }

                await db.SaveChangesAsync(stoppingToken);
            }
            // FIX: graceful shutdown path instead of logging a spurious crash.
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox worker crashed loop.");
            }

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);
            }
            // FIX: Task.Delay throws on cancellation; exit the loop cleanly.
            catch (OperationCanceledException)
            {
                break;
            }
        }

        // FIX: push any in-flight messages out before the producer is disposed.
        producer.Flush(TimeSpan.FromSeconds(5));
    }
}
8️⃣ Kafka Consumer Worker (OrderCreated Consumer)
📌 OrderCreatedConsumerWorker.cs
using
Confluent.Kafka;
using
Microsoft.EntityFrameworkCore;
using
System.Text.Json;
/// <summary>
/// Consumes the "order-created" topic, processes each event with retry and an
/// idempotency check, and routes messages that exhaust retries to the
/// "order-created-dlt" dead-letter topic. Offsets are committed manually, only
/// after success or dead-lettering, so a message is never silently lost.
/// </summary>
public class OrderCreatedConsumerWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IConfiguration _configuration;
    private readonly ILogger<OrderCreatedConsumerWorker> _logger;

    // FIX: reuse one producer for dead-letter publishes instead of opening a
    // new broker connection for every failed message.
    private IProducer<string, string>? _dltProducer;

    public OrderCreatedConsumerWorker(
        IServiceScopeFactory scopeFactory,
        IConfiguration configuration,
        ILogger<OrderCreatedConsumerWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _configuration = configuration;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // FIX: Consume() blocks synchronously; yield first so host startup
        // (and the other hosted services) are not held up by this worker.
        await Task.Yield();

        var config = new ConsumerConfig
        {
            BootstrapServers = _configuration["Kafka:BootstrapServers"],
            GroupId = "inventory-service",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false // offsets committed manually below
        };

        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe("order-created");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    if (result?.Message?.Value == null)
                        continue;

                    var evt = JsonSerializer.Deserialize<OrderCreatedEvent>(result.Message.Value);
                    if (evt == null)
                        continue;

                    var success = await ProcessWithRetry(evt, stoppingToken);

                    if (!success)
                    {
                        // Retries exhausted: park the raw payload on the dead-letter topic.
                        await SendToDeadLetter(result.Message.Value, stoppingToken);
                    }

                    // Commit in both cases so a poison message cannot loop forever.
                    consumer.Commit(result);
                }
                // FIX: cancellation is a normal shutdown, not a crash to log.
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError(ex, "Kafka consume error");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Consumer loop crashed");
                }
            }
        }
        finally
        {
            // FIX: leave the consumer group cleanly so partitions are
            // reassigned immediately instead of after the session timeout.
            consumer.Close();
            _dltProducer?.Dispose();
        }
    }

    /// <summary>
    /// Processes the event up to 3 times. Returns true on success or when the
    /// event id is already recorded in ProcessedMessages (idempotent replay).
    /// </summary>
    private async Task<bool> ProcessWithRetry(OrderCreatedEvent evt, CancellationToken ct)
    {
        const int maxRetry = 3;

        for (int attempt = 1; attempt <= maxRetry; attempt++)
        {
            try
            {
                using var scope = _scopeFactory.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

                // Idempotency check: skip events we have already handled.
                var alreadyProcessed = await db.ProcessedMessages
                    .AnyAsync(x => x.Id == evt.EventId, ct);
                if (alreadyProcessed)
                {
                    _logger.LogInformation("Event already processed: {EventId}", evt.EventId);
                    return true;
                }

                // ✅ Business logic here
                _logger.LogInformation("Processing order: {OrderId} attempt {Attempt}", evt.OrderId, attempt);

                // Save idempotency marker in the same unit of work as the business change.
                db.ProcessedMessages.Add(new ProcessedMessage { Id = evt.EventId });
                await db.SaveChangesAsync(ct);

                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Processing failed attempt {Attempt}", attempt);

                // FIX: no point backing off after the final attempt.
                if (attempt < maxRetry)
                    await Task.Delay(TimeSpan.FromSeconds(2), ct);
            }
        }

        return false;
    }

    /// <summary>Publishes the raw message payload to the dead-letter topic.</summary>
    private async Task SendToDeadLetter(string payload, CancellationToken ct)
    {
        _dltProducer ??= new ProducerBuilder<string, string>(new ProducerConfig
        {
            BootstrapServers = _configuration["Kafka:BootstrapServers"]
        }).Build();

        await _dltProducer.ProduceAsync(
            "order-created-dlt",
            new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = payload },
            ct);
    }
}
9️⃣ appsettings.json
{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  },
  "ConnectionStrings": {
    "DefaultConnection": "Server=localhost,1433;Database=KafkaDemoDb;User Id=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=True;"
  }
}
🔟 Program.cs (Register Everything)
var builder = WebApplication.CreateBuilder(args);

// MVC controllers (OrdersController).
builder.Services.AddControllers();

// EF Core: SQL Server via the DefaultConnection string.
builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));

// Shared, pre-configured Kafka producer settings.
builder.Services.AddSingleton<KafkaProducerFactory>();

// Outbox publisher
builder.Services.AddHostedService<OutboxPublisherWorker>();

// Consumer service (usually separate microservice in real world)
builder.Services.AddHostedService<OrderCreatedConsumerWorker>();

var app = builder.Build();

app.MapControllers();
app.Run();
✅ What you achieved (Enterprise Ready)
You now have:
✅ EF Core + SQL Server
✅ Outbox Pattern (safe publish)
✅ Kafka Producer (Confluent.Kafka)
✅ Kafka Consumer (BackgroundService)
✅ Retry + Dead Letter Topic
✅ Idempotency (ProcessedMessages table)
✅ Commit offsets after success
🔥 Interview-ready Explanation (Use this)
We implemented Kafka using
Confluent.Kafka in .NET Core.
To guarantee consistency between database and Kafka publishing, we used the
Outbox pattern with EF Core and SQL Server.
Events are stored in an Outbox table within the same transaction as business
data.
A background publisher service publishes events to Kafka and marks them as
processed.
Consumers are hosted services with manual offset commit, retry logic,
dead-letter topic support, and idempotency checks.
Comments
Post a Comment