Enterprise Architecture (Outbox + Kafka + SQL Server)
🏗 Final Solution Structure
KafkaEnterpriseDemo/
│
├── docker-compose.yml
│
├── Kafka.Shared/
│   ├── Events/
│   │   └── OrderCreatedEvent.cs
│   └── Contracts/
│       └── KafkaTopics.cs
│
├── Kafka.ProducerApi/
│   ├── Controllers/
│   │   └── OrdersController.cs
│   ├── Application/
│   │   ├── Services/
│   │   │   └── OrderService.cs
│   │   └── Interfaces/
│   │       └── IOrderService.cs
│   ├── Infrastructure/
│   │   ├── Data/
│   │   │   ├── AppDbContext.cs
│   │   │   └── Migrations/
│   │   ├── Outbox/
│   │   │   ├── OutboxMessage.cs
│   │   │   ├── OutboxStatus.cs
│   │   │   └── OutboxRelayWorker.cs
│   │   └── Kafka/
│   │       ├── IKafkaProducer.cs
│   │       └── KafkaProducer.cs
│   ├── appsettings.json
│   └── Program.cs
│
└── Kafka.ConsumerWorker/
    ├── Workers/
    │   └── OrderConsumerWorker.cs
    ├── Services/
    │   └── DlqProducer.cs
    ├── appsettings.json
    └── Program.cs
🐳 Step 1: Docker Compose (Kafka + Schema Registry + SQL Server)
✅
docker-compose.yml
# Local development stack: ZooKeeper + single Kafka broker + Schema Registry + SQL Server.
#
# Image tags are pinned to a ZooKeeper-capable Confluent Platform release instead of
# `latest`: newer Confluent Platform releases (8.x, based on Kafka 4) drop ZooKeeper mode,
# so `latest` would not start with this ZooKeeper-based configuration.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners are required:
      #   PLAINTEXT      -> kafka:29092     for containers on the compose network (Schema Registry)
      #   PLAINTEXT_HOST -> localhost:9092  for applications running on the host (the .NET services)
      # Advertising only localhost:9092 makes in-network clients reconnect to their own
      # localhost after the bootstrap handshake, where no broker is listening.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Uses the in-network listener advertised by the broker above.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    container_name: kafka_sqlserver
    ports:
      - "1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      # MSSQL_SA_PASSWORD replaces the deprecated SA_PASSWORD variable.
      # Demo-only credential; must match the connection string in appsettings.json.
      MSSQL_SA_PASSWORD: "YourStrong@Password123"
Run:
docker-compose up -d
🧱 Step 2: Install Required Packages (Producer API)
Inside Kafka.ProducerApi project install:
dotnet add package Confluent.Kafka
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Design
🧾 Step 3: appsettings.json (Producer API)
Kafka.ProducerApi/appsettings.json
{
  "ConnectionStrings": {
    "DefaultConnection": "Server=localhost,1433;Database=KafkaOutboxDb;User Id=sa;Password=YourStrong@Password123;TrustServerCertificate=True"
  },
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "OrdersTopic": "orders-topic"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  }
}
🗃 Step 4: Create EF Core DbContext
Infrastructure/Data/AppDbContext.cs
using
Kafka.ProducerApi.Infrastructure.Outbox;
using
Microsoft.EntityFrameworkCore;
/// <summary>
/// EF Core context for the producer API. It maps the outbox table that holds
/// messages waiting to be relayed to Kafka.
/// </summary>
public class AppDbContext : DbContext
{
    /// <summary>Creates the context from the options registered with dependency injection.</summary>
    /// <param name="options">Provider and connection configuration for this context.</param>
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    /// <summary>Outbox rows (pending, processed and failed).</summary>
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    /// <summary>Configures the <see cref="OutboxMessage"/> mapping: key, required columns and polling index.</summary>
    /// <param name="modelBuilder">Builder used to describe the entity model.</param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OutboxMessage>(entity =>
        {
            entity.HasKey(x => x.Id);
            entity.Property(x => x.Topic).IsRequired().HasMaxLength(200);
            entity.Property(x => x.Payload).IsRequired();
            entity.Property(x => x.Status).IsRequired();
            entity.Property(x => x.CreatedAtUtc).IsRequired();

            // The relay worker repeatedly selects rows by Status and orders them by
            // CreatedAtUtc; without this index every poll scans the whole outbox table.
            entity.HasIndex(x => new { x.Status, x.CreatedAtUtc });
        });
    }
}
📦 Step 5: Outbox Entity + Status
Infrastructure/Outbox/OutboxStatus.cs
/// <summary>
/// Delivery state of an outbox row. The numeric values are explicit because the
/// enum is stored as a required column on <c>OutboxMessage</c>.
/// </summary>
public enum OutboxStatus
{
    /// <summary>Initial state of a new outbox row; the relay worker picks these up.</summary>
    Pending = 0,

