Serialization
Being flexible when serializing and deserializing the messages sent over the message broker is crucial for interoperability and these mechanisms are therefore completely customizable.
Default JSON serialization
The default JsonMessageSerializer internally uses System.Text.Json to serialize the messages as JSON and encode them in UTF-8.
A few headers are added to the message, in particular x-message-type
is used by the JsonMessageSerializer to know the message type when deserializing it in the consumer, thus allowing messages of different types being sent over the same topic or queue.
Warning
The JsonMessageSerializer will obviously try to map the message to a type with the exact assembly qualified name found in the x-message-type
header. It is therefore a good practice to share the message models among the services, maybe through a shared project or a nuget package.
This is the suggested serialization strategy when both producer and consumer are based on Silverback but may not be ideal for interoperability.
Have a look at the Message Headers section for an overview on the headers that are appended to the messages.
Fixed-type JSON for interoperability
If you are consuming a message coming from another system (not based on Silverback), chances are that the message type name is not being delivered as header.
In that case you can resort to the typed JsonMessageSerializer<TMessage>. This serializer works like the default one seen in the previous chapter, but the message type is hard-coded, instead of being resolved according to the message header.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<InventoryEvent>(endpoint => endpoint
.ProduceTo("inventory-events")
.SerializeAsJson(serializer => serializer
.UseFixedType<InventoryEvent>()))
// Specifying the message type will automatically
// switch to the JsonMessageSerializer<TMessage>
// and deserialize the specified type without
// needing the type header
.AddInbound<OrderEvent>(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
}))
// The following configurations is equivalent to the
// previous one, but more verbose
.AddInbound(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeJson(serializer => serializer
.UseFixedType<OrderEvent>())));
}
JSON using Newtonsoft.Json
Prior to release 3.0.0 the default JsonMessageSerializer was based on Newtonsoft.Json instead of System.Text.Json. For backward compatibility reasons and since System.Text.Json may not support all use cases covered by Newtonsoft.Json, the old serializers have been renamed to NewtonsoftJsonMessageSerializer and NewtonsoftJsonMessageSerializer<TMessage> and moved into the dedicated Silverback.Integration.Newtonsoft package.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<InventoryEvent>(endpoint => endpoint
.ProduceTo("inventory-events")
.SerializeAsJsonUsingNewtonsoft())
.AddInbound(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeJsonUsingNewtonsoft())
// Specifying the message type will automatically
// switch to the NewtonsoftJsonMessageSerializer<TMessage>
.AddInbound<DeliveryNotification>(endpoint => endpoint
.ConsumeFrom("delivery-notification-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeJsonUsingNewtonsoft())
);
}
Apache Avro
The AvroMessageSerializer<TMessage> contained in the Silverback.Integration.Kafka.SchemaRegistry package can be used to connect with a schema registry and exchange messages in Apache Avro format.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<InventoryEvent>(endpoint => endpoint
.ProduceTo("inventory-events")
.SerializeAsAvro(serializer => serializer
.UseType<InventoryEvent>()
.Configure(
schemaRegistryConfig =>
{
schemaRegistryConfig.Url = "localhost:8081";
},
serializerConfig =>
{
serializerConfig.AutoRegisterSchemas = true;
})))
.AddInbound(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeAvro(serializer => serializer
.UseType<OrderEvent>()
.Configure(
schemaRegistryConfig =>
{
schemaRegistryConfig.Url = "localhost:8081";
},
serializerConfig =>
{
serializerConfig.AutoRegisterSchemas = true;
}))));
}
Note
The C# message models can be generated from an Avro schema using AvroGen.
Note
This serializer is built for Kafka but it could work with other brokers, as long as a schema registry is available.
Custom serializer
In some cases you may want to build your very own custom serializer implementing IMessageSerializer directly.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<InventoryEvent>(endpoint => endpoint
.ProduceTo("inventory-events")
.SerializeUsing(new MyCustomSerializer()))
.AddInbound(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeUsing(new MyCustomSerializer())));
}
Note
You may need to implement IKafkaMessageSerializer
if you want to have full control over the serialization of the Kafka key as well.
Binary Files
Please refer to the Binary Files page if you need to produce or consume raw binary files.