MassTransit EF Core Outbox (NO custom polling worker)

Meaning:

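  • You don't write an OutboxPublisherWorker
  • MassTransit automatically stores outgoing messages in SQL Server, in the same transaction as your data
  • It delivers them to RabbitMQ only after the transaction commits
  • It automatically retries delivery until the broker accepts the message
  • Clean + production-ready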

✅ What You’ll Get in NEXT++

1) MassTransit Transactional Outbox

  • Your controller does:
    • Save Order
    • Publish Event (inside same transaction)
  • MassTransit stores it in Outbox table
  • Publishes after commit

2) Inbox (Idempotency)

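MassTransit can also protect against duplicates: the inbox records each MessageId per receive endpoint in the InboxState table, so a redelivered message is not processed twice.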
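To make the inbox idea concrete, here is a minimal in-memory sketch of duplicate detection. It is a conceptual illustration only, not MassTransit's implementation: the names InMemoryInbox, TryBegin and Deliver are hypothetical, and the real inbox persists its state in the InboxState table and also handles failures, which this sketch skips.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the InboxState table: one entry per (MessageId, endpoint).
public sealed class InMemoryInbox
{
    private readonly HashSet<(Guid MessageId, string Endpoint)> _seen = new();

    // Returns true the first time a message is seen on an endpoint, false for redeliveries.
    public bool TryBegin(Guid messageId, string endpoint) => _seen.Add((messageId, endpoint));
}

public static class InboxDemo
{
    // Runs the handler only if this message has not been seen on this endpoint before.
    public static bool Deliver(InMemoryInbox inbox, Guid messageId, string endpoint, Action handler)
    {
        if (!inbox.TryBegin(messageId, endpoint))
            return false; // duplicate: skip the handler

        handler();
        return true;
    }

    public static void Main()
    {
        var inbox = new InMemoryInbox();
        var messageId = Guid.NewGuid();
        var handled = 0;

        // Simulate the broker delivering the same message three times.
        for (var i = 0; i < 3; i++)
            Deliver(inbox, messageId, "order-submitted", () => handled++);

        Console.WriteLine($"Handler ran {handled} time(s)"); // prints "Handler ran 1 time(s)"
    }
}
```

The key point is that the duplicate check is keyed by message identity, so the consumer's business logic does not need its own "have I seen this?" bookkeeping.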

3) Retries + Error queue

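Built-in. UseMessageRetry retries a failing consumer, and once the retries are exhausted MassTransit moves the message to the endpoint's _error queue.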
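If you want to react in code when a message ends up in the error queue, one option is a consumer for MassTransit's Fault<T> event, which is published after a consumer fails and its retries are exhausted. The sketch below is an assumption about how you might use it here: the class name OrderSubmittedFaultConsumer is hypothetical, and it relies on the OrderSubmittedEvent contract defined in step 4.

```csharp
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Hypothetical consumer: observes faults raised for OrderSubmittedEvent.
public class OrderSubmittedFaultConsumer : IConsumer<Fault<OrderSubmittedEvent>>
{
    private readonly ILogger<OrderSubmittedFaultConsumer> _logger;

    public OrderSubmittedFaultConsumer(ILogger<OrderSubmittedFaultConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Fault<OrderSubmittedEvent>> context)
    {
        // Fault<T>.Message is the original message; Exceptions describes why it failed.
        var failed = context.Message.Message;
        var reason = context.Message.Exceptions.FirstOrDefault()?.Message;

        _logger.LogError("Order {OrderId} could not be processed: {Reason}",
            failed.OrderId, reason);

        return Task.CompletedTask;
    }
}
```

It would be registered next to the main consumer with x.AddConsumer<OrderSubmittedFaultConsumer>(). The failed message itself still lands in the _error queue; this consumer only adds a hook for logging or alerting.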


1) Install Required Packages

dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package MassTransit.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools


2) DbContext (Orders + MassTransit Outbox Tables)

📌 AppDbContext.cs

using MassTransit;
using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>().HasKey(x => x.Id);

        // MassTransit tables mapping (InboxState, OutboxMessage, OutboxState).
        // These extensions add the entities to the model, so no DbSet properties are needed.
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}


3) Order Entity

public class Order
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; } = default!;
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}


4) Event Contract

public record OrderSubmittedEvent(
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId
);


5) Program.cs (MassTransit + EF Outbox Setup)

📌 Program.cs

using MassTransit;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));
});

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    // Consumer
    x.AddConsumer<OrderSubmittedConsumer>(cfg =>
    {
        // Retry up to 3 times, 5 seconds apart, before the message goes to the _error queue
        cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
    });

    // ✅ THIS IS THE ENTERPRISE MAGIC
    // Applies the EF outbox/inbox to every receive endpoint.
    // UseEntityFrameworkOutbox is a receive endpoint extension, so it is
    // registered through this callback rather than on the bus configurator.
    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<AppDbContext>(context);
    });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });

    // EF Outbox config
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        // Delay between polls of the outbox tables when no messages are pending
        o.QueryDelay = TimeSpan.FromSeconds(1);

        o.UseSqlServer();

        // 🔥 ensures publish is stored in DB + sent only after commit
        o.UseBusOutbox();
    });
});

var app = builder.Build();
app.MapControllers();
app.Run();


6) Controller (NO Manual Outbox Table)

📌 OrdersController.cs

using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;
    private readonly IPublishEndpoint _publishEndpoint;

    public OrdersController(AppDbContext db, IPublishEndpoint publishEndpoint)
    {
        _db = db;
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var correlationId = HttpContext.TraceIdentifier;

        // Explicit transaction kept for clarity; a single SaveChangesAsync call
        // would also write the order and the outbox rows atomically.
        await using var trx = await _db.Database.BeginTransactionAsync();

        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount
        };

        _db.Orders.Add(order);

        // ✅ This publish does NOT immediately go to RabbitMQ.
        // It is tracked by the DbContext and written to the SQL Outbox on SaveChangesAsync.
        await _publishEndpoint.Publish(new OrderSubmittedEvent(
            OrderId: order.Id,
            CustomerName: order.CustomerName,
            TotalAmount: order.TotalAmount,
            CreatedAt: order.CreatedAt,
            CorrelationId: correlationId
        ));

        await _db.SaveChangesAsync();
        await trx.CommitAsync();

        return Ok(new { order.Id, Message = "Order saved + Event published using EF Outbox." });
    }
}

public record CreateOrderRequest(string CustomerName, decimal TotalAmount);


7) Consumer (Same)

📌 OrderSubmittedConsumer.cs

using MassTransit;

public class OrderSubmittedConsumer : IConsumer<OrderSubmittedEvent>
{
    private readonly ILogger<OrderSubmittedConsumer> _logger;

    public OrderSubmittedConsumer(ILogger<OrderSubmittedConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
    {
        var evt = context.Message;

        _logger.LogInformation("Order received: {OrderId}, Customer: {Customer}",
            evt.OrderId, evt.CustomerName);

        // simulate work
        await Task.Delay(500);

        // Uncomment to test retries and the _error queue:
        // throw new Exception("Test retry + _error queue");
    }
}


8) EF Core Migration

Run:

dotnet ef migrations add InitialWithMassTransitOutbox
dotnet ef database update

This will create tables like:

  • Orders
  • InboxState
  • OutboxMessage
  • OutboxState

✅ Why This Is Enterprise++

🔥 You removed:

❌ OutboxPublisherWorker
❌ Manual Outbox table logic
❌ Manual retry handling

✅ You gained:

✅ Atomic save + publish
✅ Guaranteed (at-least-once) delivery
✅ Built-in retries
✅ Cleaner code
✅ Industry standard


🎯 Interview Answers (VERY IMPORTANT)

Q) How do you implement Outbox with RabbitMQ in .NET?

✅ Answer:

We use MassTransit with the Entity Framework Core outbox.
When we call Publish inside a DB transaction, MassTransit stores the message in the outbox tables in SQL Server as part of that transaction.
After the transaction commits, MassTransit dispatches the message to RabbitMQ and retries until delivery succeeds.
This prevents message loss and solves the dual-write problem.


Q) Why not publish directly to RabbitMQ?

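Because the DB save and the RabbitMQ publish are two separate operations with no shared transaction.
If the DB commit succeeds but the publish fails, we lose the event. If the publish succeeds but the commit fails, consumers see an event for an order that does not exist.
Outbox solves this by making message persistence part of the same transaction.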
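The answer above can be illustrated with a small in-memory simulation. This is a conceptual sketch, not how MassTransit is implemented: FakeDatabase, FakeBroker and DispatchOutbox are hypothetical names, and writing both lists in one method stands in for a single DB transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical broker that can be switched off to simulate an outage.
public sealed class FakeBroker
{
    public bool IsDown { get; set; }
    public List<string> Delivered { get; } = new();

    public void Publish(string message)
    {
        if (IsDown) throw new InvalidOperationException("broker unavailable");
        Delivered.Add(message);
    }
}

// Hypothetical database holding business rows and pending outbox rows.
public sealed class FakeDatabase
{
    public List<string> Orders { get; } = new();
    public Queue<string> Outbox { get; } = new();

    // Both rows are written together, standing in for one DB transaction.
    public void SaveOrderWithOutbox(string order, string message)
    {
        Orders.Add(order);
        Outbox.Enqueue(message);
    }
}

public static class DualWriteDemo
{
    // Drains pending outbox rows; a row is removed only after the broker accepts it.
    public static void DispatchOutbox(FakeDatabase db, FakeBroker broker)
    {
        while (db.Outbox.Count > 0)
        {
            try { broker.Publish(db.Outbox.Peek()); }
            catch (InvalidOperationException) { return; } // leave the row for the next pass
            db.Outbox.Dequeue();
        }
    }

    public static void Main()
    {
        var db = new FakeDatabase();
        var broker = new FakeBroker { IsDown = true };

        db.SaveOrderWithOutbox("order-1", "OrderSubmitted:order-1");
        DispatchOutbox(db, broker); // broker is down, so the message stays in the outbox
        Console.WriteLine($"Pending after outage: {db.Outbox.Count}"); // prints 1

        broker.IsDown = false;
        DispatchOutbox(db, broker); // broker is back, so the message is delivered
        Console.WriteLine($"Delivered: {broker.Delivered.Count}, pending: {db.Outbox.Count}"); // prints 1 and 0
    }
}
```

Because the event is only removed after the broker accepts it, a crash between publish and removal can deliver it twice, which is why the outbox gives at-least-once delivery and pairs well with the inbox.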

 
