Message Headers
Custom headers
There are multiple ways to add custom headers to an outbound message:
- adding an enricher to the IProducerEndpoint,
- annotating some properties with the HeaderAttribute, as shown in the Using HeaderAttribute section below,
- implementing a custom IBehavior or IProducerBehavior, as shown in the Behaviors and Broker behaviors pipeline sections.
Warning
Some message broker implementations might not support headers. Silverback doesn't currently provide a workaround, so in that case the headers are simply ignored.
Using enrichers
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddOutbound<InventoryEvent>(endpoint => endpoint
                    .ProduceTo("inventory-events")
                    // Static value, identical for every message produced to this endpoint
                    .AddHeader(
                        "x-my-header",
                        "static value")
                    // Value resolved per message from the outbound envelope
                    .AddHeader<InventoryEvent>(
                        "x-product-id",
                        envelope => envelope.Message?.ProductId)));
}
Using HeaderAttribute
Using the HeaderAttribute is simple: decorate the properties you want to publish as headers and specify a name for each header.
The header value is also automatically mapped back to the property upon consuming, provided that the property declares a setter.
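using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Serialization; // assumed source of JsonIgnore; use Newtonsoft.Json if that serializer is configured
using Silverback.Messaging.Messages;

namespace Sample
{
    public class OrderCreatedEvent
    {
        public List<LineItems> Items { get; set; }

        // Published even when OrderType has its default value; excluded from the JSON body
        [Header("x-order-type", PublishDefaultValue = true)]
        [JsonIgnore]
        public OrderType OrderType { get; set; }

        // No setter: published as a header, but not mapped back upon consuming
        [Header("x-books-order")]
        public bool ContainsBooks => Items.Any(item => item.Type == "book");

        [Header("x-dvd-order")]
        public bool ContainsDvd => Items.Any(item => item.Type == "dvd");
    }
}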
Note
The PublishDefaultValue boolean property defines whether the header has to be published even if the property is set to the default value for its data type. The default is false.
Note that the JsonIgnoreAttribute can be used to prevent the same properties from being serialized in the JSON body when using the JsonMessageSerializer.
Important
Only the message type will be scanned, therefore the properties decorated with the HeaderAttribute must be in the root of the message object.
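The two rules above (default values are skipped unless PublishDefaultValue is set, and only the root object is scanned) can be illustrated with a small standalone model. This is only a sketch of the described behavior, not Silverback's implementation: DemoHeaderAttribute, DemoOrder and DemoHeaderScanner are hypothetical names introduced here so that the example compiles without the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-in for Silverback's HeaderAttribute (assumption, not the real type).
[AttributeUsage(AttributeTargets.Property)]
public sealed class DemoHeaderAttribute : Attribute
{
    public DemoHeaderAttribute(string name) => Name = name;

    public string Name { get; }

    public bool PublishDefaultValue { get; set; }
}

// Hypothetical message type used only for this illustration.
public class DemoOrder
{
    [DemoHeader("x-order-type", PublishDefaultValue = true)]
    public int OrderType { get; set; }

    [DemoHeader("x-books-order")]
    public bool ContainsBooks { get; set; }
}

public static class DemoHeaderScanner
{
    // Inspects only the properties of the message object itself (nested objects
    // are never visited) and skips default values unless PublishDefaultValue is set.
    public static Dictionary<string, string> Collect(object message)
    {
        var headers = new Dictionary<string, string>();

        foreach (PropertyInfo property in message.GetType().GetProperties())
        {
            var attribute = property.GetCustomAttribute<DemoHeaderAttribute>();
            if (attribute == null)
                continue;

            object value = property.GetValue(message);
            object defaultValue = property.PropertyType.IsValueType
                ? Activator.CreateInstance(property.PropertyType)
                : null;

            if (!attribute.PublishDefaultValue && Equals(value, defaultValue))
                continue;

            headers[attribute.Name] = Convert.ToString(value) ?? string.Empty;
        }

        return headers;
    }
}
```

In this model, calling DemoHeaderScanner.Collect(new DemoOrder()) yields only x-order-type, because ContainsBooks still holds its default value and does not set PublishDefaultValue.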
Default headers
Silverback adds some headers to the produced messages. Which ones are sent depends on the scenario. The following table lists the default headers that may be sent.
Header Key | Description |
---|---|
x-message-id | The message identifier. |
x-message-type | The assembly qualified name of the message type. Used by the default JsonMessageSerializer. |
x-failed-attempts | If an exception is thrown, the failed attempts counter is incremented and stored as a header. This is necessary for the error policies to work. |
x-source-endpoint | Set by the Move error policy; contains the name of the endpoint the failed message is being moved from. |
x-chunk-index | The message chunk index, used when chunking is enabled. |
x-chunk-count | The total number of chunks the message was split into, used when chunking is enabled. |
x-chunk-last | A boolean value indicating whether the message is the last one of a chunks sequence, used when chunking is enabled. |
x-first-chunk-offset | The IBrokerMessageOffset value of the first chunk of the same message, used when chunking is enabled. |
traceparent | Used for distributed tracing. It is set by the IProducer using the current Activity.Id. The IConsumer uses its value to set the Activity.ParentId. Note that an Activity is automatically started by the default IProducer implementation. The header is implemented according to the W3C Trace Context proposal. |
tracestate | Used for distributed tracing. It corresponds to the Activity.TraceStateString. The header is implemented according to the W3C Trace Context proposal. |
tracebaggage | Used for distributed tracing. It corresponds to the string representation of the Activity.Baggage dictionary. This is not part of the W3C standard. |
content-type | The content type of the binary file, used when producing or consuming an IBinaryFileMessage. |
x-failure-reason | Set by the MoveMessageErrorPolicy; contains the reason why the message failed to be processed. |
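As an illustration of what the traceparent header carries, the following sketch uses only System.Diagnostics and no Silverback types. It starts an Activity in the W3C id format and splits its Id into the four dash-separated fields defined for version 00 of the W3C Trace Context format (version, trace-id, parent-id, trace-flags). The class name, method names and operation name are hypothetical, and how Silverback itself reads or writes the header is not shown here.

```csharp
using System;
using System.Diagnostics;

public static class TraceParentSketch
{
    // Splits a version 00 traceparent value into version, trace-id, parent-id and trace-flags.
    public static (string Version, string TraceId, string ParentId, string Flags) Parse(string traceParent)
    {
        string[] parts = traceParent.Split('-');

        // trace-id is 32 hex characters, parent-id is 16 hex characters
        if (parts.Length != 4 || parts[1].Length != 32 || parts[2].Length != 16)
            throw new FormatException("Expected version-traceid-parentid-flags.");

        return (parts[0], parts[1], parts[2], parts[3]);
    }

    // Starts an Activity using the W3C id format and returns its Id, which is the
    // kind of value that ends up in the traceparent header.
    public static string StartActivityAndGetId()
    {
        var activity = new Activity("produce-sketch"); // hypothetical operation name
        activity.SetIdFormat(ActivityIdFormat.W3C);
        activity.Start();

        try
        {
            return activity.Id ?? string.Empty;
        }
        finally
        {
            activity.Stop();
        }
    }
}
```

A consumer that restores the parent id from this value continues the same trace, since the trace-id field is shared by all activities in the chain.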
Kafka specific
Header Key | Description |
---|---|
x-kafka-message-key | Filled with the key of the message consumed from Kafka. |
x-kafka-message-timestamp | Filled with the timestamp of the message consumed from Kafka. |
x-source-consumer-group-id | Set by the MoveMessageErrorPolicy; contains the GroupId of the consumer that consumed the message that failed to be processed. |
x-source-topic | Set by the MoveMessageErrorPolicy; contains the source topic of the message that failed to be processed. |
x-source-partition | Set by the MoveMessageErrorPolicy; contains the source partition of the message that failed to be processed. |
x-source-offset | Set by the MoveMessageErrorPolicy; contains the offset of the message that failed to be processed. |
x-source-timestamp | Set by the MoveMessageErrorPolicy; contains the timestamp of the message that failed to be processed. |
The static classes DefaultMessageHeaders and KafkaMessageHeaders contain constants for all default header names.
Customizing header names
The default header names can be overridden using the WithCustomHeaderName configuration method.
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .WithConnectionToMessageBroker(options => options
                .AddKafka())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>()
            // Replace the default header names with shorter custom ones
            .WithCustomHeaderName(DefaultMessageHeaders.ChunkId, "x-ch-id")
            .WithCustomHeaderName(DefaultMessageHeaders.ChunksCount, "x-ch-cnt");
    }
}