Using the Message Bus
The message bus is the central component of Silverback that allows you to publish messages and have them delivered to the appropriate subscribers.
Creating the Message Model
First, we need to create a message class. The message class can be any POCO class. Any CLR type can be used or referenced since messages are only exchanged in memory, making it similar to calling a method directly.
public class SampleMessage
{
public string Content { get; set; }
}
It is not mandatory, but it is recommended to use the Silverback.Core.Model package (documented in the next chapter) to enhance semantics and improve code readability.
Silverback.Core.Model
Dedicated interfaces for events, commands, and queries are available in Silverback.Core.Model to help define the meaning of each message, making the code more structured and readable.
The integration variations are designed for messages exchanged via a message broker like Apache Kafka or MQTT.
These are the available interfaces:
- IEvent is used to notify other services of an event that has occurred. Events are fire-and-forget messages, meaning no response is expected.
- ICommand and ICommand<TResult> are used to trigger actions in another service or component. Commands are typically consumed by a single subscriber and can return a value (of type
TResult). - IQuery<TResult> functions similarly to ICommand<TResult> but always returns a result, as it represents a data request.
- The IIntegrationMessage interface identifies messages exchanged through a message broker. It has two specialized variations: IIntegrationEvent and IIntegrationCommand.
Publishing Messages
To publish a message, you need an instance of IPublisher, which is registered with the DI container as a transient service.
using Silverback.Messaging.Publishing;
public class MyPublishingService
{
private readonly IPublisher _publisher;
public MyPublishingService(IPublisher publisher)
{
_publisher = publisher;
}
public async Task PublishSomething()
{
SampleMessage message = new SampleMessage
{
Content = "Silverback rocks!"
};
await _publisher.PublishAsync(message);
}
}
The publisher provides both synchronous and asynchronous versions of each method.
Important
The publisher is a transient service, meaning it can be resolved from the root provider and injected into singleton services, but it will throw an exception if the published message has to be handled by a scoped subscriber. Producing to a message broker does not require a scope and is therefore safe to be done from a singleton service.
Return Values
Subscribers can return a result after processing a message.
public async Task<QueryResult> PublishSomething()
{
MyQuery query = new MyQuery() { ... };
QueryResult result = await _publisher.PublishAsync(query);
return result.Single();
}
Note
The call to Single() is required because Silverback allows multiple subscribers for the same message, collecting multiple return values. This is unnecessary when using the specialized publishers described in the next chapter. The ICommand and IQuery interfaces specify the TResult type for better clarity.
Silverback.Core.Model Extensions
Each message type (IEvent, ICommand/ICommand<TResult>, and IQuery<TResult>) includes specialized extensions for IPublisher to improve semantics and clarity.
public async Task PublishEvent()
{
MyEvent myEvent = new MyEvent() { ... };
await _publisher.PublishEventAsync(myEvent);
}
public async Task ExecuteCommand()
{
MyCommand myCommand = new MyCommand() { ... };
await _publisher.ExecuteCommandAsync(myCommand);
}
Subscribing to Messages
Now, we need to write a subscriber method to process the published messages.
Silverback’s message bus routes messages based on their type. When a message is published, Silverback evaluates the signatures of the subscribed methods and invokes those accepting the specific message type, a base type, or an implemented interface.
Subscriber Class
The preferred way to subscribe is by implementing message handling logic in a dedicated subscriber class.
public class SubscribingService
{
public async Task OnMessageReceived(SampleMessage message)
{
// Process message
}
}
The subscriber class must be registered with the DI container using the AddScopedSubscriber, AddSingletonSubscriber, or AddTransientSubscriber extension methods.
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddSilverback()
.AddScopedSubscriber<SubscribingService>();
}
}
By default, all public methods in a registered subscriber class (excluding the ones declared in the based classes) are subscribed. To subscribe non-public methods, or methods from the base classes, or customize the subscription options, use the SubscribeAttribute to decorate the methods.
You can also disable automatic subscription of public methods:
services
.AddSilverback()
.AddScopedSubscriber<SubscribingService>(autoSubscribeAllPublicMethods: false);
Delegate based subscription
In some cases you may prefer to subscribe a method delegate (or an inline lambda) directly using the AddDelegateSubscriber method.
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddSilverback()
.AddDelegateSubscriber((SampleMessage message) =>
{
// TODO: Process messages
});
}
}
Supported methods and parameters
The subscribed method (or delegate) can either be synchronous or asynchronous (returning a Task or a ValueTask).
The first parameter must be the message and the parameter type can be the specific message, a base class or an implemented interface.
The method can have other parameters that will be resolved using the service provider, or the cancellation token (see Cancellation).
public class SubscribingService
{
public async Task OnMessageReceived(BasketCheckoutMessage message, CheckoutService service)
{
service.Checkout(message.BaksetId, message.UserId);
}
}
or
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddSilverback()
.AddDelegateSubscriber(
(BasketCheckoutMessage message, CheckoutService service) =>
{
service.Checkout(message.BaksetId, message.UserId);
});
}
}
Return values
A subscriber can also have a return value that can be collected by the publisher.
public class SubscribingService
{
public async Task<SampleResult> OnMessageReceived(SampleMessage message)
{
...
return new SampleResult(...);
}
}
Ideally, you should use the specialized interfaces ICommand<TResult> and IQuery<TResult> to define the return type and make it easier to handle the result.
Return New Messages (Republishing)
The subscriber method can also return a message or a collection of messages (either IEnumerable
public class SubscribingService
{
public async Task<OtherSampleMessage> OnMessageReceived(SampleMessage message)
{
...
return new OtherSampleMessage
{
...
};
}
}
or
public class SubscribingService
{
public IEnumerable<IMessage> OnMessageReceived(IEnumerable<SampleMessage> messages) =>
messages.SelectMany(message =>
{
yield return new OtherSampleMessage1
{
...
};
yield return new OtherSampleMessage2
{
...
};
});
}
Silverback recognizes only the messages implementing IMessage as messages to be republished (IEvent, ICommand/ICommand<TResult>, and IQuery<TResult> all implement that interface), but you can register your own types, base types or interfaces.
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddSilverback()
.HandleMessagesOfType<ICustomMessage>();
}
}
Cancellation
If a subscriber method accepts a CancellationToken, Silverback will forward the optional cancellation token to the subscribers.
await _publisher.ExecuteCommandAsync(myCommand, cancellationToken);
The cancellation token can be used to interrupt long-running operations or can be passed to other API such as HTTP requests or database operations supporting cancellation.
public async Task OnCommandReceived(MyCommand command, CancellationToken cancellationToken)
{
while (...) // Long-running operation
{
cancellationToken.ThrowIfCancellationRequested();
// Processing
}
}
Behaviors
Behaviors allow you to implement a custom pipeline (similar to the ASP.NET middleware), adding cross-cutting concerns like logging and validation.
public class TracingBehavior : IBehavior
{
private readonly ITracer _tracer;
public TracingBehavior(ITracer tracer)
{
_tracer = tracer;
}
public async Task<IReadOnlyCollection<object?>> HandleAsync(
object message,
MessageHandler next)
{
try
{
_tracer.TraceProcessing(message);
object result = await next(message);
_tracer.TraceProcessed(message);
return result;
}
catch (Exception ex)
{
_tracer.TraceError(message, ex);
throw;
}
}
}
Register the behavior with the DI container using AddScopedBehavior, AddSingletonBehavior, or AddTransientBehavior.
services
.AddSilverback()
.AddScopedBehavior<TracingBehavior>();
If execution order is important, implement ISorted and specify the SortIndex.
public class SortedBehavior : IBehavior, ISorted
{
public int SortIndex => 120;
public Task<IReadOnlyCollection<object?>> HandleAsync(
object message,
MessageHandler next)
{
return next(message);
}
}