Deserializing the Consumed Messages

The message payload received from the broker needs to be deserialized from a byte array into the target message type before it can be processed by the subscribers. By default, Silverback uses System.Text.Json to deserialize the message payload. This can be customized by tweaking the deserializer settings, switching to another built-in deserializer, or implementing a custom deserializer.

JSON

System.Text.Json

The default deserializer is based on System.Text.Json and deserializes the message payload from its JSON representation. Its settings can be customized using the Configure method.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJson(deserializer => deserializer
                    .Configure(
                        options =>
                        {
                            options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
                        })))));
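
To illustrate what the naming policy changes, the following standalone sketch calls System.Text.Json directly, outside of Silverback. The OrderCreated class and the sample payload are hypothetical, and the "default options" shown here are those of System.Text.Json itself, which are not necessarily the ones Silverback applies.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical payload as it might arrive from the broker (UTF-8 encoded JSON).
byte[] payload = Encoding.UTF8.GetBytes("{\"orderId\":42,\"customerName\":\"Alice\"}");

// Plain System.Text.Json defaults: property names are matched case-sensitively,
// so "orderId" does not bind to OrderId and the property keeps its default value.
OrderCreated? withDefaults = JsonSerializer.Deserialize<OrderCreated>(payload);

// With the camel-case policy the expected JSON name of OrderId becomes "orderId".
var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
OrderCreated? withCamelCase = JsonSerializer.Deserialize<OrderCreated>(payload, options);

Console.WriteLine(withDefaults?.OrderId);  // 0
Console.WriteLine(withCamelCase?.OrderId); // 42

// Hypothetical message model used only for this illustration.
public class OrderCreated
{
    public int OrderId { get; set; }
    public string? CustomerName { get; set; }
}
```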

Newtonsoft.Json

Silverback also provides a deserializer based on Newtonsoft.Json. It is found in the Silverback.Newtonsoft package and can be enabled by calling the DeserializeJsonUsingNewtonsoft method.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJsonUsingNewtonsoft())));

Optionally, the deserializer settings can be customized using the Configure method.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJsonUsingNewtonsoft(deserializer => deserializer
                    .Configure(
                        settings =>
                        {
                            settings.ContractResolver = new CamelCasePropertyNamesContractResolver();
                        })))));

Polymorphic Deserialization

Using the WithOptionalMessageTypeHeader or WithMandatoryMessageTypeHeader method, the deserializer can be configured to use the x-message-type header to determine the target message type. This allows for polymorphic deserialization, where different message types can be deserialized from the same topic.

For security reasons, this feature is disabled by default.
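
The general idea of header-driven type resolution can be sketched in isolation. This is not Silverback's implementation: the allow-list, the short header values, and the message classes are all hypothetical. The sketch also hints at a likely reason for the cautious default, since a header is sender-controlled input and mapping it to arbitrary types would let the sender choose what gets instantiated.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical allow-list: only these header values are ever mapped to a type.
var knownTypes = new Dictionary<string, Type>(StringComparer.Ordinal)
{
    ["OrderCreated"] = typeof(OrderCreated),
    ["OrderCancelled"] = typeof(OrderCancelled)
};

// Resolves the target type from the header value, then deserializes the payload.
// Missing or unknown header values are rejected rather than resolved via reflection.
object? Deserialize(byte[] payload, string? messageTypeHeader)
{
    if (messageTypeHeader is null || !knownTypes.TryGetValue(messageTypeHeader, out Type? targetType))
        throw new InvalidOperationException($"Unsupported message type '{messageTypeHeader}'.");

    return JsonSerializer.Deserialize(payload, targetType);
}

byte[] payload = Encoding.UTF8.GetBytes("{\"OrderId\":7}");

// The same bytes produce different message types depending on the header value.
Console.WriteLine(Deserialize(payload, "OrderCreated")?.GetType().Name);   // OrderCreated
Console.WriteLine(Deserialize(payload, "OrderCancelled")?.GetType().Name); // OrderCancelled

// Hypothetical message models used only for this illustration.
public class OrderCreated { public int OrderId { get; set; } }
public class OrderCancelled { public int OrderId { get; set; } }
```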

To handle polymorphic nested properties, you must instead rely on the serializer's built-in functionality (e.g., TypeNameHandling in Newtonsoft.Json).
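
As an illustration of serializer-level polymorphism, the sketch below uses the System.Text.Json counterpart of that feature (the JsonPolymorphic and JsonDerivedType attributes, available from .NET 7) rather than Newtonsoft.Json. The payment classes and the discriminator values are hypothetical. Note that, unless configured otherwise, System.Text.Json expects the discriminator to be the first property of the JSON object.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// The nested "Method" object carries a type discriminator as its first property.
string json = "{\"Method\":{\"$type\":\"card\",\"Last4\":\"4242\"}}";

PaymentReceived? message = JsonSerializer.Deserialize<PaymentReceived>(json);

Console.WriteLine(message?.Method?.GetType().Name); // CardPayment

// Hypothetical message model with a polymorphic nested property.
public class PaymentReceived
{
    public PaymentMethod? Method { get; set; }
}

// Only the derived types listed here can be produced by the deserializer.
[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")]
[JsonDerivedType(typeof(CardPayment), "card")]
[JsonDerivedType(typeof(BankTransfer), "bank")]
public abstract class PaymentMethod { }

public class CardPayment : PaymentMethod { public string? Last4 { get; set; } }
public class BankTransfer : PaymentMethod { public string? Iban { get; set; } }
```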

Schema Registry

To integrate with Confluent Schema Registry, you can use the dedicated JSON deserializer designed for schema registry support found in the Silverback.Kafka.SchemaRegistry package. This uses the Confluent deserializer under the hood, which in turn uses Newtonsoft.Json.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJsonUsingSchemaRegistry(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")
                    .Configure(
                        config =>
                        {
                            config.AutoRegisterSchemas = false;
                        })))));

Note

To learn more about the schema registry support, refer to the Kafka Schema Registry guide.
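
One reason a dedicated deserializer is needed is that payloads written by the Confluent serializers are not plain JSON. Per the Confluent wire format, the body is preceded by a magic byte (0) and a 4-byte big-endian schema ID. The following standalone sketch, with a hypothetical payload and helper name, shows how such a frame could be inspected; it is not how Silverback or the Confluent client is implemented.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

// Reads the schema ID from a payload framed as: magic byte 0, 4-byte big-endian ID, body.
static int ReadSchemaId(ReadOnlySpan<byte> data)
{
    if (data.Length < 5 || data[0] != 0)
        throw new FormatException("Payload is not in the Confluent wire format.");

    return BinaryPrimitives.ReadInt32BigEndian(data.Slice(1, 4));
}

// Hypothetical payload: magic byte, schema ID 1234 (0x000004D2), then the JSON body "{}".
byte[] payload = { 0x00, 0x00, 0x00, 0x04, 0xD2, (byte)'{', (byte)'}' };

int schemaId = ReadSchemaId(payload);
string body = Encoding.UTF8.GetString(payload, 5, payload.Length - 5);

Console.WriteLine(schemaId); // 1234
Console.WriteLine(body);     // {}
```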

Raw

The raw deserializer is not a traditional deserializer. Instead, it leaves the message payload as-is, returning the original byte array or stream. This is particularly useful when the payload is already a byte array or a stream and does not require transformation.

When consuming raw messages, the consumer provides one of the following types: RawMessage, RawMessage<T>, or a custom class derived from RawMessage.

The generic type parameter in RawMessage<T> is used solely for message routing purposes, ensuring that the message is delivered to the correct endpoint, just as a custom derived class would.

When the consumer endpoint is configured to route a RawMessage or a derived class, the raw deserializer is used by default.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<RawMessage<MyMessage>>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic"))));

Binary

The binary deserializer is similar to the raw deserializer but supports the x-message-type header, meaning that these messages can be mixed with other (JSON-deserialized) messages on the same topic and Silverback will be able to discriminate them.

When consuming binary messages, use a type that implements IBinaryMessage.

When the consumer endpoint is configured to route a type implementing IBinaryMessage, the binary deserializer is used by default.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyBinaryMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic"))));

String

The string deserializer decodes a byte array into a raw string.

To consume a string message, use one of the following types: StringMessage, StringMessage<T>, or a custom class derived from StringMessage.

The generic type parameter in StringMessage<T> is used solely for message routing purposes, ensuring that the message is delivered to the correct endpoint, just as a custom derived class would.

When the consumer endpoint is configured to route a StringMessage or a derived class, the string deserializer is used by default.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<StringMessage<MyMessage>>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic"))));

The default encoding is UTF-8, but it can be customized using the WithEncoding method.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<StringMessage<MyMessage>>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .ConsumeStrings(deserializer => deserializer
                    .WithEncoding(MessageEncoding.Unicode)))));
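
The encoding has to match the one used by the producer. The following standalone sketch uses the .NET Encoding classes directly to show why; it assumes that MessageEncoding.Unicode corresponds to Encoding.Unicode (UTF-16 little-endian), which is not confirmed here.

```csharp
using System;
using System.Text;

const string text = "h\u00E9llo"; // "héllo"

byte[] utf8Bytes = Encoding.UTF8.GetBytes(text);
byte[] utf16Bytes = Encoding.Unicode.GetBytes(text);

// The same string has a different byte representation in each encoding.
Console.WriteLine(utf8Bytes.Length);  // 6
Console.WriteLine(utf16Bytes.Length); // 10

// Decoding with the matching encoding round-trips the text.
Console.WriteLine(Encoding.UTF8.GetString(utf8Bytes) == text);     // True
Console.WriteLine(Encoding.Unicode.GetString(utf16Bytes) == text); // True

// Decoding with the wrong encoding does not throw; it silently yields different text.
Console.WriteLine(Encoding.Unicode.GetString(utf8Bytes) == text);  // False
```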

Avro

To deserialize Avro-formatted messages, you can use the Avro deserializer found in the Silverback.Kafka.SchemaRegistry package. This deserializer is based on the Confluent Avro deserializer and requires a schema registry.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeAvro(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));

Note

The C# message models can be generated from an Avro schema using AvroGen.

Note

To learn more about the schema registry support, refer to the Kafka Schema Registry guide.

Protobuf

To deserialize Protobuf messages you can use the Protobuf deserializer found in the Silverback.Kafka.SchemaRegistry package. This deserializer is based on the Confluent Protobuf deserializer and requires a schema registry.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeProtobuf(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));

Note

To learn more about the schema registry support, refer to the Kafka Schema Registry guide.

Custom Deserializer

To implement a custom deserializer, create a class implementing the IMessageDeserializer interface.

public class MyCustomDeserializer : IMessageDeserializer
{
    public ValueTask<DeserializedMessage> DeserializeAsync(
        Stream? messageStream,
        MessageHeaderCollection headers,
        ConsumerEndpoint endpoint)
    {
        // Implement the deserialization logic here and return the deserialized message.
        throw new NotImplementedException();
    }
}
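
The logic inside such a method typically reads the stream and maps its content to a message object. The standalone sketch below shows that core step for a hypothetical semicolon-separated text format. The SensorReading record, the ParseAsync helper, and the payload format are all assumptions made for illustration, and the sketch deliberately does not construct Silverback's DeserializedMessage. A null stream is assumed here to mean an empty payload.

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Parses a payload such as "sensor-1;21.5" into a SensorReading.
static async ValueTask<SensorReading?> ParseAsync(Stream? messageStream)
{
    // Assumption: a null stream means there is no payload to deserialize.
    if (messageStream is null)
        return null;

    // leaveOpen: true, because the caller owns the stream.
    using var reader = new StreamReader(
        messageStream,
        Encoding.UTF8,
        detectEncodingFromByteOrderMarks: false,
        bufferSize: 1024,
        leaveOpen: true);

    string text = await reader.ReadToEndAsync();
    string[] parts = text.Split(';');

    if (parts.Length != 2)
        throw new FormatException($"Expected 'id;value' but got '{text}'.");

    // Parse with the invariant culture so that "21.5" is read the same way on every machine.
    return new SensorReading(parts[0], double.Parse(parts[1], CultureInfo.InvariantCulture));
}

using var stream = new MemoryStream(Encoding.UTF8.GetBytes("sensor-1;21.5"));
SensorReading? reading = await ParseAsync(stream);

Console.WriteLine(reading?.SensorId);

// Hypothetical message model used only for this illustration.
public record SensorReading(string SensorId, double Value);
```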

The custom deserializer can be plugged in by calling the DeserializeUsing method.

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeUsing(new MyCustomDeserializer()))));
