Table of Contents

Kafka Schema Registry

Use Confluent Schema Registry to validate and evolve message schemas for Kafka topics. Silverback integrates with Schema Registry via Silverback.Integration.Kafka.SchemaRegistry and provides endpoint-level serialization for Avro, JSON Schema, and Protobuf.

Prerequisites

  • A running Confluent Schema Registry instance.
  • The Silverback.Integration.Kafka.SchemaRegistry package referenced by your application.

Configure Producer Endpoints

Configure Schema Registry per endpoint using one of:

  • SerializeAsAvro
  • SerializeAsJsonUsingSchemaRegistry
  • SerializeAsProtobuf

Example (JSON Schema):

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddProducer("producer1", producer => producer
            .Produce<MyMessage>("endpoint1", endpoint => endpoint
                .ProduceTo("my-topic")
                .SerializeAsJsonUsingSchemaRegistry(json => json
                    .ConnectToSchemaRegistry("http://localhost:4242")
                    .Configure(config =>
                    {
                        config.AutoRegisterSchemas = false;
                    })))));
Note

Consumer setup is documented in Kafka Schema Registry.

Supported Serializers

JSON Schema

Use SerializeAsJsonUsingSchemaRegistry:

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddProducer("producer1", producer => producer
            .Produce<MyMessage>("endpoint1", endpoint => endpoint
                .ProduceTo("my-topic")
                .SerializeAsJsonUsingSchemaRegistry(serializer => serializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));

Avro

Use SerializeAsAvro:

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddProducer("producer1", producer => producer
            .Produce<MyMessage>("endpoint1", endpoint => endpoint
                .ProduceTo("my-topic")
                .SerializeAsAvro(serializer => serializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));
Tip

You can generate C# models from an Avro schema using AvroGen.

Protobuf

Use SerializeAsProtobuf:

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddProducer("producer1", producer => producer
            .Produce<MyMessage>("endpoint1", endpoint => endpoint
                .ProduceTo("my-topic")
                .SerializeAsProtobuf(serializer => serializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));

Schema Registration

In most setups, schemas are registered by producers.

If you disable automatic schema registration (AutoRegisterSchemas = false), ensure schemas are registered before producing messages.

You can register schemas programmatically using IConfluentSchemaRegistryClientFactory:

class SchemaRegistrationService
{
    private readonly IConfluentSchemaRegistryClientFactory _factory;

    public SchemaRegistrationService(IConfluentSchemaRegistryClientFactory factory)
    {
        _factory = factory;
    }

    public async Task RegisterSchemasAsync(string formattedSchema, string topicName)
    {
        ISchemaRegistryClient client = _factory.GetClient(registry => registry
            .WithUrl("http://localhost:4242"));

        await client.RegisterSchemaAsync(
            topicName + "-value",
            new Schema(formattedSchema, SchemaType.Protobuf));
    }
}

Additional Resources