    /// <summary>Set by the relay worker after the message was produced successfully.</summary>
    Processed = 1,

    /// <summary>Set by the relay worker once the retry count reaches its limit.</summary>
    Failed = 2,
}
Infrastructure/Outbox/OutboxMessage.cs
/// <summary>
/// One row of the outbox table: a serialized message and the bookkeeping
/// needed to publish it to Kafka later.
/// </summary>
public class OutboxMessage
{
    /// <summary>Primary key; a fresh GUID is generated when the object is created.</summary>
    public Guid Id { get; set; } = Guid.NewGuid();

    /// <summary>Name of the Kafka topic the payload is published to.</summary>
    public string Topic { get; set; } = string.Empty;

    /// <summary>Serialized message body.</summary>
    public string Payload { get; set; } = string.Empty;

    /// <summary>Delivery state; new rows start as <see cref="OutboxStatus.Pending"/>.</summary>
    public OutboxStatus Status { get; set; } = OutboxStatus.Pending;

    /// <summary>Number of failed publish attempts so far (starts at zero).</summary>
    public int RetryCount { get; set; }

    /// <summary>Message of the most recent publish failure, or <c>null</c> if none is recorded.</summary>
    public string? LastError { get; set; }

    /// <summary>UTC timestamp taken when the object is created.</summary>
    public DateTime CreatedAtUtc { get; set; } = DateTime.UtcNow;

    /// <summary>UTC timestamp of the successful publish; <c>null</c> until then.</summary>
    public DateTime? ProcessedAtUtc { get; set; }
}
🧠 Step 6: Kafka Producer (Singleton)
Infrastructure/Kafka/IKafkaProducer.cs
/// <summary>
/// Abstraction over the Kafka client so callers can publish string messages
/// without depending on the client library directly.
/// </summary>
public interface IKafkaProducer
{
    /// <summary>Publishes a single message to a topic.</summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    /// <returns>A task that completes when the produce call has finished.</returns>
    Task ProduceAsync(string topic, string message);
}
Infrastructure/Kafka/KafkaProducer.cs
using
Confluent.Kafka;
/// <summary>
/// <see cref="IKafkaProducer"/> implementation backed by a single Confluent.Kafka
/// producer with <c>Acks.All</c> and idempotence enabled. Messages are produced
/// without a key.
/// </summary>
/// <remarks>
/// Implements <see cref="IDisposable"/> so that the dependency-injection container,
/// which owns the singleton, flushes and releases the underlying client on shutdown.
/// </remarks>
public class KafkaProducer : IKafkaProducer, IDisposable
{
    // Upper bound for draining queued messages during shutdown.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<Null, string> _producer;
    private bool _disposed;

    /// <summary>Builds the producer from the <c>Kafka:BootstrapServers</c> setting.</summary>
    /// <param name="configuration">Application configuration.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Kafka:BootstrapServers</c> is missing or blank, so a
    /// misconfiguration surfaces at startup instead of on the first publish.
    /// </exception>
    public KafkaProducer(IConfiguration configuration)
    {
        var bootstrapServers = configuration["Kafka:BootstrapServers"];
        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            throw new InvalidOperationException(
                "Configuration value 'Kafka:BootstrapServers' is missing or empty.");
        }

        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = Acks.All,
            EnableIdempotence = true
        };

        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>Produces <paramref name="message"/> to <paramref name="topic"/> and awaits the result.</summary>
    /// <param name="topic">Name of the destination topic.</param>
    /// <param name="message">Message value to publish.</param>
    public async Task ProduceAsync(string topic, string message)
    {
        await _producer.ProduceAsync(topic, new Message<Null, string>
        {
            Value = message
        });
    }

    /// <summary>Flushes any queued messages (bounded by a timeout) and disposes the client.</summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _producer.Flush(FlushTimeout);
        _producer.Dispose();
    }
}
🧾 Step 7: Business Event Contract
Kafka.Shared/Events/OrderCreatedEvent.cs
/// <summary>
/// Event contract describing a newly created order. It is serialized as the
/// payload of the outbox message written by the order service.
/// </summary>
public class OrderCreatedEvent
{
    /// <summary>Identifier of the created order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Name of the customer who placed the order; defaults to an empty string.</summary>
    public string CustomerName { get; set; } = string.Empty;

