    Message Headers

    Custom headers

    There are multiple ways to add custom headers to an outbound message:

    • adding an enricher to the IProducerEndpoint,
    • annotating some properties with the HeaderAttribute, as shown in the Using HeaderAttribute section below,
    • implementing a custom IBehavior or IProducerBehavior, as shown in the Behaviors and Broker behaviors pipeline sections.
    Warning

    Some message broker implementations might not support headers and Silverback doesn't currently provide any workaround, thus the headers will simply be ignored.

    Using enrichers

    The two equivalent examples below use the fluent API first and the legacy API second.

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config => 
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092"; 
                        })
                    .AddOutbound<InventoryEvent>(endpoint => endpoint
                        .ProduceTo("inventory-events")
                        .AddHeader(
                            "x-my-header",
                            "static value")
                        .AddHeader<InventoryEvent>(
                            "x-product-id", 
                            envelope => envelope.Message?.ProductId)));
    }
    
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<InventoryEvent>(
                    new KafkaProducerEndpoint("inventory-events")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        MessageEnrichers = new List<IOutboundMessageEnricher>
                        {
                            new GenericOutboundHeadersEnricher(
                                "x-my-header",
                                "static value"),
                            new GenericOutboundHeadersEnricher<InventoryEvent>(
                                "x-product-id", 
                                envelope => envelope.Message?.ProductId)
                        }
                    });
    }
    

    Using HeaderAttribute

    To use the HeaderAttribute, decorate the properties you want to publish as headers and specify a name for each header.

    When the message is consumed, the header value is automatically mapped back to the property, provided that the property declares a setter.

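    using System.Collections.Generic;
    using System.Linq;
    using System.Text.Json.Serialization; // JsonIgnore; assumes the System.Text.Json-based serializer
    using Silverback.Messaging.Messages;
    
    namespace Sample
    {
        public class OrderCreatedEvent
        {
            public List<LineItems> Items { get; set; }
    
            // Published even when OrderType has its default value; excluded from the JSON body.
            [Header("x-order-type", PublishDefaultValue = true)]
            [JsonIgnore]
            public OrderType OrderType { get; set; }
    
            // Read-only properties are published as headers but not mapped back when consuming.
            [Header("x-books-order")]
            public bool ContainsBooks => Items.Any(item => item.Type == "book");
    
            [Header("x-dvd-order")]
            public bool ContainsDvd => Items.Any(item => item.Type == "dvd");
        }
    }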
    
    Note

    The PublishDefaultValue boolean property defines whether the header has to be published even if the property is set to the default value for its data type. The default is false.

    Note that the JsonIgnoreAttribute can be used to prevent the same properties from being serialized in the JSON body when using the JsonMessageSerializer.

    Important

    Only the message type itself is scanned, so the properties decorated with the HeaderAttribute must be declared at the root of the message object and not on nested objects.
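
    The scanning rules described above (root-level properties only, default values skipped unless PublishDefaultValue is set) can be illustrated with a small reflection-based sketch. This is not Silverback's implementation: SketchHeaderAttribute, HeaderScanner and SampleOrder are hypothetical names introduced here only so that the example is self-contained, and the real library may convert values to strings differently.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-in for the library's HeaderAttribute, defined here
// only so that the sketch compiles on its own.
[AttributeUsage(AttributeTargets.Property)]
public sealed class SketchHeaderAttribute : Attribute
{
    public SketchHeaderAttribute(string name) => Name = name;

    public string Name { get; }

    public bool PublishDefaultValue { get; set; }
}

public static class HeaderScanner
{
    // Collects the decorated public properties of the message type.
    // Nested objects are deliberately not visited.
    public static Dictionary<string, string> GetHeaders(object message)
    {
        var headers = new Dictionary<string, string>();

        foreach (PropertyInfo property in message.GetType().GetProperties())
        {
            var attribute = property.GetCustomAttribute<SketchHeaderAttribute>();
            if (attribute == null)
                continue;

            object value = property.GetValue(message);
            Type type = property.PropertyType;
            object defaultValue = type.IsValueType ? Activator.CreateInstance(type) : null;

            // Skip values equal to the type default unless PublishDefaultValue is set.
            if (!attribute.PublishDefaultValue && Equals(value, defaultValue))
                continue;

            headers[attribute.Name] = value?.ToString() ?? string.Empty;
        }

        return headers;
    }
}

// Hypothetical message type used to exercise the scanner.
public class SampleOrder
{
    [SketchHeader("x-order-type", PublishDefaultValue = true)]
    public int OrderType { get; set; }

    [SketchHeader("x-books-order")]
    public bool ContainsBooks { get; set; }
}
```

    With this sketch, HeaderScanner.GetHeaders(new SampleOrder()) returns only x-order-type, because ContainsBooks still has its default value and does not set PublishDefaultValue.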

    Default headers

    Silverback will add some headers to the produced messages. They may vary depending on the scenario. Here is the list of the default headers that may be sent.

    Header Key | Description
    x-message-id | The message identifier.
    x-message-type | The assembly qualified name of the message type. Used by the default JsonMessageSerializer.
    x-failed-attempts | If an exception is thrown, the failed attempts counter is incremented and stored in this header. The error policies rely on it to work.
    x-source-endpoint | Set by the Move error policy. Contains the name of the endpoint the failed message is being moved from.
    x-chunk-index | The message chunk index, used when chunking is enabled.
    x-chunk-count | The total number of chunks the message was split into, used when chunking is enabled.
    x-chunk-last | A boolean value indicating whether the message is the last one of a chunk sequence, used when chunking is enabled.
    x-first-chunk-offset | The IBrokerMessageOffset value of the first chunk of the same message, used when chunking is enabled.
    traceparent | Used for distributed tracing. It is set by the IProducer using the current Activity.Id. The IConsumer uses its value to set the Activity.ParentId. Note that an Activity is automatically started by the default IProducer implementation. The header is implemented according to the W3C Trace Context specification.
    tracestate | Used for distributed tracing. It corresponds to the Activity.TraceStateString. The header is implemented according to the W3C Trace Context specification.
    tracebaggage | Used for distributed tracing. It corresponds to the string representation of the Activity.Baggage dictionary. This is not part of the W3C standard.
    content-type | The content type of the binary file, used when producing or consuming an IBinaryFileMessage.
    x-failure-reason | Set by the MoveMessageErrorPolicy. Contains the reason why the message failed to be processed.
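
    To make the traceparent value more concrete, here is a minimal sketch of a parser for the version 00 layout defined by W3C Trace Context (version, trace id, parent id and flags, separated by dashes). The TraceParent class is a hypothetical helper written for this illustration; it is not part of Silverback, which relies on System.Diagnostics.Activity for this.

```csharp
using System;
using System.Linq;

// Hypothetical helper, shown only to illustrate the header layout.
public sealed class TraceParent
{
    public string Version { get; private set; }

    public string TraceId { get; private set; }

    public string ParentId { get; private set; }

    public string Flags { get; private set; }

    // Accepts only the version 00 layout:
    // 2 hex chars, 32 hex chars, 16 hex chars, 2 hex chars, dash-separated.
    public static bool TryParse(string header, out TraceParent result)
    {
        result = null;

        if (string.IsNullOrEmpty(header))
            return false;

        string[] parts = header.Split('-');
        if (parts.Length != 4 || parts[0] != "00")
            return false;

        if (!IsLowerHex(parts[1], 32) || !IsLowerHex(parts[2], 16) || !IsLowerHex(parts[3], 2))
            return false;

        // All-zero trace and parent ids are invalid.
        if (parts[1].All(c => c == '0') || parts[2].All(c => c == '0'))
            return false;

        result = new TraceParent
        {
            Version = parts[0],
            TraceId = parts[1],
            ParentId = parts[2],
            Flags = parts[3]
        };
        return true;
    }

    private static bool IsLowerHex(string value, int length) =>
        value.Length == length &&
        value.All(c => (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f'));
}
```

    For example, the value 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 parses successfully, with 0af7651916cd43dd8448eb211c80319c as the trace id and b7ad6b7169203331 as the parent id.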

    Kafka specific

    Header Key | Description
    x-kafka-message-key | Filled with the key of the message consumed from Kafka.
    x-kafka-message-timestamp | Filled with the timestamp of the message consumed from Kafka.
    x-source-consumer-group-id | Set by the MoveMessageErrorPolicy. Contains the GroupId of the consumer that consumed the message that failed to be processed.
    x-source-topic | Set by the MoveMessageErrorPolicy. Contains the source topic of the message that failed to be processed.
    x-source-partition | Set by the MoveMessageErrorPolicy. Contains the source partition of the message that failed to be processed.
    x-source-offset | Set by the MoveMessageErrorPolicy. Contains the offset of the message that failed to be processed.
    x-source-timestamp | Set by the MoveMessageErrorPolicy. Contains the timestamp of the message that failed to be processed.

    The static classes DefaultMessageHeaders and KafkaMessageHeaders contain constants for all default header names.

    Customizing header names

    The default header names can be overridden using the WithCustomHeaderName configuration method.

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(options => options
                    .AddKafka())
                .AddEndpointsConfigurator<MyEndpointsConfigurator>()
                .WithCustomHeaderName(DefaultMessageHeaders.ChunkId, "x-ch-id")
                .WithCustomHeaderName(DefaultMessageHeaders.ChunksCount, "x-ch-cnt");
        }
    }
    
    © 2020 Sergio Aquilini