There are many ways to configure and register MassTransit consumers. A nice way I've found is to create an extension class and set up the MassTransit configuration there.
In this post I'll show how to register a consumer using the AddConsumer
method and have a separate class for the consumer definition so I can configure it individually.
Configuration extension
Use the IServiceCollection
extension method pattern to keep the host builder clean and tidy.
/// <summary>
/// <see cref="IServiceCollection"/> extensions that keep the MassTransit wiring
/// out of the host builder.
/// </summary>
public static class MassTransit
{
    /// <summary>
    /// Registers MassTransit with the RabbitMQ transport and adds the
    /// <c>SubmitBurgerOrderConsumer</c> together with its consumer definition.
    /// </summary>
    /// <param name="services">The service collection to add the MassTransit registrations to.</param>
    public static void AddMassTransitConfiguration(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // The consumer type and the definition type must belong together:
            // SubmitBurgerOrderConsumerDefinition derives from
            // ConsumerDefinition<SubmitBurgerOrderConsumer>, so that is the pair registered here.
            x.AddConsumer<SubmitBurgerOrderConsumer>(typeof(SubmitBurgerOrderConsumerDefinition));

            x.UsingRabbitMq((context, cfg) =>
            {
                // Broker host name "rabbitmq", virtual host "/".
                // NOTE(review): host and guest/guest credentials are hard-coded;
                // consider reading them from configuration outside local development.
                cfg.Host("rabbitmq", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Let MassTransit configure receive endpoints from the registrations
                // made above, applying each registered consumer definition.
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
Consumer definition
Create a consumer definition.
/// <summary>
/// Per-consumer settings for <c>SubmitBurgerOrderConsumer</c>: a concurrency cap,
/// message retry and the in-memory outbox.
/// </summary>
public class SubmitBurgerOrderConsumerDefinition : ConsumerDefinition<SubmitBurgerOrderConsumer>
{
    // Maximum number of messages this consumer processes at the same time.
    private const int MaxConcurrentMessages = 8;

    /// <summary>
    /// Sets the concurrent message limit for the consumer.
    /// </summary>
    public SubmitBurgerOrderConsumerDefinition()
    {
        // The cap is scoped to this consumer; the endpoint itself is not limited by it.
        ConcurrentMessageLimit = MaxConcurrentMessages;
    }

    /// <summary>
    /// Adds retry and outbox middleware to the endpoint hosting this consumer.
    /// </summary>
    /// <param name="endpointConfigurator">Configurator of the receive endpoint.</param>
    /// <param name="consumerConfigurator">Configurator of the consumer (not used here).</param>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitBurgerOrderConsumer> consumerConfigurator)
    {
        // Retry failed messages, waiting the given number of milliseconds before each attempt.
        endpointConfigurator.UseMessageRetry(retry =>
        {
            retry.Intervals(100, 200, 500, 800, 1000);
        });

        // The outbox prevents duplicate events from being published.
        endpointConfigurator.UseInMemoryOutbox();
    }
}
Hook up
Simply bring it all together.
/// <summary>
/// Application entry point: builds the generic host and wires in MassTransit.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds the host from the command-line arguments and runs it.
    /// </summary>
    /// <param name="args">Command-line arguments passed to the host builder.</param>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates the default host builder and registers the MassTransit services on it.
    /// </summary>
    /// <param name="args">Command-line arguments passed to the default builder.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);

        return builder.ConfigureServices((hostContext, services) =>
        {
            // Bus, transport and consumer registrations live in the extension method.
            services.AddMassTransitConfiguration();

            // Adds the MassTransit hosted service to the host.
            services.AddMassTransitHostedService();
        });
    }
}