Enterprise RabbitMQ + MassTransit + Outbox + EF Core + SQL Server (Real Production Style)
✅ What We’re Building (Enterprise)
Orders
API
- Saves Order in SQL Server
- Saves Outbox message in SQL Server (same transaction)
Outbox
Publisher Worker
- Reads Outbox table
- Publishes event to RabbitMQ via MassTransit
- Marks Outbox as Processed
Consumer
Service
- Consumes OrderSubmitted
- Has retry policy
- If it fails → MassTransit automatically moves the message to the _error queue
1) Docker Compose (RabbitMQ + SQL Server)
👉 docker-compose.yml
# Local development stack: RabbitMQ (with the management plugin) and SQL Server 2022.
# The credentials below are development defaults only; do not reuse them outside a local machine.
version: "3.8"

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq-enterprise
    ports:
      - "5672:5672"
      # Management UI (http://localhost:15672)
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    container_name: sqlserver-enterprise
    environment:
      # Must match the password used in the application's connection string.
      SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
    ports:
      - "1433:1433"
RabbitMQ UI:
👉 http://localhost:15672
Login: guest / guest
2) NuGet Packages
Run these:
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools
3) EF Core Entities
✅
Order
/// <summary>
/// Order entity stored through <c>AppDbContext.Orders</c>.
/// </summary>
public class Order
{
    /// <summary>Key of the order.</summary>
    public Guid Id { get; set; }

    /// <summary>Customer name; left unassigned (null) until the caller sets it.</summary>
    public string CustomerName { get; set; } = null!;

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Creation time; defaults to the UTC time at which the object is constructed.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
✅
OutboxMessage
/// <summary>
/// Outbox row holding a serialized event that still has to be published to the broker.
/// </summary>
public class OutboxMessage
{
    /// <summary>Key of the outbox row; a fresh GUID is generated per instance.</summary>
    public Guid Id { get; set; } = Guid.NewGuid();

    /// <summary>Name of the event type stored in <see cref="Payload"/>.</summary>
    public string MessageType { get; set; } = null!;

    /// <summary>Serialized event body.</summary>
    public string Payload { get; set; } = null!;

    /// <summary>Creation time; defaults to the UTC time at which the object is constructed.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Time the message was processed, or null while it has not been.</summary>
    public DateTime? ProcessedAt { get; set; }

    /// <summary>Number of failed publish attempts recorded so far.</summary>
    public int RetryCount { get; set; }

    /// <summary>Lifecycle state: "Pending", "Processed" or "Failed".</summary>
    public string Status { get; set; } = "Pending";
}
✅
ProcessedMessage (Idempotency)
/// <summary>
/// Marker row recording that an event has already been handled (used for idempotency).
/// </summary>
public class ProcessedMessage
{
    /// <summary>Identifier of the handled event (the event's EventId).</summary>
    public Guid Id { get; set; }

    /// <summary>Handling time; defaults to the UTC time at which the object is constructed.</summary>
    public DateTime ProcessedAt { get; set; } = DateTime.UtcNow;
}
4) DbContext
using
Microsoft.EntityFrameworkCore;
/// <summary>
/// EF Core context exposing the order, outbox and processed-message sets.
/// </summary>
public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options)
        : base(options)
    {
    }

    /// <summary>Orders saved by the API.</summary>
    public DbSet<Order> Orders => Set<Order>();

    /// <summary>Events waiting to be published (or already published) by the outbox worker.</summary>
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    /// <summary>Identifiers of events the consumer has already handled.</summary>
    public DbSet<ProcessedMessage> ProcessedMessages => Set<ProcessedMessage>();

    /// <summary>Declares <c>Id</c> as the key of each of the three entities.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>(entity => entity.HasKey(order => order.Id));
        modelBuilder.Entity<OutboxMessage>(entity => entity.HasKey(message => message.Id));
        modelBuilder.Entity<ProcessedMessage>(entity => entity.HasKey(marker => marker.Id));
    }
}
5) Message Contract (Event)
MassTransit works best with Records.
/// <summary>
/// Event contract published when an order has been saved.
/// </summary>
/// <param name="EventId">Unique identifier of this event; the consumer uses it to detect duplicates.</param>
/// <param name="OrderId">Identifier of the order the event refers to.</param>
/// <param name="CustomerName">Customer name copied from the order.</param>
/// <param name="TotalAmount">Total amount copied from the order.</param>
/// <param name="CreatedAt">Creation time copied from the order.</param>
/// <param name="CorrelationId">Correlation identifier supplied by the publisher.</param>
public record OrderSubmittedEvent(
    Guid EventId,
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId);
6) Orders Controller (Write DB + Outbox)
👉 OrdersController.cs
using
Microsoft.AspNetCore.Mvc;
using
System.Text.Json;
/// <summary>
/// Accepts new orders and stores each order together with its outbox message,
/// so the event is published later by the outbox worker rather than inline.
/// </summary>
[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;

    public OrdersController(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>
    /// Saves the order and a pending <see cref="OutboxMessage"/> carrying the serialized
    /// <see cref="OrderSubmittedEvent"/> inside one database transaction.
    /// </summary>
    /// <param name="request">Customer name and total amount of the new order.</param>
    /// <param name="cancellationToken">Cancels the database work when the request is aborted.</param>
    /// <returns>200 OK with the generated order id and a confirmation message.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(
        [FromBody] CreateOrderRequest request,
        CancellationToken cancellationToken = default)
    {
        var correlationId = HttpContext.TraceIdentifier;

        // await using: the transaction is disposed asynchronously, and rolled back
        // if an exception escapes before CommitAsync is reached.
        await using var trx = await _db.Database.BeginTransactionAsync(cancellationToken);

        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount
        };
        _db.Orders.Add(order);

        var evt = new OrderSubmittedEvent(
            EventId: Guid.NewGuid(),
            OrderId: order.Id,
            CustomerName: order.CustomerName,
            TotalAmount: order.TotalAmount,
            CreatedAt: order.CreatedAt,
            CorrelationId: correlationId);

        // The worker dispatches on this type name, so it must stay "OrderSubmittedEvent".
        _db.OutboxMessages.Add(new OutboxMessage
        {
            MessageType = "OrderSubmittedEvent",
            Payload = JsonSerializer.Serialize(evt),
            Status = "Pending"
        });

        await _db.SaveChangesAsync(cancellationToken);
        await trx.CommitAsync(cancellationToken);

        return Ok(new { order.Id, Message = "Order saved + Outbox stored safely." });
    }
}
/// <summary>
/// Request body of the create-order endpoint.
/// </summary>
/// <param name="CustomerName">Name of the customer placing the order.</param>
/// <param name="TotalAmount">Total amount of the order.</param>
public record CreateOrderRequest(
    string CustomerName,
    decimal TotalAmount);
7) MassTransit Setup (RabbitMQ)
👉 Program.cs
using
MassTransit;
using
Microsoft.EntityFrameworkCore;
// Application entry point: registers MVC controllers, the EF Core context,
// MassTransit over RabbitMQ and the outbox publisher worker, then starts the host.
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));
});

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();
    x.AddConsumer<OrderSubmittedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        // Broker settings are read from the optional "RabbitMq" configuration section;
        // when a key is absent the local development defaults below are used.
        var rabbit = builder.Configuration.GetSection("RabbitMq");

        cfg.Host(rabbit["Host"] ?? "localhost", rabbit["VirtualHost"] ?? "/", h =>
        {
            h.Username(rabbit["Username"] ?? "guest");
            h.Password(rabbit["Password"] ?? "guest");
        });

        // Retry for consumer failures: 3 retries, 5 seconds apart.
        cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

        // Auto-create queues based on consumers
        cfg.ConfigureEndpoints(context);
    });
});

// Outbox publisher worker
builder.Services.AddHostedService<OutboxPublisherWorker>();

var app = builder.Build();

app.MapControllers();

app.Run();
8) Outbox Publisher Worker (Publishes to RabbitMQ)
👉 OutboxPublisherWorker.cs
using
MassTransit;
using
Microsoft.EntityFrameworkCore;
using
System.Text.Json;
/// <summary>
/// Background worker that polls the outbox table, publishes pending events through
/// MassTransit and records the outcome on each outbox row.
/// </summary>
/// <remarks>
/// NOTE(review): MassTransit registers <see cref="IPublishEndpoint"/> with a scoped lifetime,
/// while hosted services are singletons. Injecting it here can be rejected by DI scope
/// validation; consider injecting <c>IBus</c> or resolving the endpoint from the per-batch
/// scope. The constructor is left unchanged so existing callers keep working.
/// </remarks>
public class OutboxPublisherWorker : BackgroundService
{
    // Maximum number of outbox rows handled per polling cycle.
    private const int BatchSize = 10;

    // A row is marked "Failed" once it has failed this many times.
    private const int MaxRetries = 5;

    // Pause between polling cycles.
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(3);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<OutboxPublisherWorker> _logger;

    public OutboxPublisherWorker(
        IServiceScopeFactory scopeFactory,
        IPublishEndpoint publishEndpoint,
        ILogger<OutboxPublisherWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _publishEndpoint = publishEndpoint;
        _logger = logger;
    }

    /// <summary>
    /// Runs the polling loop until the host requests shutdown.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the worker must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PublishPendingBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown, not a crash.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox publisher loop crashed");
            }

            try
            {
                // The delay also applies after a failed cycle, so a broken database is not hammered.
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    // Loads the oldest pending rows, publishes each one and saves the resulting statuses.
    private async Task PublishPendingBatchAsync(CancellationToken stoppingToken)
    {
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

        var pending = await db.OutboxMessages
            .Where(x => x.Status == "Pending" && x.RetryCount < MaxRetries)
            .OrderBy(x => x.CreatedAt)
            .Take(BatchSize)
            .ToListAsync(stoppingToken);

        if (pending.Count == 0)
        {
            return;
        }

        foreach (var msg in pending)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            await TryPublishAsync(msg, stoppingToken);
        }

        // Deliberately not cancellable: rows already published must be saved as "Processed"
        // even during shutdown, otherwise they would be published again after a restart.
        await db.SaveChangesAsync(CancellationToken.None);
    }

    // Publishes one outbox row and updates its Status / ProcessedAt / RetryCount in memory.
    private async Task TryPublishAsync(OutboxMessage msg, CancellationToken stoppingToken)
    {
        try
        {
            if (msg.MessageType != "OrderSubmittedEvent")
            {
                _logger.LogWarning(
                    "Outbox message {Id} has unsupported type {MessageType}; marking as Failed",
                    msg.Id,
                    msg.MessageType);
                msg.Status = "Failed";
                return;
            }

            var evt = JsonSerializer.Deserialize<OrderSubmittedEvent>(msg.Payload)
                ?? throw new JsonException("Outbox payload invalid.");

            await _publishEndpoint.Publish(evt, stoppingToken);

            msg.Status = "Processed";
            msg.ProcessedAt = DateTime.UtcNow;
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown interrupted the publish: leave the row "Pending" without counting a retry.
        }
        catch (Exception ex)
        {
            msg.RetryCount++;
            _logger.LogError(ex, "Outbox publish failed for {Id}", msg.Id);

            if (msg.RetryCount >= MaxRetries)
            {
                msg.Status = "Failed";
            }
        }
    }
}
9) Consumer (MassTransit)
👉 OrderSubmittedConsumer.cs
using
MassTransit;
using
Microsoft.EntityFrameworkCore;
/// <summary>
/// MassTransit consumer for <see cref="OrderSubmittedEvent"/>. Each event is handled at most
/// once per EventId: handled ids are recorded in the ProcessedMessages table.
/// </summary>
public class OrderSubmittedConsumer : IConsumer<OrderSubmittedEvent>
{
    private readonly AppDbContext _db;
    private readonly ILogger<OrderSubmittedConsumer> _logger;

    public OrderSubmittedConsumer(AppDbContext db, ILogger<OrderSubmittedConsumer> logger)
    {
        _db = db;
        _logger = logger;
    }

    /// <summary>
    /// Skips events whose EventId is already recorded; otherwise runs the business logic
    /// and records the EventId as processed.
    /// </summary>
    /// <param name="context">Consume context carrying the event and its cancellation token.</param>
    public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
    {
        var evt = context.Message;
        var cancellationToken = context.CancellationToken;

        // Idempotency check
        var alreadyProcessed = await _db.ProcessedMessages
            .AnyAsync(x => x.Id == evt.EventId, cancellationToken);

        if (alreadyProcessed)
        {
            _logger.LogInformation("Duplicate event ignored: {EventId}", evt.EventId);
            return;
        }

        // Business logic
        _logger.LogInformation(
            "Processing Order {OrderId} for {Customer}",
            evt.OrderId,
            evt.CustomerName);

        // To exercise the retry policy and the error queue, throw an exception here.

        _db.ProcessedMessages.Add(new ProcessedMessage
        {
            Id = evt.EventId
        });

        await _db.SaveChangesAsync(cancellationToken);
    }
}
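🔥 MassTransit Auto Error Queue (Important)
If the consumer still fails after all retries, MassTransit automatically moves the message to an error queue named after the receive endpoint with an _error suffix:
✅ order-submitted_error
(With the kebab-case endpoint name formatter, OrderSubmittedConsumer is hosted on the order-submitted queue.)
So you get a dead-letter queue without configuring a DLX manually.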
10) appsettings.json
{
  "ConnectionStrings": {
    "DefaultConnection": "Server=localhost,1433;Database=RabbitEnterpriseDb;User Id=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=True;"
  }
}
✅ How To Run (Step-by-step)
1) Start Docker
docker-compose up -d
2) Run migrations (requires the EF Core CLI: dotnet tool install --global dotnet-ef)
dotnet ef migrations add InitialCreate
dotnet ef database update
3) Run the API
dotnet run
4) Test API
POST: http://localhost:5000/api/orders
Body:
{
"customerName": "Balaji",
"totalAmount": 2500
}
✅ Enterprise Summary (Interview Answer)
We used RabbitMQ with MassTransit in
.NET Core.
For reliability, we implemented Outbox Pattern using EF Core + SQL Server.
Orders and Outbox messages are saved in one DB transaction.
A background worker publishes Outbox messages to RabbitMQ.
Consumers are handled by MassTransit with retry policies and automatic error
queues.
We also implemented idempotency using a ProcessedMessages table.
Comments
Post a Comment