MassTransit EF Core Outbox (NO custom polling worker)
Meaning:
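- You don’t write an OutboxPublisherWorker
- MassTransit stores outgoing messages in SQL Server outbox tables, next to your own data
- It delivers them to RabbitMQ only after the transaction commits
- Its built-in delivery service keeps retrying until the broker accepts the message
- Less custom code to write and maintain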
✅ What You’ll Get in NEXT++
1) MassTransit Transactional Outbox
- Your controller does:
  - Save Order
  - Publish Event (inside same transaction)
- MassTransit stores it in Outbox table
- Publishes after commit
2) Inbox (Idempotency)
MassTransit can also handle duplicate protection.
3) Retries + Error queue
Built-in.
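To make the inbox idea concrete, here is a minimal sketch of duplicate protection in plain C#. It is not MassTransit's implementation: the in-memory HashSet and the Deliver helper are hypothetical stand-ins for the InboxState table and the consume pipeline, and a real inbox records the message ID in the same database transaction as the handler's changes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the inbox table: IDs of messages already handled.
var processedMessageIds = new HashSet<Guid>();
var handledCount = 0;

// Runs the handler once per message ID.
// Returns true if the handler ran, false if the message was a duplicate.
bool Deliver(Guid messageId, Action handler)
{
    // HashSet.Add returns false when the ID is already present.
    if (!processedMessageIds.Add(messageId))
        return false;

    try
    {
        handler();
        return true;
    }
    catch
    {
        // The handler failed, so forget the ID and let a redelivery try again.
        processedMessageIds.Remove(messageId);
        throw;
    }
}

var id = Guid.NewGuid();
var first = Deliver(id, () => handledCount++);
var second = Deliver(id, () => handledCount++); // broker redelivers the same message

Console.WriteLine($"first={first}, second={second}, handledCount={handledCount}");
// prints "first=True, second=False, handledCount=1"
```

The second delivery is skipped because its ID was already recorded, which is the behaviour you want when the outbox delivers a message more than once.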
1) Install Required Packages
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package MassTransit.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools
2) DbContext (Orders + MassTransit Outbox Tables)
AppDbContext.cs
using MassTransit;
using MassTransit.EntityFrameworkCoreIntegration;
using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        modelBuilder.Entity<Order>().HasKey(x => x.Id);

        // MassTransit tables mapping.
        // These calls add the InboxState, OutboxMessage and OutboxState entities
        // to the model, so no DbSet properties are needed for them.
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}
3) Order Entity
public class Order
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; } = default!;
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
4) Event Contract
public record OrderSubmittedEvent(
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId
);
5) Program.cs (MassTransit + EF Outbox Setup)
Program.cs
using MassTransit;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));
});

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    // Consumer
    x.AddConsumer<OrderSubmittedConsumer>(cfg =>
    {
        cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
    });

    // EF Outbox config
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.QueryDelay = TimeSpan.FromSeconds(1);
        o.UseSqlServer();

        // Ensures a publish from the API is stored in the DB + sent only after commit
        o.UseBusOutbox();
    });

    // ✅ THIS IS THE ENTERPRISE MAGIC
    // UseEntityFrameworkOutbox is a receive-endpoint extension, so it is applied
    // through the endpoint callback (available in recent MassTransit 8 releases)
    // rather than on the bus configurator. It enables the inbox/outbox for consumers.
    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<AppDbContext>(context);
    });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

var app = builder.Build();

app.MapControllers();

app.Run();
6) Controller (NO Manual Outbox Table)
OrdersController.cs
using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;
    private readonly IPublishEndpoint _publishEndpoint;

    public OrdersController(AppDbContext db, IPublishEndpoint publishEndpoint)
    {
        _db = db;
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var correlationId = HttpContext.TraceIdentifier;

        await using var trx = await _db.Database.BeginTransactionAsync();

        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount
        };

        _db.Orders.Add(order);

        // ✅ This publish does NOT immediately go to RabbitMQ.
        // It goes to SQL Outbox.
        await _publishEndpoint.Publish(new OrderSubmittedEvent(
            OrderId: order.Id,
            CustomerName: order.CustomerName,
            TotalAmount: order.TotalAmount,
            CreatedAt: order.CreatedAt,
            CorrelationId: correlationId
        ));

        // The order row and the outbox row are written together.
        await _db.SaveChangesAsync();
        await trx.CommitAsync();

        return Ok(new { order.Id, Message = "Order saved + Event published using EF Outbox." });
    }
}

public record CreateOrderRequest(string CustomerName, decimal TotalAmount);
7) Consumer (Same)
OrderSubmittedConsumer.cs
using MassTransit;

public class OrderSubmittedConsumer : IConsumer<OrderSubmittedEvent>
{
    private readonly ILogger<OrderSubmittedConsumer> _logger;

    public OrderSubmittedConsumer(ILogger<OrderSubmittedConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
    {
        var evt = context.Message;

        _logger.LogInformation("Order received: {OrderId}, Customer: {Customer}",
            evt.OrderId, evt.CustomerName);

        // simulate work
        await Task.Delay(500);

        // Uncomment to test retry + _error queue:
        // throw new Exception("Test retry + _error queue");
    }
}
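The commented-out exception above is there to exercise the retry policy from Program.cs. As a rough illustration of what an interval policy like Interval(3, ...) amounts to (one initial attempt, up to three retries, then the message is parked), here is a plain C# sketch. It does not use MassTransit; DeliverWithRetry, AlwaysFails and the errorQueue list are hypothetical names, and the list only stands in for the broker's _error queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var errorQueue = new List<string>(); // hypothetical stand-in for the _error queue
var attempts = 0;

// A consumer that always fails, like the commented-out throw in the consumer.
Task AlwaysFails(string message)
{
    attempts++;
    return Task.FromException(new InvalidOperationException("Test retry"));
}

// Tries once, then retries up to retryLimit times with a fixed pause between tries.
// When the retries are used up, the message goes to the error queue.
async Task DeliverWithRetry(string message, Func<string, Task> consume,
    int retryLimit, TimeSpan interval)
{
    for (var retry = 0; ; retry++)
    {
        try
        {
            await consume(message);
            return;
        }
        catch (Exception) when (retry < retryLimit)
        {
            await Task.Delay(interval);
        }
        catch (Exception)
        {
            errorQueue.Add(message);
            return;
        }
    }
}

await DeliverWithRetry("order-1", AlwaysFails,
    retryLimit: 3, interval: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"attempts={attempts}, errorQueue={errorQueue.Count}");
// prints "attempts=4, errorQueue=1"
```

In the real application the pause is five seconds, and the failed message can be inspected or moved back from the error queue once the underlying problem is fixed.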
8) EF Core Migration
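Run (this needs the dotnet-ef tool; install it with dotnet tool install --global dotnet-ef if it is missing):
dotnet ef migrations add InitialWithMassTransitOutbox
dotnet ef database update
This will create tables like: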
- Orders
- InboxState
- OutboxMessage
- OutboxState
✅ Why This Is Enterprise++
🔥 You removed:
❌ OutboxPublisherWorker
❌ Manual Outbox table logic
❌ Manual retry handling
✅ You gained:
✅ Atomic save + publish
✅ At-least-once delivery to RabbitMQ (duplicates are possible, which is what the inbox is for)
✅ Built-in retries
✅ Cleaner code
✅ A widely used, well-documented pattern
🎯 Interview Answers (VERY IMPORTANT)
Q) How do you implement Outbox with RabbitMQ in .NET?
✅ Answer:
We use MassTransit with the Entity Framework Outbox.
When we call Publish inside a DB transaction, MassTransit stores the message in SQL Server Outbox tables.
After the transaction commits, MassTransit dispatches the message to RabbitMQ reliably.
This ensures no message loss and solves dual-write problems.
Q) Why not publish directly to RabbitMQ?
Because DB save and RabbitMQ publish are two separate operations.
If DB commit succeeds but publish fails, we lose the event.
Outbox solves this by making message persistence part of the same transaction.
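The difference between the two answers can be shown with a small in-memory simulation. This is only a sketch under stated assumptions: the lists and the queue are hypothetical stand-ins for the Orders table, the OutboxMessage table and RabbitMQ, and DispatchOutbox plays the role that MassTransit's delivery service plays in the real setup.

```csharp
using System;
using System.Collections.Generic;

var orders = new List<string>();   // stand-in for the Orders table
var outbox = new Queue<string>();  // stand-in for the OutboxMessage table
var broker = new List<string>();   // stand-in for RabbitMQ
var brokerAvailable = false;       // the broker starts out unreachable

void PublishToBroker(string evt)
{
    if (!brokerAvailable)
        throw new InvalidOperationException("broker unreachable");
    broker.Add(evt);
}

// Dual write: the DB commit succeeds, then the publish fails and the event is gone.
void SaveAndPublishDirect(string orderId)
{
    orders.Add(orderId);
    PublishToBroker($"OrderSubmitted:{orderId}");
}

// Outbox: the order and the event are stored together (one transaction in the real pattern).
void SaveWithOutbox(string orderId)
{
    orders.Add(orderId);
    outbox.Enqueue($"OrderSubmitted:{orderId}");
}

// Sends pending outbox rows; a row is removed only after the broker accepted it.
int DispatchOutbox()
{
    var sent = 0;
    while (outbox.Count > 0)
    {
        try { PublishToBroker(outbox.Peek()); }
        catch (InvalidOperationException) { break; } // keep the row, try again later
        outbox.Dequeue();
        sent++;
    }
    return sent;
}

try { SaveAndPublishDirect("A"); } catch (InvalidOperationException) { /* event for A is lost */ }
SaveWithOutbox("B");

var sentWhileDown = DispatchOutbox();     // broker still down, nothing is sent
brokerAvailable = true;
var sentAfterRecovery = DispatchOutbox(); // pending event for B is delivered

var delivered = string.Join(", ", broker);
Console.WriteLine($"orders={orders.Count}, delivered=[{delivered}], " +
    $"sentWhileDown={sentWhileDown}, sentAfterRecovery={sentAfterRecovery}");
// prints "orders=2, delivered=[OrderSubmitted:B], sentWhileDown=0, sentAfterRecovery=1"
```

Both orders are saved, but only the one written through the outbox ever reaches the broker; the event for order A has no record anywhere once the direct publish fails.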