Broker behaviors pipeline
Silverback is built to be modular and most of its feature are plugged into the consumers and producers via some so-called behaviors. The inbound and outbound messages flow through this pipeline and each behavior take care of a specific task such as serialization, encryption, chunking, logging, etc.
The IProducerBehavior and IConsumerBehavior are the interfaces used to build such behaviors.
Note
IProducerBehavior and IConsumerBehavior inherit the ISorted interface. It is therefore mandatory to specify the exact sort index of each behavior.
Built-in producer behaviors
This behaviors build the producer pipeline and contain the actual logic to properly serialize the messages according to the applied configuration.
Name | Index | Description |
---|---|---|
ActivityProducerBehavior | 100 | Starts an Activity and adds the tracing information to the message headers. |
HeadersWriterProducerBehavior | 200 | Maps the properties decorated with the HeaderAttribute to the message headers. |
MessageEnricherProducerBehavior | 250 | Invokes all the IOutboundMessageEnricher configured for to the endpoint. |
MessageIdInitializerProducerBehavior | 300 | It ensures that an x-message-id header is always produced. |
BrokerKeyHeaderInitializer | 400 | Provided by the message broker implementation (e.g. KafkaMessageKeyInitializerProducerBehavior or RabbitRoutingKeyInitializerProducerBehavior), sets the message key header that will be used by the IProducer implementation to set the actual message key. |
BinaryFileHandlerProducerBehavior | 500 | Switches to the BinaryFileMessageSerializer if the message being produced implements the IBinaryFileMessage interface. |
SerializerProducerBehavior | 600 | Serializes the message being produced using the configured IMessageSerializer. |
EncryptorProducerBehavior | 700 | Encrypts the message according to the EncryptionSettings. |
SequencerProducerBehavior | 800 | Uses the available implementations of ISequenceWriter (e.g. ChunkSequenceWriter) to set the proper headers and split the published message or messages set to create the sequences. |
EndpointNameResolverProducerBehavior | 900 | Resolves the actual target endpoint name for the message being published. |
KafkaPartitionResolverProducerBehavior | 901 | Resolves the actual target endpoint name for the message being published. |
CustomHeadersMapperProducerBehavior | 1000 | Applies the custom header name mappings. |
Built-in consumer behaviors
This behaviors are the foundation of the consumer pipeline and contain the actual logic to deserialize the incoming messages.
Name | Index | Description |
---|---|---|
ActivityConsumerBehavior | 100 | Starts an Activity with the tracing information from the message headers. |
FatalExceptionLoggerConsumerBehavior | 200 | Logs the unhandled exceptions thrown while processing the message. These exceptions are fatal since they will usually cause the consumer to stop. |
CustomHeadersMapperConsumerBehavior | 300 | Applies the custom header name mappings. |
TransactionHandlerConsumerBehavior | 400 | Handles the consumer transaction and applies the error policies. |
RawSequencerConsumerBehavior | 500 | Uses the available implementations of ISequenceReader (e.g. ChunkSequenceReader) to assign the incoming message to the right sequence. |
ExactlyOnceGuardConsumerBehavior | 600 | Uses the configured implementation of IExactlyOnceStrategy to ensure that the message is processed only once. |
DecryptorConsumerBehavior | 700 | Decrypts the message according to the EncryptionSettings. |
BinaryFileHandlerProducerBehavior | 800 | Switches to the BinaryFileMessageSerializer if the message being consumed is a binary message (according to the x-message-type header. |
DeserializerConsumerBehavior | 900 | Deserializes the messages being consumed using the configured IMessageSerializer. |
HeadersReaderConsumerBehavior | 1000 | Maps the headers with the properties decorated with the HeaderAttribute. |
SequencerConsumerBehavior | 1100 | Uses the available implementations of ISequenceReader (e.g. BatchSequenceReader) to assign the incoming message to the right sequence. |
PublisherConsumerBehavior | 2000 | Publishes the consumed messages to the internal bus. |
Custom behaviors
The behaviors can be used to implement cross-cutting concerns or add new features to Silverback.
Custom IProducerBehavior example
The following example demonstrate how to set a custom message header on each outbound message.
Note
The ProducerPipelineContext and ConsumerPipelineContext hold a reference to the IServiceProvider and can be used to resolve the needed services. The IServiceProvider in the ConsumerPipelineContext can be either the root service provider or the scoped service provider for the processing of the consumed message (depending on the position of the behavior in the pipeline).
Note
The broker behaviors can be registered either as singleton or transient services. When registered as transient a new instance will be created per each producer or consumer.
public class CustomHeadersProducerBehavior : IProducerBehavior
{
public int SortIndex => 1000;
public async Task HandleAsync(
ProducerPipelineContext context,
ProducerBehaviorHandler next)
{
context.Envelope.Headers.Add("generated-by", "silverback");
await next(context);
}
}
Custom IConsumerBehavior example
The following example demonstrate how to log the headers received with each inbound message.
public class LogHeadersConsumerBehavior : IConsumerBehavior
{
private readonly ILogger<LogHeadersBehavior> _logger;
public LogHeadersBehavior(ILogger<LogHeadersBehavior> logger)
{
_logger = logger;
}
public int SortIndex => 1000;
public async Task HandleAsync(
ConsumerPipelineContext context,
ConsumerBehaviorHandler next)
{
foreach (var header in context.Envelope.Headers)
{
_logger.LogTrace(
"{Name}={Value}",
header.Name,
header.Value);
}
await next(context);
}
}