Show / Hide Table of Contents

    Broker behaviors pipeline

    Silverback is built to be modular and most of its feature are plugged into the consumers and producers via some so-called behaviors. The inbound and outbound messages flow through this pipeline and each behavior take care of a specific task such as serialization, encryption, chunking, logging, etc.

    The IProducerBehavior and IConsumerBehavior are the interfaces used to build such behaviors.

    Note

    IProducerBehavior and IConsumerBehavior inherit the ISorted interface. It is therefore mandatory to specify the exact sort index of each behavior.

    Built-in producer behaviors

    This behaviors build the producer pipeline and contain the actual logic to properly serialize the messages according to the applied configuration.

    Name Index Description
    ActivityProducerBehavior 100 Starts an Activity and adds the tracing information to the message headers.
    HeadersWriterProducerBehavior 200 Maps the properties decorated with the HeaderAttribute to the message headers.
    MessageEnricherProducerBehavior 250 Invokes all the IOutboundMessageEnricher configured for to the endpoint.
    MessageIdInitializerProducerBehavior 300 It ensures that an x-message-id header is always produced.
    BrokerKeyHeaderInitializer 400 Provided by the message broker implementation (e.g. KafkaMessageKeyInitializerProducerBehavior or RabbitRoutingKeyInitializerProducerBehavior), sets the message key header that will be used by the IProducer implementation to set the actual message key.
    BinaryFileHandlerProducerBehavior 500 Switches to the BinaryFileMessageSerializer if the message being produced implements the IBinaryFileMessage interface.
    SerializerProducerBehavior 600 Serializes the message being produced using the configured IMessageSerializer.
    EncryptorProducerBehavior 700 Encrypts the message according to the EncryptionSettings.
    SequencerProducerBehavior 800 Uses the available implementations of ISequenceWriter (e.g. ChunkSequenceWriter) to set the proper headers and split the published message or messages set to create the sequences.
    EndpointNameResolverProducerBehavior 900 Resolves the actual target endpoint name for the message being published.
    KafkaPartitionResolverProducerBehavior 901 Resolves the actual target endpoint name for the message being published.
    CustomHeadersMapperProducerBehavior 1000 Applies the custom header name mappings.

    Built-in consumer behaviors

    This behaviors are the foundation of the consumer pipeline and contain the actual logic to deserialize the incoming messages.

    Name Index Description
    ActivityConsumerBehavior 100 Starts an Activity with the tracing information from the message headers.
    FatalExceptionLoggerConsumerBehavior 200 Logs the unhandled exceptions thrown while processing the message. These exceptions are fatal since they will usually cause the consumer to stop.
    CustomHeadersMapperConsumerBehavior 300 Applies the custom header name mappings.
    TransactionHandlerConsumerBehavior 400 Handles the consumer transaction and applies the error policies.
    RawSequencerConsumerBehavior 500 Uses the available implementations of ISequenceReader (e.g. ChunkSequenceReader) to assign the incoming message to the right sequence.
    ExactlyOnceGuardConsumerBehavior 600 Uses the configured implementation of IExactlyOnceStrategy to ensure that the message is processed only once.
    DecryptorConsumerBehavior 700 Decrypts the message according to the EncryptionSettings.
    BinaryFileHandlerProducerBehavior 800 Switches to the BinaryFileMessageSerializer if the message being consumed is a binary message (according to the x-message-type header.
    DeserializerConsumerBehavior 900 Deserializes the messages being consumed using the configured IMessageSerializer.
    HeadersReaderConsumerBehavior 1000 Maps the headers with the properties decorated with the HeaderAttribute.
    SequencerConsumerBehavior 1100 Uses the available implementations of ISequenceReader (e.g. BatchSequenceReader) to assign the incoming message to the right sequence.
    PublisherConsumerBehavior 2000 Publishes the consumed messages to the internal bus.

    Custom behaviors

    The behaviors can be used to implement cross-cutting concerns or add new features to Silverback.

    Custom IProducerBehavior example

    The following example demonstrate how to set a custom message header on each outbound message.

    Note

    The ProducerPipelineContext and ConsumerPipelineContext hold a reference to the IServiceProvider and can be used to resolve the needed services. The IServiceProvider in the ConsumerPipelineContext can be either the root service provider or the scoped service provider for the processing of the consumed message (depending on the position of the behavior in the pipeline).

    Note

    The broker behaviors can be registered either as singleton or transient services. When registered as transient a new instance will be created per each producer or consumer.

    • ProducerBehavior
    • Startup
    public class CustomHeadersProducerBehavior : IProducerBehavior
    {
        public int SortIndex => 1000;
    
        public async Task HandleAsync(
            ProducerPipelineContext context, 
            ProducerBehaviorHandler next)
        {
            context.Envelope.Headers.Add("generated-by", "silverback");
    
            await next(context);
        }
    }
    
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(options => options
                    .AddKafka())
                .AddSingletonBrokerBehavior<CustomHeadersBehavior>();
        }
    }
    

    Custom IConsumerBehavior example

    The following example demonstrate how to log the headers received with each inbound message.

    • ConsumerBehavior
    • Startup
    public class LogHeadersConsumerBehavior : IConsumerBehavior
    {
        private readonly ILogger<LogHeadersBehavior> _logger;
    
        public LogHeadersBehavior(ILogger<LogHeadersBehavior> logger)
        {
            _logger = logger;
        }
    
        public int SortIndex => 1000;
    
        public async Task HandleAsync(
            ConsumerPipelineContext context, 
            ConsumerBehaviorHandler next)
        {
            foreach (var header in context.Envelope.Headers)
            {
                _logger.LogTrace(
                    "{Name}={Value}",
                    header.Name,
                    header.Value);
            }
    
            await next(context);
        }
    }
    
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(options => options
                    .AddKafka())
                .AddSingletonBrokerBehavior<LogHeadersBehavior>();
        }
    }
    

    See also

    Behaviors

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini