Azure Service Bus Message Pump
Azure Service Bus Message Pump performs all the plumbing required for processing queues and topics:
- Manage message pump lifecycle
- Deserialize messages into concrete types
- Interpret message to provide correlation & context information
- Provide exception handling
- Provide telemetry
As a user, the only thing you have to do is focus on processing messages, not how to get them.
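To make the responsibilities listed above concrete, here is a minimal conceptual sketch of what a pump does for every received message: deserialize the body, build a context with correlation information, call your handler, and catch failures. All type names here (SketchContext, SketchPump) are hypothetical stand-ins invented for illustration, and JSON via System.Text.Json is an assumption; this is not how the Arcus message pump is implemented.

```csharp
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context type for this sketch; NOT an Arcus type.
public sealed class SketchContext
{
    public SketchContext(string messageId, string transactionId)
    {
        MessageId = messageId;
        TransactionId = transactionId;
    }

    public string MessageId { get; }
    public string TransactionId { get; }
}

// Hypothetical pump for this sketch; NOT the Arcus message pump.
public sealed class SketchPump<TMessage>
{
    private readonly Func<TMessage, SketchContext, CancellationToken, Task> _handler;

    public SketchPump(Func<TMessage, SketchContext, CancellationToken, Task> handler)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // Returns true when the handler completed, false when deserialization or the handler threw.
    public async Task<bool> DispatchAsync(
        string messageId,
        string transactionId,
        string jsonBody,
        CancellationToken cancellationToken)
    {
        try
        {
            // Deserialize the raw body into the concrete message type (JSON is assumed here).
            TMessage message = JsonSerializer.Deserialize<TMessage>(jsonBody);

            // Provide correlation and context information to the handler.
            var context = new SketchContext(messageId, transactionId);

            await _handler(message, context, cancellationToken);
            return true;
        }
        catch (Exception exception) when (!(exception is OperationCanceledException))
        {
            // Central exception handling: report the failure instead of crashing the pump.
            Console.Error.WriteLine($"Failed to process message '{messageId}': {exception.Message}");
            return false;
        }
    }
}
```

With plumbing like this handled for you, the only code you write is the handler itself.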
To process messages, create a message handler that implements IAzureServiceBusMessageHandler<TMessage> (or IMessageHandler<TMessage, MessageContext>).
Here is an example of a message handler that expects messages of type Order:
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Pumps.ServiceBus;
using Microsoft.Extensions.Logging;

public class OrdersMessageHandler : IAzureServiceBusMessageHandler<Order>
{
    private readonly ILogger _logger;

    public OrdersMessageHandler(ILogger<OrdersMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task ProcessMessageAsync(
        Order orderMessage,
        AzureServiceBusMessageContext messageContext,
        MessageCorrelationInfo correlationInfo,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for {OrderAmount} units of {OrderArticle} bought by {CustomerFirstName} {CustomerLastName}",
            orderMessage.Id, orderMessage.Amount, orderMessage.ArticleNumber,
            orderMessage.Customer.FirstName, orderMessage.Customer.LastName);

        // Custom logic

        _logger.LogInformation("Order {OrderId} processed", orderMessage.Id);
    }
}
Alternatively, implement the more general IMessageHandler<>, which receives the general MessageContext instead of the one specific to Azure Service Bus:
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Pumps.Abstractions.MessageHandling;
using Microsoft.Extensions.Logging;

public class OrdersMessageHandler : IMessageHandler<Order>
{
    private readonly ILogger _logger;

    public OrdersMessageHandler(ILogger<OrdersMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task ProcessMessageAsync(
        Order orderMessage,
        MessageContext messageContext,
        MessageCorrelationInfo correlationInfo,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for {OrderAmount} units of {OrderArticle} bought by {CustomerFirstName} {CustomerLastName}",
            orderMessage.Id, orderMessage.Amount, orderMessage.ArticleNumber,
            orderMessage.Customer.FirstName, orderMessage.Customer.LastName);

        // Custom logic

        _logger.LogInformation("Order {OrderId} processed", orderMessage.Id);
    }
}
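Both handlers reference an Order type that this page does not define. A minimal sketch of a matching model could look like the following; the property names are taken from the log statements in the handlers, while the property types and the Customer class name are assumptions made for illustration.

```csharp
// Hypothetical message contract inferred from the properties the handlers read.
// The property types are assumptions; adapt them to your actual message format.
public class Customer
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

public class Order
{
    public string Id { get; set; }
    public int Amount { get; set; }
    public string ArticleNumber { get; set; }
    public Customer Customer { get; set; }
}
```

Note that the handlers read orderMessage.Customer.FirstName directly, so a message without a customer would cause a NullReferenceException unless the handler checks for it first.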
Configuration
Once the message handler is created, you register it on a message pump in your application's startup:
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Add Service Bus Queue message pump and use OrdersMessageHandler to process the messages
        // ISecretProvider will be used to look up the connection string scoped to the queue for secret ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING
        services.AddServiceBusQueueMessagePump("ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING")
                .WithServiceBusMessageHandler<OrdersMessageHandler, Order>();

        // Add Service Bus Topic message pump and use OrdersMessageHandler to process the messages on the 'My-Subscription-Name' subscription
        // ISecretProvider will be used to look up the connection string scoped to the topic for secret ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING
        services.AddServiceBusTopicMessagePump("My-Subscription-Name", "ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING")
                .WithServiceBusMessageHandler<OrdersMessageHandler, Order>();

        // Note that only a single call to `.WithServiceBusMessageHandler` is needed when the handler should be used across message pumps.
    }
}
In this example, we use the Azure Service Bus message pump to process both a queue and a topic, with the connection string stored in the ARCUS_SERVICEBUS_ORDERS_CONNECTIONSTRING secret.
We support connection strings scoped to either the Service Bus namespace or a single entity (queue or topic), so you can choose the security model your application requires. If you use a namespace-scoped connection string, you also have to pass the queue or topic name.
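To illustrate the difference between the two scopes: in Azure Service Bus, an entity-scoped connection string carries an EntityPath segment, while a namespace-scoped one does not. The helper below is a hypothetical sketch (its name and methods are not part of Arcus) that checks which kind you have, and therefore whether a queue or topic name must be supplied separately.

```csharp
using System;

// Hypothetical helper for illustration only; not part of Arcus or the Azure SDK.
public static class ServiceBusConnectionStringScope
{
    // Returns the EntityPath value of an entity-scoped connection string,
    // or null when the connection string is namespace-scoped.
    public static string GetEntityPathOrNull(string connectionString)
    {
        if (connectionString is null)
        {
            throw new ArgumentNullException(nameof(connectionString));
        }

        string[] segments = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (string segment in segments)
        {
            // Split on the first '=' only: key values may themselves end in '=' (base64 padding).
            int separatorIndex = segment.IndexOf('=');
            if (separatorIndex <= 0)
            {
                continue;
            }

            string key = segment.Substring(0, separatorIndex).Trim();
            string value = segment.Substring(separatorIndex + 1).Trim();
            if (key.Equals("EntityPath", StringComparison.OrdinalIgnoreCase))
            {
                return value;
            }
        }

        return null;
    }

    // A namespace-scoped connection string does not name an entity,
    // so the queue or topic name has to be passed separately.
    public static bool RequiresEntityName(string connectionString)
    {
        return GetEntityPathOrNull(connectionString) is null;
    }
}
```

A check like this can be useful at startup to fail fast with a clear message when a namespace-scoped secret is configured without a queue or topic name.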