Kafka Schema Registry
Use Confluent Schema Registry to validate and evolve message schemas for Kafka topics. Silverback integrates via Silverback.Integration.Kafka.SchemaRegistry and supports Avro, JSON Schema, and Protobuf.
Prerequisites
- A running Confluent Schema Registry instance.
- The Silverback.Integration.Kafka.SchemaRegistry package referenced by your application.
Configure Consumer Endpoints
Configure Schema Registry per consumer endpoint using one of:
- DeserializeAvro
- DeserializeJsonUsingSchemaRegistry
- DeserializeProtobuf
Example (JSON Schema):
services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJsonUsingSchemaRegistry(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")
                    .Configure(config =>
                    {
                        config.AutoRegisterSchemas = false;
                    })))));
Note
Producer setup is documented in Kafka Schema Registry.
Supported Deserializers
JSON Schema
services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeJsonUsingSchemaRegistry(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));
Avro
services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeAvro(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));
Tip
You can generate C# models from an Avro schema using AvroGen.
Protobuf
services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer("consumer1", consumer => consumer
            .Consume<MyMessage>("endpoint1", endpoint => endpoint
                .ConsumeFrom("my-topic")
                .DeserializeProtobuf(deserializer => deserializer
                    .ConnectToSchemaRegistry("http://localhost:4242")))));
Schema Registration
In most setups, producers register the schemas and consumers only read the ones that already exist.
If you disable automatic schema registration (AutoRegisterSchemas = false), ensure schemas are registered before producing messages.
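One way to confirm that a schema is in place before producing is to ask the registry for the latest version of the subject. The following is a minimal sketch, not part of Silverback: `SchemaPreflight`, `ValueSubject`, and `IsValueSchemaRegisteredAsync` are hypothetical names, and it assumes the `<topic>-value` subject naming and the registry URL used elsewhere on this page. It uses the Confluent.SchemaRegistry client directly.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;

// Hypothetical helper (not a Silverback API): checks whether a value schema
// is already registered for a topic.
public static class SchemaPreflight
{
    // Assumption: subjects follow the "<topic>-value" convention.
    public static string ValueSubject(string topicName) => topicName + "-value";

    public static async Task<bool> IsValueSchemaRegisteredAsync(
        ISchemaRegistryClient client,
        string topicName)
    {
        try
        {
            // Succeeds only if at least one schema version exists for the subject.
            await client.GetLatestSchemaAsync(ValueSubject(topicName));
            return true;
        }
        catch (SchemaRegistryException ex) when (ex.Status == HttpStatusCode.NotFound)
        {
            // The registry answered 404: nothing is registered under this subject.
            return false;
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Assumed registry URL, matching the examples on this page.
        using var client = new CachedSchemaRegistryClient(
            new SchemaRegistryConfig { Url = "http://localhost:4242" });

        bool registered = await SchemaPreflight.IsValueSchemaRegisteredAsync(client, "my-topic");

        Console.WriteLine(registered
            ? "Schema found."
            : "Schema missing; register it before producing.");
    }
}
```

Only the 404 response is treated as "not registered". Any other failure, such as an unreachable registry, still surfaces as an exception, so a connectivity problem is not mistaken for a missing schema.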
You can register schemas programmatically using IConfluentSchemaRegistryClientFactory:
class SchemaRegistrationService
{
    private readonly IConfluentSchemaRegistryClientFactory _factory;

    public SchemaRegistrationService(IConfluentSchemaRegistryClientFactory factory)
    {
        _factory = factory;
    }

    public async Task RegisterSchemasAsync(string formattedSchema, string topicName)
    {
        ISchemaRegistryClient client = _factory.GetClient(registry => registry
            .WithUrl("http://localhost:4242"));

        await client.RegisterSchemaAsync(
            topicName + "-value",
            new Schema(formattedSchema, SchemaType.Protobuf));
    }
}
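The service above still has to be called from somewhere. As a sketch under stated assumptions, a hosted service can call it once at application startup. `SchemaRegistrationStartup`, the schema file path, and the topic name are hypothetical. The block relies on the `SchemaRegistrationService` class shown above, so it is not self-contained.

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical startup hook: registers the schema once when the host starts.
// Declared internal because SchemaRegistrationService above is internal too.
internal sealed class SchemaRegistrationStartup : IHostedService
{
    private readonly SchemaRegistrationService _registration;

    public SchemaRegistrationStartup(SchemaRegistrationService registration)
    {
        _registration = registration;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Assumed location of the Protobuf schema; adjust to your project layout.
        string schema = await File.ReadAllTextAsync(
            "Schemas/my-message.proto",
            cancellationToken);

        // Registers the schema under the "my-topic-value" subject.
        await _registration.RegisterSchemasAsync(schema, "my-topic");
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```

To wire it up, register both types with the service collection, for example `services.AddSingleton<SchemaRegistrationService>()` followed by `services.AddHostedService<SchemaRegistrationStartup>()`. Whether this runs before the first message is produced depends on your host's startup order, so verify that in your own application.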