RabbitMQ is a message broker accepts messages from a producer (the message sender) and holds it in a queue so that a consumer (the message receiver) can retrieve it. In this blog, I'll show you how to build a topic exchange on RabbitMQ with .NET 6
At a high level, RabbitMQ is an open source message broker. A message broker accepts messages from a producer (the message sender) and holds it in a queue so that a consumer (the message receiver) can retrieve it. This allows for multiple producers and consumers to share the same queue without having to directly pass messages between each other. What RabbitMQ excels at is doing this at scale whilst staying lightweight and easy to deploy.
To get started with a basic RabbitMQ implementation, checkout this guide.
The first question you may have is “why do I want to add in additional complexity?”
If you were not using a message broker, you would most likely be using an HTTP or socket-based form of communication. These methods can be difficult to scale and are not always robust. HTTP communication can also tightly couple two systems together - increasing inter-dependency, which is undesirable.
The addition of a message broker improves the fault tolerance and resiliency of the systems in which they are employed.
They are easy to scale as their publish/subscribe pattern means the addition of many more services can be easily supported - without having to modify existing systems.
RabbitMQ has four types of exchanges (or message routers) available to route the message in different ways. You will focus specifically on a topic exchange.
Topic exchanges route messages to one or many queues based on matching a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.
If you would like to learn more about the three other types of exchange available in RabbitMQ, then checkout the documentation.
Before you begin, there are a few tools you will need:
For this example, you’ll need a simple, single-node RabbitMQ server, so you won’t need to change any of the default settings. You can run the container image with the following:
docker run -it --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management
The above will run the 3-management tag of the RabbitMQ container image. The 3-management tag of this image ships with the management plug-in, which will provide an HTTP API
and a web-based UI for your RabbitMQ server.
You’ll tell Docker to run this with an interactive shell with the -it
option, as well as tell it to automatically remove the container when it exits with the -rm
option. You’ll also forward a couple of ports (5672
for the RabbitMQ server and 15672
for the HTTP API and web UI) from your local machine to the container, which means that you can connect to it on localhost
.
To access the web UI, navigate to http://localhost:15672
. The username and password are both guest
. The UI offers features such as add and remove users, as well as being able to view diagnostic information from the server.
To demonstrate the topic exchange you will use this simple flow:
OrderService
via an API callorder.cookwaffle
and passed to RabbitMQKitchenService
will listen for any messages with a routing key that contains cookwaffle
KitchenService
will then create a new Order object and pass it to the SignalR
hub to be displayed on the UIThe bare bones of this project have been created for you in the following repo on GitHub. This is the start
branch. If you would like to see the completed code, checkout the complete
branch.
This repo consists of three discrete projects - detailed below. Each project already has the RabbitMQ nuget package, RabbitMQ.Client
installed ready to use.
A webAPI application which, when its API endpoint is called, will create a new order. You will need to add a message to the topic exchange with the routing key order.cookwaffle
. This project is the “Producer”.
A Blazor Server application with SignalR
. SignalR
will be used to update the kitchen UI browser window as orders come in.
You will need to add a listener to this application to listen for messages with a specific routing key pattern matching #.cookwaffle
, to publish an order to the kitchen UI. This is a wildcard routing key as it contains the #
. This service will look for any messages relating to cookwaffle
regardless of suffixes.
This project is the “Consumer”.
A class library where you will store shared code to do with RabbitMQ setup. This project has a model called RabbitMQSettings
consisting of three fields: HostName
, ExchangeName
and ExchangeType
which will be used to define these properties in RabbitMQ. It also has a class called IServiceCollectionExtensions
which you will use in your other two projects to set up RabbitMQ client connections.
Open up the solution in your IDE of choice. You will start with the Messaging.Common
project as this will be used by the other two projects.
As previously mentioned, this project contains a file called IServiceCollectionExtensions
which you will use to set a connection to RabbitMQ.
First you will need to get the connection settings for the RabbitMQ broker from appsettings.json
and bind them to a C# object. You can then add these settings as a singleton in the IoC container IServiceCollection
.
public static IServiceCollection SetUpRabbitMq(this IServiceCollection services, IConfiguration config)
{
var configSection = config.GetSection("RabbitMQSettings");
var settings = new RabbitMQSettings();
configSection.Bind(settings);
// add the settings for later use by other classes via injection
services.AddSingleton<RabbitMQSettings>(settings);
}
Next, you will need to create an instance of ConnectionFactory
and add it to the IoC container, using the HostName
from the RabbitMQSettings
object. As you’ll be using an async consumer later on in this demo, you will also need to tell the ConnectionFactory
this by setting DispatchConsumersAsync
to `true.
services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory
{
HostName = settings.HostName,
DispatchConsumersAsync = true;
});
To ensure your connections are disposed of correctly by the IoC container, you will need to create a ModelFactory
that implements the IDisposable
interface. The outline for this class is already created for you in the IServiceCollectionExtensions
file.
For more information on correct disposal of dependencies added to the IServiceCollection
please refer to this excellent blog series by Steve Collins.
First, create a constructor that accepts both the RabbitMQSettings
and the IConnectionFactory
you just created and added to the IoC container. You will also call CreateConnection
on the ConnectionFactory
.
Assign both of the above to private fields.
public class ModelFactory : IDisposable
{
private readonly IConnection _connection;
private readonly RabbitMQSettings _settings;
public ModelFactory(IConnectionFactory connectionFactory, RabbitMQSettings settings)
{
_settings = settings;
_connection = connectionFactory.CreateConnection();
}
public void Dispose()
{
throw new NotImplementedException();
}
}
Now, create a method called CreateChannel
that will return an instance of IModel
. This is where you will declare both the type and name of your RabbitMQ exchange.
public IModel CreateChannel()
{
var channel = _connection.CreateModel();
channel.ExchangeDeclare(exchange: _settings.ExchangeName, type: _settings.ExchangeType);
return channel;
}
Lastly, in the Dispose
method, you will need to call Dispose
on the connection.
public void Dispose()
{
_connection.Dispose();
}
You will now have a completed ModelFactory
class which you can add to the IoC container back in the SetUpRabbitMq
method above and calling the CreateChannel
method.
public static IServiceCollection SetUpRabbitMq(this IServiceCollection services, IConfiguration config)
{
var configSection = config.GetSection("RabbitMQSettings");
var settings = new RabbitMQSettings();
configSection.Bind(settings);
// add the settings for later use by other classes via injection
services.AddSingleton<RabbitMQSettings>(settings);
// As the connection factory is disposable, need to ensure container disposes of it when finished
services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory
{
HostName = settings.HostName
});
services.AddSingleton<ModelFactory>();
services.AddSingleton(sp => sp.GetRequiredService<ModelFactory>().CreateChannel());
return services;
}
That is everything you need to do in the “Messaging.Common” project.
For both the KitchenService
and the OrderService
projects you will store the configuration for RabbitMQ in the appsettings.json
file in each project.
In production, you will store this configuration wherever you add environment variables.
Add the following JSON to the top-level of appsettings.json
file in the KitchenService
and OrderService
projects:
{
// other configuration removed for brevity
"RabbitMQSettings": {
"HostName": "localhost",
"ExchangeName": "waffle_messaging",
"ExchangeType": "topic"
}
}
Here you are specifying:
HostName
as localhost
as you are running locallyExchangeName
as waffle_messaging
- this can be changed to suit your needsExchange
you would like as topic
The first thing you will need to do is to call the code you just wrote in the “Messaging.Common” project.
The reference to the “Messaging.Common” project is already in place, therefore in the “Program.cs” file, before builder.Build()
is called, add the following:
builder.Services.SetUpRabbitMq(builder.Configuration);
This will pass in the application configuration including the RabbitMQSettings
– you can see these in the appsettings.json
file – and set up the topic exchange, if it doesn’t already exist, and connect to it.
Next, you will need to write the code that will convert a C# Order
object into a message.
Open the “RabbitSender.cs” file and add the following constructor and fields:
public class RabbitSender
{
private readonly IModel _channel;
private readonly RabbitMQSettings _rabbitSettings;
public RabbitSender(RabbitMQSettings rabbitSettings, IModel channel)
{
_channel = channel;
_rabbitSettings = rabbitSettings;
}
}
Both the RabbitMQSettings
and the IModel
dependencies were set up in the Messaging.Common
project.
You can now add a generic method called PublishMessage
to the RabbitSender
class with the following code:
public void PublishMessage<T>(T entity, string key) where T : class
{
var message = JsonSerializer.Serialize(entity);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: _rabbitSettings.ExchangeName,
routingKey: key,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", key, message);
}
The above code serializes the C# object to JSON, encodes it to UTF8
and then publishes it to the RabbitMQ message broker, along with the specified routing key.
The key and serialized message will both be printed to the console for monitoring. This could be changed at a later date to print to logs.
Now you can add the RabbitSender
to the IoC container. Back in “Program.cs”. add the following immediately after you call SetUpRabbitMq
:
builder.Services.SetUpRabbitMq(builder.Configuration);
builder.Services.AddSingleton<RabbitSender>();
The OrderService
already has an API endpoint in the “Program.cs” file that will automatically create a seed waffle order, for demonstration purposes only, when the endpoint is called unless an actual order is passed through. You can now update the API to use RabbitSender
and send a message to the message broker like so:
app.MapPost("/waffleOrder", (RabbitSender rabbitSender, [FromBody] Order order) =>
{
if (order.Id is 0)
{
order = new Order().Seed(orderIdSeed);
orderIdSeed++;
}
rabbitSender.PublishMessage<Order>(order, "order.cookwaffle");
});
You have made the RabbitSender
class available via dependency injection and then called the PublishMessage
method passing both the order and the routing key order.cookwaffle
.
That is everything you need to do in the “OrderService” project.
Just as you did in the OrderService
, add the RabbitMQ setup to the IServiceCollection
in the “Program.cs” file before builder.Build()
is called.
builder.Services.SetUpRabbitMq(builder.Configuration);
Next, you will need to write the code that will listen for incoming messages from the message broker.
The RabbitReceiver
class has already been created for you and it implements IHostedService
. This means that the service, once added to the IoC container, will be a long-running, background service.
Open the “RabbitReceiver.cs” file and add the following constructor and fields:
public class RabbitReceiver : IHostedService
{
private readonly RabbitMQSettings _rabbitSettings;
private readonly IModel _channel;
private readonly IHubContext<OrderHub> _orderHub;
public RabbitReceiver(RabbitMQSettings rabbitSettings, IModel channel, IHubContext<OrderHub> hub)
{
_rabbitSettings = rabbitSettings;
_channel = channel;
_orderHub = hub;
}
// code omitted for brevity
}
Next, you’ll need to create a method in the RabbitReceiver
class that actually does the listening. Call this DoStuff
and add the following code:
private void DoStuff()
{
_channel.ExchangeDeclare(exchange: _rabbitSettings.ExchangeName,
type: _rabbitSettings.ExchangeType);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: _rabbitSettings.ExchangeName, routingKey: "#.cookwaffle");
}
In the above code, you are declaring the Exchange, checking if it exists and creating if it doesn’t.
You will then tell the IModel channel
to declare a queue and return the generated name.
Next, the queue is bound to the IModel
and set to listen to any message with a routing key matching cookwaffle
.
Finally, you will declare the consumer
and add an expression to the Received
event on that consumer. The BasicConsume
at the end of this code block will actually start the consumption process.
Add the following code to the DoStuff
method:
var consumerAsync = new AsyncEventingBasicConsumer(_channel);
consumerAsync.Received += async (_, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var order = JsonSerializer.Deserialize<Order>(message);
await _orderHub.Clients.All.SendAsync("new-order", order);
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumerAsync);
When the consumer receives a message with the required routing key pattern #.cookwaffle
it will:
EventArgs ea
- note that the first argument (happens to be the IModel
) in the expression is not used currently and is therefore discarded.message
message
from JSON to a strongly typed Order
model called order
_orderhub
to send a message to all clients (i.e. the kitchen UI)BasicAck
is called to let RabbitMQ know that the message has been received and processedThe last two steps of the RabbitReceiver
class is to implement the IHostedService
interface correctly.
First, the StartAsync
code should be updated to this:
public Task StartAsync(CancellationToken cancellationToken)
{
DoStuff();
return Task.CompletedTask;
}
The above code will begin on application start and call the DoStuff
method.
Lastly, you need to dispose of the channel correctly on shutdown by updating the StopAsync
method to the following:
public Task StopAsync(CancellationToken cancellationToken)
{
_channel.Dispose();
return Task.CompletedTask;
}
The final step to complete the code in the “KitchenService” is to ensure all the code you have written is instantiated on startup.
In the “Program.cs” file, add the following code after you call the SetUpRabbitMq
on builder.Services
:
builder.Services.AddHostedService<RabbitReceiver>();
Now the code is complete, you can start both the “KitchenService” and “OrderService” projects either from using dotnet run
in the CLI root of each project or by configuring your IDE to run both projects on start. The Messaging.Common
project will not run as it is just a class library.
You’ll, of course, need to have RabbitMQ running in Docker as outlined at the start of this post.
Once the “OrderService” has started you can navigate to https://localhost:7196/swagger/index.html to see the API explorer provided by Swagger.
The “KitchenService” UI is available on https://localhost:7193/ and should be a simple web view.
On swagger, click on the endpoint /waffleorder
and then on the “Try it out” button. You can either modify the message body or leave it as is - it will seed automatically if not set - then click on “Execute”.
Almost instantaneously, you should see an order pop up on the “KitchenService” UI. Your message has been successfully delivered by RabbitMQ - Congrats!
You have covered a very happy path above that software seldom follows!
RabbitMQ is a powerful, enterprise-level message broker with a plethora of features including those that deal with the unhappy path. I strongly urge you to check out the documentation and learn more about what to do when things go wrong.
I hope you enjoyed this tutorial. If you have any thoughts or ideas please say hello on any of the channels below:
Email: laylap@vmware.com
Twitter: @LaylaCodesIt
GitHub: layla-p