    /// <summary>Order amount.</summary>
    public decimal Amount { get; set; }

    /// <summary>Creation timestamp of the order (UTC by naming convention).</summary>
    public DateTime CreatedAtUtc { get; set; }
}
🏦 Step 8: Order Service (Writes Outbox)
Application/Interfaces/IOrderService.cs
/// <summary>Application service for creating orders.</summary>
public interface IOrderService
{
    /// <summary>Creates an order for the given customer and amount.</summary>
    /// <param name="customerName">Name of the customer placing the order.</param>
    /// <param name="amount">Order amount.</param>
    /// <returns>The identifier assigned to the new order.</returns>
    Task<Guid> CreateOrderAsync(string customerName, decimal amount);
}
Application/Services/OrderService.cs
using
System.Text.Json;
/// <summary>
/// Creates orders by writing an <see cref="OrderCreatedEvent"/> into the outbox table.
/// Nothing is sent to Kafka here; the outbox relay worker publishes the row later.
/// </summary>
public class OrderService : IOrderService
{
    private readonly AppDbContext _db;
    private readonly IConfiguration _configuration;

    /// <summary>Creates the service.</summary>
    /// <param name="db">Context used to persist the outbox row.</param>
    /// <param name="configuration">Source of the <c>Kafka:OrdersTopic</c> setting.</param>
    public OrderService(AppDbContext db, IConfiguration configuration)
    {
        _db = db;
        _configuration = configuration;
    }

    /// <summary>
    /// Generates a new order id, serializes an <see cref="OrderCreatedEvent"/> to JSON and
    /// stores it as a pending outbox message for the configured orders topic.
    /// </summary>
    /// <param name="customerName">Name of the customer placing the order.</param>
    /// <param name="amount">Order amount.</param>
    /// <returns>The generated order id.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Kafka:OrdersTopic</c> is missing or blank.
    /// </exception>
    public async Task<Guid> CreateOrderAsync(string customerName, decimal amount)
    {
        // Fail with a clear message instead of storing an outbox row that can never be published.
        var topic = _configuration["Kafka:OrdersTopic"];
        if (string.IsNullOrWhiteSpace(topic))
        {
            throw new InvalidOperationException(
                "Configuration value 'Kafka:OrdersTopic' is missing or empty.");
        }

        // In real app: save Order table too (not included here).
        var orderId = Guid.NewGuid();

        var evt = new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerName = customerName,
            Amount = amount,
            CreatedAtUtc = DateTime.UtcNow
        };

        var outbox = new OutboxMessage
        {
            Topic = topic,
            Payload = JsonSerializer.Serialize(evt),
            Status = OutboxStatus.Pending
        };

        _db.OutboxMessages.Add(outbox);
        await _db.SaveChangesAsync();

        return orderId;
    }
}
🔁 Step 9: Outbox Relay Worker (Publishes to Kafka)
This is the real magic.
Infrastructure/Outbox/OutboxRelayWorker.cs
using
Microsoft.EntityFrameworkCore;
/// <summary>
/// Background service that polls the outbox table and publishes pending messages to Kafka.
/// </summary>
/// <remarks>
/// Each poll loads up to <see cref="BatchSize"/> pending rows, oldest first, and publishes
/// them one by one. A row becomes Processed on success; on failure its retry count is
/// incremented and it becomes Failed once <see cref="MaxRetryCount"/> is reached.
/// Delivery is at-least-once: a row whose publish succeeded but whose status update could
/// not be saved is published again on a later poll.
/// </remarks>
public class OutboxRelayWorker : BackgroundService
{
    // Publish attempts allowed before a row is marked Failed.
    private const int MaxRetryCount = 5;

    // Maximum number of rows handled per poll.
    private const int BatchSize = 20;

    // Pause between polls.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(2000);

    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxRelayWorker> _logger;

