Show / Hide Table of Contents

    Kafka Schema Registry

    Use Confluent Schema Registry to validate and evolve message schemas for Kafka topics. Silverback integrates via Silverback.Integration.Kafka.SchemaRegistry and supports Avro, JSON Schema, and Protobuf.

    Prerequisites

    • A running Confluent Schema Registry instance.
    • The Silverback.Integration.Kafka.SchemaRegistry package referenced by your application.

    Configure Consumer Endpoints

    Configure Schema Registry per consumer endpoint using one of:

    • DeserializeAvro
    • DeserializeJsonUsingSchemaRegistry
    • DeserializeProtobuf

    Example (JSON Schema):

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddConsumer("consumer1", consumer => consumer
                .Consume<MyMessage>("endpoint1", endpoint => endpoint
                    .ConsumeFrom("my-topic")
                    .DeserializeJsonUsingSchemaRegistry(deserializer => deserializer
                        .ConnectToSchemaRegistry("http://localhost:4242")
                        .Configure(config =>
                        {
                            config.AutoRegisterSchemas = false;
                        })))));
    
    Note

    Producer setup is documented in Kafka Schema Registry.

    Supported Deserializers

    JSON Schema

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddConsumer("consumer1", consumer => consumer
                .Consume<MyMessage>("endpoint1", endpoint => endpoint
                    .ConsumeFrom("my-topic")
                    .DeserializeJsonUsingSchemaRegistry(deserializer => deserializer
                        .ConnectToSchemaRegistry("http://localhost:4242")))));
    

    Avro

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddConsumer("consumer1", consumer => consumer
                .Consume<MyMessage>("endpoint1", endpoint => endpoint
                    .ConsumeFrom("my-topic")
                    .DeserializeAvro(deserializer => deserializer
                        .ConnectToSchemaRegistry("http://localhost:4242")))));
    
    Tip

    You can generate C# models from an Avro schema using AvroGen.

    Protobuf

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddConsumer("consumer1", consumer => consumer
                .Consume<MyMessage>("endpoint1", endpoint => endpoint
                    .ConsumeFrom("my-topic")
                    .DeserializeProtobuf(deserializer => deserializer
                        .ConnectToSchemaRegistry("http://localhost:4242")))));
    

    Schema Registration

    In most setups, schemas are registered by producers and consumers only read existing schemas.

    If you disable automatic schema registration (AutoRegisterSchemas = false), ensure schemas are registered before producing messages.

    You can register schemas programmatically using IConfluentSchemaRegistryClientFactory:

    class SchemaRegistrationService
    {
        private readonly IConfluentSchemaRegistryClientFactory _factory;
    
        public SchemaRegistrationService(IConfluentSchemaRegistryClientFactory factory)
        {
            _factory = factory;
        }
    
        public async Task RegisterSchemasAsync(string formattedSchema, string topicName)
        {
            ISchemaRegistryClient client = _factory.GetClient(registry => registry
                .WithUrl("http://localhost:4242"));
    
            await client.RegisterSchemaAsync(
                topicName + "-value",
                new Schema(formattedSchema, SchemaType.Protobuf));
        }
    }
    

    Additional Resources

    • API Reference
    • Deserializing the Consumed Messages
    • Kafka Schema Registry
    • Confluent Schema Registry
    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2026 Sergio Aquilini