Enterprise RabbitMQ + MassTransit + Outbox + EF Core + SQL Server (Real Production Style)

✅ What We’re Building (Enterprise)

Orders API

  • Saves Order in SQL Server
  • Saves Outbox message in SQL Server (same transaction)

Outbox Publisher Worker

  • Reads Outbox table
  • Publishes event to RabbitMQ via MassTransit
  • Marks Outbox as Processed

Consumer Service

  • Consumes OrderSubmitted
  • Has retry policy
  • If processing still fails after the retries → MassTransit moves the message to the endpoint's _error queue

1) Docker Compose (RabbitMQ + SQL Server)

📌 docker-compose.yml

version: "3.8"

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq-enterprise
    ports:
      - "5672:5672"     # AMQP
      - "15672:15672"   # management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  sqlserver:
    image: mcr.microsoft.com/mssql/server:2022-latest
    container_name: sqlserver-enterprise
    environment:
      # MSSQL_SA_PASSWORD replaces the deprecated SA_PASSWORD variable
      MSSQL_SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
    ports:
      - "1433:1433"

RabbitMQ UI:
👉 http://localhost:15672
Login: guest / guest


2) NuGet Packages

Run these:

dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Microsoft.EntityFrameworkCore.Tools


3) EF Core Entities

✅ Order

public class Order
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; } = default!;
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}


✅ OutboxMessage

public class OutboxMessage
{
    public Guid Id { get; set; } = Guid.NewGuid();

    public string MessageType { get; set; } = default!;
    public string Payload { get; set; } = default!;

    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime? ProcessedAt { get; set; }

    public int RetryCount { get; set; }

    // Pending, Processed, Failed
    public string Status { get; set; } = "Pending";
}


✅ ProcessedMessage (Idempotency)

public class ProcessedMessage
{
    public Guid Id { get; set; } // EventId
    public DateTime ProcessedAt { get; set; } = DateTime.UtcNow;
}
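
The idea behind the ProcessedMessages table can be sketched without a database. The snippet below is only an in-memory illustration: the HashSet stands in for the table, and the Handle function and its counters are hypothetical names, not part of the project. It shows the same event being delivered twice and handled once.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the ProcessedMessages table (illustration only).
var processed = new HashSet<Guid>();
var handledCount = 0;

// Returns true when the event was handled, false when it was skipped as a duplicate.
bool Handle(Guid eventId)
{
    // HashSet.Add returns false if the id is already present,
    // which mirrors a primary-key lookup on ProcessedMessage.Id.
    if (!processed.Add(eventId))
        return false;

    handledCount++; // business logic would run here
    return true;
}

var eventId = Guid.NewGuid();
var first = Handle(eventId);   // first delivery
var second = Handle(eventId);  // redelivery of the same event

Console.WriteLine($"first={first}, second={second}, handled={handledCount}");
// prints "first=True, second=False, handled=1"
```

In the real consumer the check and the insert go through EF Core, and the primary key on Id is what ultimately rejects a concurrent duplicate.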


4) DbContext

using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
    public DbSet<ProcessedMessage> ProcessedMessages => Set<ProcessedMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>().HasKey(x => x.Id);
        modelBuilder.Entity<OutboxMessage>().HasKey(x => x.Id);
        modelBuilder.Entity<ProcessedMessage>().HasKey(x => x.Id);
    }
}


5) Message Contract (Event)

MassTransit accepts records, classes, or interfaces as message contracts. A record keeps the event immutable and concise.

public record OrderSubmittedEvent(
    Guid EventId,
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId
);


6) Orders Controller (Write DB + Outbox)

πŸ“Œ OrdersController.cs

using Microsoft.AspNetCore.Mvc;
using System.Text.Json;

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;

    public OrdersController(AppDbContext db)
    {
        _db = db;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var correlationId = HttpContext.TraceIdentifier;

        // The order and its Outbox row are committed together or not at all.
        // "await using" disposes the transaction asynchronously.
        await using var trx = await _db.Database.BeginTransactionAsync();

        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount
        };

        _db.Orders.Add(order);

        var evt = new OrderSubmittedEvent(
            EventId: Guid.NewGuid(),
            OrderId: order.Id,
            CustomerName: order.CustomerName,
            TotalAmount: order.TotalAmount,
            CreatedAt: order.CreatedAt,
            CorrelationId: correlationId
        );

        // Store the serialized event; the worker publishes it later
        _db.OutboxMessages.Add(new OutboxMessage
        {
            MessageType = "OrderSubmittedEvent",
            Payload = JsonSerializer.Serialize(evt),
            Status = "Pending"
        });

        await _db.SaveChangesAsync();
        await trx.CommitAsync();

        return Ok(new { order.Id, Message = "Order saved + Outbox stored safely." });
    }
}

public record CreateOrderRequest(string CustomerName, decimal TotalAmount);
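
The Outbox only works if the payload written by the controller can be read back by the worker. Here is a minimal sketch of that round trip with System.Text.Json, using a copy of the OrderSubmittedEvent record from section 5. The sample values (customer, date, correlation id) are made up for the demo.

```csharp
using System;
using System.Text.Json;

var evt = new OrderSubmittedEvent(
    EventId: Guid.NewGuid(),
    OrderId: Guid.NewGuid(),
    CustomerName: "Balaji",
    TotalAmount: 2500m,
    CreatedAt: new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc), // sample value
    CorrelationId: "demo-correlation-id");                           // sample value

// What the controller stores in OutboxMessage.Payload.
string payload = JsonSerializer.Serialize(evt);

// What the worker does before publishing.
var restored = JsonSerializer.Deserialize<OrderSubmittedEvent>(payload);

Console.WriteLine(payload);

// Records compare by value, so a lossless round trip yields an equal instance.
Console.WriteLine($"Round trip equal: {evt == restored}");

// Copy of the contract from section 5 so the snippet is self-contained.
public record OrderSubmittedEvent(
    Guid EventId,
    Guid OrderId,
    string CustomerName,
    decimal TotalAmount,
    DateTime CreatedAt,
    string CorrelationId);
```

Running a check like this in a unit test is a cheap way to catch contract changes that would leave unreadable rows in the Outbox table.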


7) MassTransit Setup (RabbitMQ)

📌 Program.cs

using MassTransit;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddDbContext<AppDbContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"));
});

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.AddConsumer<OrderSubmittedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        // Retry for consumer failures
        cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

        // Auto-create queues based on consumers
        cfg.ConfigureEndpoints(context);
    });
});

// Outbox publisher worker
builder.Services.AddHostedService<OutboxPublisherWorker>();

var app = builder.Build();
app.MapControllers();
app.Run();


8) Outbox Publisher Worker (Publishes to RabbitMQ)

📌 OutboxPublisherWorker.cs

using MassTransit;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;

public class OutboxPublisherWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxPublisherWorker> _logger;

    public OutboxPublisherWorker(
        IServiceScopeFactory scopeFactory,
        ILogger<OutboxPublisherWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var scope = _scopeFactory.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

                // IPublishEndpoint is a scoped service, so it is resolved per scope
                // rather than injected into this singleton hosted service.
                var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

                // Oldest pending messages first, in small batches
                var pending = await db.OutboxMessages
                    .Where(x => x.Status == "Pending" && x.RetryCount < 5)
                    .OrderBy(x => x.CreatedAt)
                    .Take(10)
                    .ToListAsync(stoppingToken);

                foreach (var msg in pending)
                {
                    try
                    {
                        if (msg.MessageType == "OrderSubmittedEvent")
                        {
                            var evt = JsonSerializer.Deserialize<OrderSubmittedEvent>(msg.Payload);

                            if (evt == null)
                                throw new InvalidOperationException("Outbox payload invalid.");

                            await publishEndpoint.Publish(evt, stoppingToken);

                            msg.Status = "Processed";
                            msg.ProcessedAt = DateTime.UtcNow;
                        }
                        else
                        {
                            // Unknown message type: retrying would not help
                            msg.Status = "Failed";
                        }
                    }
                    catch (Exception ex)
                    {
                        msg.RetryCount++;
                        _logger.LogError(ex, "Outbox publish failed for {Id}", msg.Id);

                        if (msg.RetryCount >= 5)
                            msg.Status = "Failed";
                    }
                }

                // If the process stops between Publish and this save, the message is
                // published again on the next run; the consumer's idempotency check covers that.
                await db.SaveChangesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox publisher loop crashed");
            }

            await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);
        }
    }
}
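
The worker's retry bookkeeping is easy to get off by one, so here is a small sketch of just that rule, separated from the database and RabbitMQ. MaxRetries and AfterFailure are hypothetical names introduced for this illustration; the cap of 5 matches the worker above.

```csharp
using System;

const int MaxRetries = 5; // same cap the worker uses

// Given the current retry count, return the message state after one failed publish.
(string Status, int RetryCount) AfterFailure(int retryCount)
{
    var next = retryCount + 1;
    return (next >= MaxRetries ? "Failed" : "Pending", next);
}

// Simulate a message whose publish fails every time.
var state = (Status: "Pending", RetryCount: 0);
while (state.Status == "Pending")
{
    state = AfterFailure(state.RetryCount);
    Console.WriteLine($"failure {state.RetryCount}: {state.Status}");
}
// The loop stops at "failure 5: Failed"; the worker's query then no longer selects the row.
```

Rows that end up as Failed stay in the table, so they can be inspected and reset to Pending by hand once the cause is fixed.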


9) Consumer (MassTransit)

📌 OrderSubmittedConsumer.cs

using MassTransit;
using Microsoft.EntityFrameworkCore;

public class OrderSubmittedConsumer : IConsumer<OrderSubmittedEvent>
{
    private readonly AppDbContext _db;
    private readonly ILogger<OrderSubmittedConsumer> _logger;

    public OrderSubmittedConsumer(AppDbContext db, ILogger<OrderSubmittedConsumer> logger)
    {
        _db = db;
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
    {
        var evt = context.Message;

        // ✅ Idempotency check
        var alreadyProcessed = await _db.ProcessedMessages
            .AnyAsync(x => x.Id == evt.EventId, context.CancellationToken);

        if (alreadyProcessed)
        {
            _logger.LogInformation("Duplicate event ignored: {EventId}", evt.EventId);
            return;
        }

        // ✅ Business logic
        _logger.LogInformation("Processing Order {OrderId} for {Customer}",
            evt.OrderId, evt.CustomerName);

        // Simulate work
        // throw new Exception("Testing retry + error queue");

        // If two copies of the event are handled concurrently, the second insert fails
        // on the primary key, the message is retried, and the check above then skips it.
        _db.ProcessedMessages.Add(new ProcessedMessage
        {
            Id = evt.EventId
        });

        await _db.SaveChangesAsync(context.CancellationToken);
    }
}


🔥 MassTransit Auto Error Queue (Important)

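If the consumer still fails after the retries, MassTransit moves the message to an error queue that it creates automatically. The queue is named after the receive endpoint, so with the kebab-case formatter and OrderSubmittedConsumer it is:

order-submitted_error

So you get a dead-letter queue without configuring a RabbitMQ dead-letter exchange (DLX) yourself.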
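
To make "fails after retries" concrete, the sketch below simulates the policy from Program.cs, r.Interval(3, TimeSpan.FromSeconds(5)), which as I understand it means one initial attempt plus up to three retries. This is not MassTransit code: DeliverAsync is a hypothetical helper written only to illustrate the counting, and the delay is shortened so the demo finishes quickly.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs the handler once, then retries up to retryLimit times.
// Returns how many attempts were made and whether the message ended up faulted.
async Task<(int Attempts, bool Faulted)> DeliverAsync(
    Func<Task> handler, int retryLimit, TimeSpan interval)
{
    var attempts = 0;
    while (true)
    {
        attempts++;
        try
        {
            await handler();
            return (attempts, false);
        }
        catch (Exception) when (attempts <= retryLimit)
        {
            await Task.Delay(interval); // wait before the next retry
        }
        catch (Exception)
        {
            // Retries exhausted: this is the point where the message would go to _error.
            return (attempts, true);
        }
    }
}

var result = await DeliverAsync(
    () => throw new InvalidOperationException("Testing retry + error queue"),
    retryLimit: 3,
    interval: TimeSpan.FromMilliseconds(10)); // 5 s in the article, shortened here

Console.WriteLine($"attempts={result.Attempts}, faulted={result.Faulted}");
// prints "attempts=4, faulted=True"
```

To see the real behavior, uncomment the throw in the consumer and watch the error queue fill up in the RabbitMQ management UI.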


10) appsettings.json

{
  "ConnectionStrings": {
    "DefaultConnection": "Server=localhost,1433;Database=RabbitEnterpriseDb;User Id=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=True;"
  }
}


✅ How To Run (Step-by-step)

1) Start Docker

docker-compose up -d

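2) Run migrations

If the dotnet ef command is not installed yet, install it once:

dotnet tool install --global dotnet-ef

Then create and apply the migration:

dotnet ef migrations add InitialCreate
dotnet ef database update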

3) Run the API

dotnet run

4) Test API

POST (adjust the port to the URL that dotnet run prints):
http://localhost:5000/api/orders

Body:

{
  "customerName": "Balaji",
  "totalAmount": 2500
}


✅ Enterprise Summary (Interview Answer)

We used RabbitMQ with MassTransit in .NET Core.
For reliability, we implemented the Outbox pattern using EF Core + SQL Server.
Orders and Outbox messages are saved in one DB transaction.
A background worker publishes Outbox messages to RabbitMQ, which gives at-least-once delivery.
Consumers are handled by MassTransit with retry policies and automatic error queues.
Because a message can be delivered more than once, we also implemented idempotency using a ProcessedMessages table.
