    Serialization

    Flexibility in how the messages sent over the message broker are serialized and deserialized is crucial for interoperability, so these mechanisms are completely customizable.

    Default JSON serialization

    The default JsonMessageSerializer internally uses System.Text.Json to serialize the messages as JSON and encode them in UTF-8.

    A few headers are added to the message. In particular, x-message-type is used by the JsonMessageSerializer to determine the message type when deserializing it in the consumer, which allows messages of different types to be sent over the same topic or queue.

    Warning

    The JsonMessageSerializer will try to map the message to a type with the exact assembly-qualified name found in the x-message-type header. It is therefore good practice to share the message models among the services, for example through a shared project or a NuGet package.

    This is the suggested serialization strategy when both producer and consumer are based on Silverback, but it may not be ideal for interoperability.

    Have a look at the Message Headers section for an overview of the headers that are appended to the messages.
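
    To make the role of the x-message-type header more concrete, the following is a minimal sketch of the idea using plain System.Text.Json. It is not Silverback's actual implementation: the TypeHeaderJsonSketch class, the plain header dictionary, and the OrderEvent model are hypothetical stand-ins chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical message model, used only for this illustration.
public class OrderEvent
{
    public int OrderId { get; set; }
}

// Illustrative sketch only: this is NOT Silverback's JsonMessageSerializer.
public static class TypeHeaderJsonSketch
{
    public const string TypeHeader = "x-message-type";

    // Serializes the message as UTF-8 encoded JSON and records its
    // assembly-qualified type name in the headers.
    public static byte[] Serialize(object message, IDictionary<string, string> headers)
    {
        Type type = message.GetType();
        headers[TypeHeader] = type.AssemblyQualifiedName;
        return JsonSerializer.SerializeToUtf8Bytes(message, type);
    }

    // Resolves the type named in the header, then deserializes the payload into it.
    // Type.GetType throws if the type cannot be found, which is why the
    // message models have to be shared between producer and consumer.
    public static object Deserialize(byte[] payload, IDictionary<string, string> headers)
    {
        Type type = Type.GetType(headers[TypeHeader], throwOnError: true);
        return JsonSerializer.Deserialize(payload, type);
    }
}
```

    In this sketch, calling Serialize with an OrderEvent fills the header dictionary, and passing the same payload and headers to Deserialize returns an OrderEvent again without the caller ever naming the type.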

    Fixed-type JSON for interoperability

    If you are consuming a message coming from another system (not based on Silverback), chances are that the message type name is not being delivered as a header.

    In that case you can resort to the typed JsonMessageSerializer<TMessage>. This serializer works like the default one described in the previous section, but the message type is hard-coded instead of being resolved from the message header.

    The fluent configuration is shown first, followed by the legacy configuration.

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config => 
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092"; 
                        })
                    .AddOutbound<InventoryEvent>(endpoint => endpoint
                        .ProduceTo("inventory-events")
                        .SerializeAsJson(serializer => serializer
                            .UseFixedType<InventoryEvent>()))
                    
                    // Specifying the message type will automatically
                    // switch to the JsonMessageSerializer<TMessage>
                    // and deserialize the specified type without
                    // needing the type header
                    .AddInbound<OrderEvent>(endpoint => endpoint
                        .ConsumeFrom("order-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            }))
                    
                    // The following configuration is equivalent to the
                    // previous one, but more verbose
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("order-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            })
                        .DeserializeJson(serializer => serializer
                            .UseFixedType<OrderEvent>())));
    }
    
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<InventoryEvent>(
                    new KafkaProducerEndpoint("inventory-events")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        Serializer = new JsonMessageSerializer<InventoryEvent>() 
                    })
                .AddInbound(
                    new KafkaConsumerEndpoint("order-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "my-consumer"
                        },
                        Serializer = new JsonMessageSerializer<OrderEvent>()
                    });
    }
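
    The difference from the header-based approach can be sketched with plain System.Text.Json as follows. Again, this is only an illustration and not Silverback's JsonMessageSerializer<TMessage>: the FixedTypeJsonSketch<TMessage> class and the OrderEvent model are hypothetical. The point is that the target type is supplied at compile time, so no header is read or written.

```csharp
using System.Text;
using System.Text.Json;

// Hypothetical model matching the payload published by the other system.
public class OrderEvent
{
    public int OrderId { get; set; }
}

// Illustrative sketch only: this is NOT Silverback's JsonMessageSerializer<TMessage>.
public static class FixedTypeJsonSketch<TMessage>
{
    // Serializes the message as UTF-8 encoded JSON; no type header is produced.
    public static byte[] Serialize(TMessage message) =>
        JsonSerializer.SerializeToUtf8Bytes(message);

    // Deserializes the payload straight into TMessage; no type header is needed.
    public static TMessage Deserialize(byte[] payload) =>
        JsonSerializer.Deserialize<TMessage>(payload);
}
```

    With this sketch, a raw payload such as {"OrderId":7} produced by any system can be read with FixedTypeJsonSketch<OrderEvent>.Deserialize(payload), as long as the JSON property names match the model.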
    

    JSON using Newtonsoft.Json

    Prior to release 3.0.0, the default JsonMessageSerializer was based on Newtonsoft.Json instead of System.Text.Json. For backward compatibility, and because System.Text.Json may not support all use cases covered by Newtonsoft.Json, the old serializers have been renamed to NewtonsoftJsonMessageSerializer and NewtonsoftJsonMessageSerializer<TMessage> and moved into the dedicated Silverback.Integration.Newtonsoft package.

    The fluent configuration is shown first, followed by the legacy configuration.

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config => 
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092"; 
                        })
                    .AddOutbound<InventoryEvent>(endpoint => endpoint
                        .ProduceTo("inventory-events")
                        .SerializeAsJsonUsingNewtonsoft())
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("order-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            })
                        .DeserializeJsonUsingNewtonsoft())
    
                    // Specifying the message type will automatically
                    // switch to the NewtonsoftJsonMessageSerializer<TMessage>
                    .AddInbound<DeliveryNotification>(endpoint => endpoint
                        .ConsumeFrom("delivery-notification-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            })
                        .DeserializeJsonUsingNewtonsoft()));
    }
    
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<InventoryEvent>(
                    new KafkaProducerEndpoint("inventory-events")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        Serializer = new NewtonsoftJsonMessageSerializer() 
                    })
                .AddInbound(
                    new KafkaConsumerEndpoint("order-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "my-consumer"
                        },
                        Serializer = new NewtonsoftJsonMessageSerializer() 
                    });
    }
    

    Apache Avro

    The AvroMessageSerializer<TMessage> contained in the Silverback.Integration.Kafka.SchemaRegistry package can be used to connect with a schema registry and exchange messages in Apache Avro format.

    The fluent configuration is shown first, followed by the legacy configuration.

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config => 
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092"; 
                        })
                    .AddOutbound<InventoryEvent>(endpoint => endpoint
                        .ProduceTo("inventory-events")
                        .SerializeAsAvro(serializer => serializer
                            .UseType<InventoryEvent>()
                            .Configure(
                                schemaRegistryConfig =>
                                {
                                    schemaRegistryConfig.Url = "localhost:8081";
                                },
                                serializerConfig =>
                                {
                                    serializerConfig.AutoRegisterSchemas = true;
                                })))
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("order-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            })
                        .DeserializeAvro(serializer => serializer
                            .UseType<OrderEvent>()
                            .Configure(
                                schemaRegistryConfig =>
                                {
                                    schemaRegistryConfig.Url = "localhost:8081";
                                },
                                serializerConfig =>
                                {
                                    serializerConfig.AutoRegisterSchemas = true;
                                }))));
    }
    
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<InventoryEvent>(
                    new KafkaProducerEndpoint("inventory-events")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        Serializer = new AvroMessageSerializer<InventoryEvent>
                        {
                            SchemaRegistryConfig = new SchemaRegistryConfig
                            {
                                Url = "localhost:8081"
                            },
                            AvroSerializerConfig = new AvroSerializerConfig
                            {
                                AutoRegisterSchemas = true
                            }
                        } 
                    })
                .AddInbound(
                    new KafkaConsumerEndpoint("order-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "my-consumer"
                        },
                        Serializer = new AvroMessageSerializer<OrderEvent>
                        {
                            SchemaRegistryConfig = new SchemaRegistryConfig
                            {
                                Url = "localhost:8081"
                            },
                            AvroSerializerConfig = new AvroSerializerConfig
                            {
                                AutoRegisterSchemas = true
                            }
                        } 
                    });
    }
    
    Note

    The C# message models can be generated from an Avro schema using AvroGen.

    Note

    This serializer is built for Kafka but it could work with other brokers, as long as a schema registry is available.

    Custom serializer

    In some cases you may want to build your own serializer by implementing IMessageSerializer directly.

    The fluent configuration is shown first, followed by the legacy configuration.

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config => 
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092"; 
                        })
                    .AddOutbound<InventoryEvent>(endpoint => endpoint
                        .ProduceTo("inventory-events")
                        .SerializeUsing(new MyCustomSerializer()))
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("order-events")
                        .Configure(config => 
                            {
                                config.GroupId = "my-consumer";
                            })
                        .DeserializeUsing(new MyCustomSerializer())));
    }
    
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<InventoryEvent>(
                    new KafkaProducerEndpoint("inventory-events")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        Serializer = new MyCustomSerializer()
                    })
                .AddInbound(
                    new KafkaConsumerEndpoint("order-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "my-consumer"
                        },
                        Serializer = new MyCustomSerializer()
                    });
    }
    
    Note

    You may need to implement IKafkaMessageSerializer if you want to have full control over the serialization of the Kafka key as well.

    Binary Files

    Please refer to the Binary Files page if you need to produce or consume raw binary files.

    © 2020 Sergio Aquilini