    /// <summary>Creates the worker.</summary>
    /// <param name="serviceProvider">Root provider used to create a scope per poll.</param>
    /// <param name="logger">Logger for worker diagnostics.</param>
    public OutboxRelayWorker(IServiceProvider serviceProvider, ILogger<OutboxRelayWorker> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>Runs the poll/publish loop until the host requests shutdown.</summary>
    /// <param name="stoppingToken">Signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Outbox Relay Worker started...");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await RelayPendingBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown; not an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox Relay Worker loop failed.");
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    // Loads one batch of pending rows, publishes each and persists its new state.
    private async Task RelayPendingBatchAsync(CancellationToken stoppingToken)
    {
        // The DbContext is scoped, so a fresh scope is created for every poll.
        using var scope = _serviceProvider.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var kafka = scope.ServiceProvider.GetRequiredService<IKafkaProducer>();

        var pendingMessages = await db.OutboxMessages
            .Where(x => x.Status == OutboxStatus.Pending && x.RetryCount < MaxRetryCount)
            .OrderBy(x => x.CreatedAtUtc)
            .Take(BatchSize)
            .ToListAsync(stoppingToken);

        foreach (var msg in pendingMessages)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            try
            {
                await kafka.ProduceAsync(msg.Topic, msg.Payload);
                msg.Status = OutboxStatus.Processed;
                msg.ProcessedAtUtc = DateTime.UtcNow;
                msg.LastError = null;
            }
            catch (Exception ex)
            {
                msg.RetryCount++;
                msg.LastError = ex.Message;
                if (msg.RetryCount >= MaxRetryCount)
                {
                    msg.Status = OutboxStatus.Failed;
                }

                _logger.LogWarning(
                    ex,
                    "Publishing outbox message {OutboxMessageId} failed (attempt {RetryCount} of {MaxRetryCount}).",
                    msg.Id,
                    msg.RetryCount,
                    MaxRetryCount);
            }

            // Save after every message and ignore the stopping token: once a message has
            // been handed to Kafka its state must be recorded, otherwise a shutdown or a
            // later failure in the batch would leave it Pending and it would be re-published.
            await db.SaveChangesAsync(CancellationToken.None);
        }
    }
}
🌐 Step 10: OrdersController (Calls Service)
Controllers/OrdersController.cs
using
Microsoft.AspNetCore.Mvc;
/// <summary>
/// HTTP endpoint for creating orders. Routed at <c>api/orders</c> through the
/// <c>[controller]</c> token and delegating the work to <see cref="IOrderService"/>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IOrderService _orderService;

    /// <summary>Creates the controller with the order service it delegates to.</summary>
    public OrdersController(IOrderService orderService) => _orderService = orderService;

    /// <summary>Handles POST requests: creates the order and returns its id with a confirmation text.</summary>
    /// <param name="request">Customer name and amount read from the request body.</param>
    /// <returns>200 OK with an object containing <c>OrderId</c> and <c>Message</c>.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        Guid orderId = await _orderService.CreateOrderAsync(request.CustomerName, request.Amount);

        var responseBody = new
        {
            OrderId = orderId,
            Message = "Order saved + Outbox message created!"
        };

        return Ok(responseBody);
    }
}
/// <summary>Request body accepted by the order-creation endpoint.</summary>
/// <param name="CustomerName">Name of the customer placing the order.</param>
/// <param name="Amount">Order amount.</param>
public record OrderRequest(string CustomerName, decimal Amount);
✅ Step 11: Program.cs (DI Setup)
Kafka.ProducerApi/Program.cs
using
Microsoft.EntityFrameworkCore;
// Composition root of the producer API: registers MVC controllers, Swagger,
// the EF Core context, the Kafka producer, the outbox relay worker and the
// order service, then starts the web host.
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddSwaggerGen();

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));
});

// Kafka Producer as Singleton
builder.Services.AddSingleton<IKafkaProducer, KafkaProducer>();

// Outbox Relay Worker
builder.Services.AddHostedService<OutboxRelayWorker>();

// Business service
builder.Services.AddScoped<IOrderService, OrderService>();

var app = builder.Build();

app.UseSwagger();
app.UseSwaggerUI();
app.MapControllers();

app.Run();
🛠 Step 12: EF Core Migration
Run inside Kafka.ProducerApi:
dotnet ef migrations add InitOutbox
dotnet ef database update
✅ What You Achieved (Enterprise Level)
Now your system is:
✅ Reliable (Outbox prevents message loss)
✅ Scalable (Kafka handles partitions)
✅ Production style (worker relay pattern)
✅ Database-safe (no dual write inconsistency)
✅ Retry supported
✅ Ready for DLQ + Schema Registry
Comments
Post a Comment