Friday, Feb 9, 2024
Decoupling Message Brokers in C#
Intro
Today, we’re going to discuss message brokers and how to make your code less dependent on the services you’re using. This is the second blog post in the “Decoupling” series. If you’re interested, you can read my previous blog post where we learned how to decouple from schedulers in C#, and you can also find motivation for why decoupling from such services is a good practice. Without further ado, let’s delve into today’s topic: message brokers.
We’ll be concentrating on implementing MassTransit with RabbitMQ, showcasing how to develop custom message handlers that can be shared across your application without the need for referencing MassTransit.
For those unfamiliar with the concept, a message broker fundamentally serves as a communication system between two distinct points through the exchange of messages. These points could be two separate applications, services, or any network-connected entity capable of executing code.
Readers are encouraged to look at the proof-of-concept repository; the link can be found at the end of the blog post.
Interfaces
As always, we’ll start with interfaces. There’s a lot to cover, so let’s begin with the basics:
public interface IMessageHandler;
public interface IMessageHandler<TMessage> : IMessageHandler
    where TMessage : class, IMessage
{
    Task Handle(TMessage message, IMessageContext context);
}
An IMessageHandler implementation is where you define the logic that handles the provided TMessage.
public interface IMessage
{
    Guid Id { get; init; }
    // Other stuff...
}
The class implementing IMessage allows you to store any information, as long as it’s serializable.
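To make the serializability requirement concrete, here is a minimal sketch of a message type. OrderCreated and MessageSerializationDemo are hypothetical names that are not part of the repository, and System.Text.Json is only an assumption used to demonstrate a round trip; the serializer your transport actually uses may differ.

```csharp
using System;
using System.Text.Json;

public interface IMessage
{
    Guid Id { get; init; }
}

// Hypothetical message type, used for illustration only.
public sealed record OrderCreated : IMessage
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public string OrderNumber { get; init; } = string.Empty;
    public decimal Total { get; init; }
}

public static class MessageSerializationDemo
{
    // Serializes the message to JSON and reads it back,
    // which is roughly what happens when it crosses a transport.
    public static OrderCreated RoundTrip(OrderCreated message)
    {
        var json = JsonSerializer.Serialize(message);
        return JsonSerializer.Deserialize<OrderCreated>(json)!;
    }
}
```

Because records compare by value, a message that survives the round trip compares equal to the original, which makes this kind of check easy to put in a unit test.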
public interface IServiceBus
{
    Task PublishAsync<T>(T message) where T : class, IMessage;
    void Start();
    void Stop();
}
A lot of methods were removed from IServiceBus for brevity, but these are the only ones you’ll actually need.
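One practical payoff of depending on IServiceBus alone is that application code can be exercised without any broker. The sketch below is an assumption-heavy illustration: UserRegistered, RegistrationService and RecordingServiceBus are hypothetical names invented here, and the recording bus is a test double, not something the repository ships.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IMessage
{
    Guid Id { get; init; }
}

public interface IServiceBus
{
    Task PublishAsync<T>(T message) where T : class, IMessage;
    void Start();
    void Stop();
}

// Hypothetical message, for illustration only.
public sealed record UserRegistered : IMessage
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public string Email { get; init; } = string.Empty;
}

// Hypothetical application service: it knows IServiceBus, not MassTransit.
public sealed class RegistrationService
{
    private readonly IServiceBus _bus;

    public RegistrationService(IServiceBus bus) => _bus = bus;

    public Task RegisterAsync(string email) =>
        _bus.PublishAsync(new UserRegistered { Email = email });
}

// Test double that records published messages instead of sending them.
public sealed class RecordingServiceBus : IServiceBus
{
    private readonly List<IMessage> _published = new();

    public IReadOnlyList<IMessage> Published => _published;
    public bool IsStarted { get; private set; }

    public Task PublishAsync<T>(T message) where T : class, IMessage
    {
        _published.Add(message);
        return Task.CompletedTask;
    }

    public void Start() => IsStarted = true;
    public void Stop() => IsStarted = false;
}
```

Swapping RecordingServiceBus for the real bus is a registration change only; RegistrationService stays untouched.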
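IMessageHandler, IMessage, IMessageContext and IServiceBus will be our core public interfaces, meaning they are the ones your application code references outside of the source DLL. The configuration interfaces shown later (IBusConfigurator and IBusEndpointConfigurator) are also public, but they are only used during registration.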
Custom Consumer Convention
Our main driver behind this implementation is MassTransit. It provides the ability to create and utilize custom interfaces in place of those provided by MassTransit itself. This flexibility enables us to opt out of using MassTransit if the need arises.
To achieve this, we need to implement the custom consumer convention, instructing MassTransit to utilize our interfaces instead of its own.
internal class CustomConsumerConvention : IConsumerConvention
{
    // Returns the message convention for the type of T
    // TConsumer is an implementation of IMessageHandler
    IConsumerMessageConvention IConsumerConvention.GetConsumerMessageConvention<TConsumer>() => new CustomConsumerMessageConvention<TConsumer>();
}
The idea is to tell MassTransit how to connect our IMessageHandler<TMessage> with the IMessage type it handles. This is achieved through the CustomConsumerMessageConvention<TConsumer> class.
internal class CustomConsumerMessageConvention<TConsumer> : IConsumerMessageConvention
    where TConsumer : class
{
    // Example:
    // TConsumer = ScopedMessageHandler<MessageBrokerTestHandler>
    // 1. Extract MessageBrokerTestHandler : IMessageHandler<MessageA>, IMessageHandler<MessageB> from TConsumer
    // 2. Extract MessageA & MessageB from the MessageBrokerTestHandler
    public IEnumerable<IMessageInterfaceType> GetMessageTypes()
    {
        // See ScopedMessageHandler, generic parameter is the actual handler.
        var scopedConsumerType = typeof(TConsumer);
        // Actual message consumer that implements IMessageHandler<TMessage>
        var consumerType = scopedConsumerType.IsGenericType && scopedConsumerType.GetGenericTypeDefinition() == typeof(ScopedMessageHandler<>)
            ? scopedConsumerType.GetGenericArguments().First()
            : throw new Exception("Consumer is not wrapped with ScopedMessageHandler.");
        var customConsumerInterfaceTypes = consumerType
            .GetInterfaces()
            .Where(interfaceType =>
                interfaceType.IsGenericType
                && interfaceType.GetGenericTypeDefinition() == typeof(IMessageHandler<>)
            )
            .Select(messageHandlerType => new CustomConsumerInterfaceType(messageHandlerType.GetGenericArguments().First(), scopedConsumerType))
            .Where(x =>
                !x.MessageType.IsValueType
                && x.MessageType != typeof(string)
            );
        foreach (var type in customConsumerInterfaceTypes)
            yield return type;
    }
}
This is the first time ScopedMessageHandler is mentioned. It’s an internal wrapper around an IMessageHandler implementation that allows us to manage the dependency injection IServiceScope.
Although the preceding code might seem complex initially, it becomes clear once we look at an example:
public class MessageBrokerTestHandler : IMessageHandler<MessageA>, IMessageHandler<MessageB>
{
    Task IMessageHandler<MessageA>.Handle(MessageA message, IMessageContext context) => Task.CompletedTask;
    Task IMessageHandler<MessageB>.Handle(MessageB message, IMessageContext context) => Task.CompletedTask;
}
- MessageBrokerTestHandler is internally wrapped: ScopedMessageHandler<MessageBrokerTestHandler>.
- Entering GetMessageTypes, TConsumer = ScopedMessageHandler<MessageBrokerTestHandler>.
- Extract MessageBrokerTestHandler : IMessageHandler<MessageA>, IMessageHandler<MessageB> from TConsumer.
- Extract MessageA & MessageB from the MessageBrokerTestHandler.
- Return an instance of CustomConsumerInterfaceType that holds information on the message and the consumer.
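The extraction steps above can be tried out without MassTransit, since they are plain reflection. The following is a minimal sketch under stated assumptions: ScopedMessageHandler<> is reduced to an empty placeholder, MessageTypeDiscovery is a hypothetical helper name that does not exist in the repository, and it returns raw message types rather than CustomConsumerInterfaceType instances.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IMessage { Guid Id { get; init; } }
public interface IMessageContext { }
public interface IMessageHandler { }
public interface IMessageHandler<TMessage> : IMessageHandler
    where TMessage : class, IMessage
{
    Task Handle(TMessage message, IMessageContext context);
}

public sealed class MessageA : IMessage { public Guid Id { get; init; } }
public sealed class MessageB : IMessage { public Guid Id { get; init; } }

public class MessageBrokerTestHandler : IMessageHandler<MessageA>, IMessageHandler<MessageB>
{
    Task IMessageHandler<MessageA>.Handle(MessageA message, IMessageContext context) => Task.CompletedTask;
    Task IMessageHandler<MessageB>.Handle(MessageB message, IMessageContext context) => Task.CompletedTask;
}

// Empty placeholder for the internal wrapper; only its generic shape matters here.
public sealed class ScopedMessageHandler<THandler> where THandler : class, IMessageHandler { }

// Hypothetical helper that mirrors the reflection steps of GetMessageTypes.
public static class MessageTypeDiscovery
{
    public static IReadOnlyList<Type> GetMessageTypes(Type scopedConsumerType)
    {
        // Step 1: unwrap ScopedMessageHandler<THandler> to get THandler.
        if (!scopedConsumerType.IsGenericType
            || scopedConsumerType.GetGenericTypeDefinition() != typeof(ScopedMessageHandler<>))
        {
            throw new ArgumentException("Consumer is not wrapped with ScopedMessageHandler.", nameof(scopedConsumerType));
        }
        var handlerType = scopedConsumerType.GetGenericArguments()[0];

        // Step 2: collect TMessage from every IMessageHandler<TMessage> the handler implements.
        return handlerType
            .GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IMessageHandler<>))
            .Select(i => i.GetGenericArguments()[0])
            .ToList();
    }
}
```

Calling MessageTypeDiscovery.GetMessageTypes(typeof(ScopedMessageHandler<MessageBrokerTestHandler>)) yields MessageA and MessageB; the order is not guaranteed, because GetInterfaces does not promise one.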
internal class CustomConsumerInterfaceType : IMessageInterfaceType
{
    private readonly Lazy<IMessageConnectorFactory> _consumeConnectorFactory;
    // Message type is our `IMessage` and consumer type is the ScopedMessageHandler
    public CustomConsumerInterfaceType(Type messageType, Type consumerType)
    {
        MessageType = messageType;
        _consumeConnectorFactory = new Lazy<IMessageConnectorFactory>(
            () => (IMessageConnectorFactory)Activator.CreateInstance(typeof(CustomConsumeConnectorFactory<,>).MakeGenericType(consumerType, messageType))
        );
    }
    public Type MessageType { get; }
    // Omitted:
    // IConsumerMessageConnector<T> IMessageInterfaceType.GetConsumerConnector<T>()
    // IInstanceMessageConnector<T> IMessageInterfaceType.GetInstanceConnector<T>()
}
The next step is to implement a custom IMessageConnectorFactory.
internal class CustomConsumeConnectorFactory<TConsumer, TMessage> : IMessageConnectorFactory
    where TConsumer : class, IScopedMessageHandler
    where TMessage : class, IMessage
{
    private readonly ConsumerMessageConnector<TConsumer, TMessage> _consumerConnector;
    private readonly InstanceMessageConnector<TConsumer, TMessage> _instanceConnector;
    public CustomConsumeConnectorFactory()
    {
        var filter = new CustomConsumerMessageFilter<TConsumer, TMessage>();
        _consumerConnector = new ConsumerMessageConnector<TConsumer, TMessage>(filter);
        _instanceConnector = new InstanceMessageConnector<TConsumer, TMessage>(filter);
    }
    // Omitted:
    // IConsumerMessageConnector<T> IMessageConnectorFactory.CreateConsumerConnector<T>()
    // IInstanceMessageConnector<T> IMessageConnectorFactory.CreateInstanceConnector<T>()
}
Lastly, CustomConsumerMessageFilter will enable us to invoke our custom handler.
internal class CustomConsumerMessageFilter<TConsumer, TMessage> : IConsumerMessageFilter<TConsumer, TMessage>
    where TConsumer : class, IScopedMessageHandler
    where TMessage : class, IMessage
{
    async Task IFilter<ConsumerConsumeContext<TConsumer, TMessage>>.Send(
        ConsumerConsumeContext<TConsumer, TMessage> context,
        IPipe<ConsumerConsumeContext<TConsumer, TMessage>> next
    )
    {
        // Trimmed implementation.
        // This Handle method will internally create our IMessageHandler through a service provider and call its Handle method.
        await context.Consumer.Handle(context.Message, new MessageContext());
    }
    // Omitted:
    // void IProbeSite.Probe(ProbeContext context)
}
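The ScopedMessageHandler wrapper itself is never shown in this post, so here is a rough sketch of what it might look like, inferred only from how it is used above. Everything in it is an assumption: the shape of IScopedMessageHandler is guessed from the generic constraints, IHandlerScope is a local stand-in for IServiceScope so that the sketch needs no NuGet packages, and Ping, PingHandler, EmptyMessageContext and InlineScope exist purely to exercise it.

```csharp
using System;
using System.Threading.Tasks;

public interface IMessage { Guid Id { get; init; } }
public interface IMessageContext { }
public interface IMessageHandler { }
public interface IMessageHandler<TMessage> : IMessageHandler
    where TMessage : class, IMessage
{
    Task Handle(TMessage message, IMessageContext context);
}

// Local stand-in for Microsoft.Extensions.DependencyInjection.IServiceScope.
public interface IHandlerScope : IDisposable
{
    IServiceProvider ServiceProvider { get; }
}

// Assumed shape: one generic Handle method that the filter can call for any message.
public interface IScopedMessageHandler
{
    Task Handle<TMessage>(TMessage message, IMessageContext context) where TMessage : class, IMessage;
}

public sealed class ScopedMessageHandler<THandler> : IScopedMessageHandler
    where THandler : class, IMessageHandler
{
    private readonly IHandlerScope _scope;

    public ScopedMessageHandler(IHandlerScope scope) => _scope = scope;

    public async Task Handle<TMessage>(TMessage message, IMessageContext context)
        where TMessage : class, IMessage
    {
        // The scope lives exactly as long as the handling of one message.
        using (_scope)
        {
            var handler = _scope.ServiceProvider.GetService(typeof(THandler)) as IMessageHandler<TMessage>
                ?? throw new InvalidOperationException(
                    $"{typeof(THandler).Name} is not registered or does not handle {typeof(TMessage).Name}.");
            await handler.Handle(message, context);
        }
    }
}

// Illustration-only types used to exercise the wrapper.
public sealed class Ping : IMessage { public Guid Id { get; init; } }
public sealed class EmptyMessageContext : IMessageContext { }

public sealed class PingHandler : IMessageHandler<Ping>
{
    public int Handled { get; private set; }
    public Task Handle(Ping message, IMessageContext context) { Handled++; return Task.CompletedTask; }
}

public sealed class InlineScope : IHandlerScope, IServiceProvider
{
    private readonly Func<Type, object> _resolve;
    public InlineScope(Func<Type, object> resolve) => _resolve = resolve;
    public bool Disposed { get; private set; }
    public IServiceProvider ServiceProvider => this;
    public object GetService(Type serviceType) => _resolve(serviceType);
    public void Dispose() => Disposed = true;
}
```

The design point worth keeping, whatever the real implementation looks like, is that the wrapper resolves the handler per message and disposes the scope afterwards, so scoped dependencies do not leak between messages.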
Service Bus
We’ll be focusing on two transports for sending messages:
- In-Memory. Designed for sending and receiving messages within the current process only.
- RabbitMQ. A message broker system utilizing queues for message exchange.
For each of these transports, we’ll create a custom service bus. However, before proceeding, we must define a base class for both.
internal abstract class ServiceBus : IServiceBus
{
    private readonly BusConfigurator _busConfigurator = new();
    private readonly IServiceProvider _serviceProvider;
    private IBusControl _bus;
    public ServiceBus(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }
    internal void ConfigureEndpoints(Action<IBusConfigurator> configurator) => configurator?.Invoke(_busConfigurator);
    // Setup and create a Bus Control instance
    protected abstract IBusControl Setup();
    // Register bus endpoints
    protected virtual void SetupEndpoints(IBusFactoryConfigurator cfg)
    {
        var serviceProviderIsService = _serviceProvider.GetRequiredService<IServiceProviderIsService>();
        foreach (var (endpointName, handlers) in _busConfigurator.EndpointMap)
        {
            var invalidType = handlers.FirstOrDefault(handler =>
                !handler.IsAssignableTo(typeof(IMessageHandler))
                || !serviceProviderIsService.IsService(handler)
            );
            if (invalidType is not null)
            {
                throw new Exception($"Type '{invalidType.FullName}' is not assignable to {nameof(IMessageHandler)} or is not registered with IServiceCollection.");
            }
            cfg.ReceiveEndpoint(endpointName, e =>
            {
                foreach (var consumer in handlers)
                {
                    e.Consumer(
                        typeof(ScopedMessageHandler<>).MakeGenericType(consumer),
                        consumerType => ActivatorUtilities.CreateInstance(_serviceProvider, consumerType, [_serviceProvider.CreateScope()])
                    );
                }
            });
        }
    }
    // Trimmed implementation.
    // Omitted:
    // Task PublishAsync<T>(T message)
    // virtual void Start()
    // void Stop() => _bus.Stop();
}
The SetupEndpoints method is responsible for configuring the endpoints for message handling within our messaging system. Let’s break down what it does:
- Iterating Through Endpoint Configuration: Loops over every endpoint name and its handler types stored in the configurator’s EndpointMap.
- Type Validation: For each endpoint, it checks that every configured handler is assignable to IMessageHandler and registered with the IServiceCollection, and throws otherwise.
- Endpoint Configuration: If all handlers are valid, it sets up a receive endpoint for the current endpoint name. Within this endpoint configuration, it adds consumers for each message handler associated with the endpoint.
- Dynamic Consumer Configuration: It dynamically configures consumers for each message handler using the ScopedMessageHandler<> type. Additionally, it ensures that each consumer instance is created within a service scope to manage dependencies effectively.
Overall, this method establishes the necessary endpoints for message handling based on the provided configuration, ensuring that each handler is appropriately validated and configured within the messaging system.
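The type validation step is easy to isolate and test on its own. The sketch below is a simplified, hypothetical extraction of that check: HandlerValidation does not exist in the repository, and the isRegistered delegate is an assumed stand-in for IServiceProviderIsService.IsService so that the example runs without a container.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IMessageHandler { }

// Hypothetical helper that isolates the validation performed in SetupEndpoints.
public static class HandlerValidation
{
    // Returns the first type that is not an IMessageHandler or is not registered,
    // or null when every handler passes both checks.
    public static Type FindInvalidHandler(IEnumerable<Type> handlers, Func<Type, bool> isRegistered) =>
        handlers.FirstOrDefault(handler =>
            !typeof(IMessageHandler).IsAssignableFrom(handler)
            || !isRegistered(handler));

    // Throws with a descriptive message when an endpoint contains an invalid handler.
    public static void EnsureValid(string endpointName, IEnumerable<Type> handlers, Func<Type, bool> isRegistered)
    {
        var invalidType = FindInvalidHandler(handlers, isRegistered);
        if (invalidType is not null)
        {
            throw new InvalidOperationException(
                $"Endpoint '{endpointName}': type '{invalidType.FullName}' is not assignable to " +
                $"{nameof(IMessageHandler)} or is not registered.");
        }
    }
}

// Illustration-only handler type.
public sealed class SampleHandler : IMessageHandler { }
```

Failing fast here means a misconfigured endpoint surfaces at startup rather than when the first message arrives.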
internal class InMemoryServiceBus : ServiceBus
{
    public InMemoryServiceBus(IServiceProvider serviceProvider)
        : base(serviceProvider)
    {
    }
    protected override IBusControl Setup()
    {
        ConsumerConvention.Register<CustomConsumerConvention>();
        return Bus.Factory.CreateUsingInMemory(cfg =>
        {
            SetupEndpoints(cfg);
        });
    }
}
Handlers are registered through IBusConfigurator, where you specify an endpoint name and the corresponding handlers to be invoked when a message is sent to that endpoint.
public interface IBusConfigurator
{
    void ReceiveEndpoint(string endpointName, Action<IBusEndpointConfigurator> endpointConfigurator);
}
public interface IBusEndpointConfigurator
{
    void AddHandler<T>() where T : IMessageHandler;
}
It’s always easier to look at an example. The following code sets up a message handling scenario where messages sent to the app-default endpoint will be processed by the MessageBrokerTestHandler.
serviceCollection.AddSingleton<MessageBrokerTestHandler>();
serviceCollection.RegisterMessageBroker(config, busCfg =>
{
    busCfg.ReceiveEndpoint("app-default", ep =>
    {
        ep.AddHandler<MessageBrokerTestHandler>();
    });
});
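The BusConfigurator behind IBusConfigurator is not shown in this post. A minimal sketch of how it could collect the registrations is given below; the only detail taken from the code above is that it exposes an EndpointMap of endpoint names to handler types. The class bodies, the use of HashSet<Type>, and the simplified MessageBrokerTestHandler are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageHandler { }

public interface IBusConfigurator
{
    void ReceiveEndpoint(string endpointName, Action<IBusEndpointConfigurator> endpointConfigurator);
}

public interface IBusEndpointConfigurator
{
    void AddHandler<T>() where T : IMessageHandler;
}

// Collects handler types for a single endpoint.
internal sealed class BusEndpointConfigurator : IBusEndpointConfigurator
{
    private readonly HashSet<Type> _handlers;

    public BusEndpointConfigurator(HashSet<Type> handlers) => _handlers = handlers;

    public void AddHandler<T>() where T : IMessageHandler => _handlers.Add(typeof(T));
}

// Maps endpoint names to the handler types registered for them.
internal sealed class BusConfigurator : IBusConfigurator
{
    private readonly Dictionary<string, HashSet<Type>> _endpointMap = new();

    public IReadOnlyDictionary<string, HashSet<Type>> EndpointMap => _endpointMap;

    public void ReceiveEndpoint(string endpointName, Action<IBusEndpointConfigurator> endpointConfigurator)
    {
        if (string.IsNullOrWhiteSpace(endpointName))
            throw new ArgumentException("Endpoint name is required.", nameof(endpointName));
        if (endpointConfigurator is null)
            throw new ArgumentNullException(nameof(endpointConfigurator));

        // Reuse the existing set so repeated calls for one endpoint are merged.
        if (!_endpointMap.TryGetValue(endpointName, out var handlers))
        {
            handlers = new HashSet<Type>();
            _endpointMap[endpointName] = handlers;
        }

        endpointConfigurator(new BusEndpointConfigurator(handlers));
    }
}

// Simplified stand-in for the handler used in the registration example.
public sealed class MessageBrokerTestHandler : IMessageHandler { }
```

With this shape, the foreach over EndpointMap in SetupEndpoints can deconstruct each entry into an endpoint name and its handler types, and registering the same handler twice has no effect.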
Ending notes
Here are a few tips on how to extend the previous implementation:
- Create a message handler cancellation system. You will need a way of notifying any service in use that you want to cancel the execution of a handler. The service executing the handler may be one of many services that aren’t running in the same environment.
- Read my previous blog post? Create a system that shares one IWorkOrder interface, used to both schedule and consume messages.
Now, if you ever wish to change your implementation down the line, you can do so with considerably less effort!
💾 See the full example on GitHub!
Thanks for reading!