Kafka - Avro
This sample implements a producer and a consumer that take advantage of the schema registry and serialize the messages as Avro.
Common
The message being exchanged is defined in a common project.
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.7.7.5
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
using Silverback.Messaging.Messages;
namespace Silverback.Examples.Messages
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;
/// <summary>
/// Avro record with a single string field named <c>number</c>, exposed through
/// the positional <see cref="ISpecificRecord"/> accessors.
/// </summary>
public partial class AvroMessage : ISpecificRecord
{
    /// <summary>
    /// Schema parsed from the embedded JSON definition of the record.
    /// </summary>
    public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"AvroMessage\",\"namespace\":\"Silverback.Examples.Messages\"," +
        "\"fields\":[{\"name\":\"number\",\"type\":\"string\"}]}");

    private string _number;

    /// <summary>
    /// Gets the schema of this record (the shared static <see cref="_SCHEMA"/>).
    /// </summary>
    public virtual Schema Schema => _SCHEMA;

    /// <summary>
    /// Gets or sets the value of the <c>number</c> field.
    /// </summary>
    public string number
    {
        get => _number;
        set => _number = value;
    }

    /// <summary>
    /// Returns the value of the field at the given position.
    /// </summary>
    /// <param name="fieldPos">Field position; only 0 (<c>number</c>) is valid.</param>
    /// <returns>The value of the requested field.</returns>
    /// <exception cref="AvroRuntimeException">Thrown for any position other than 0.</exception>
    public virtual object Get(int fieldPos)
    {
        if (fieldPos == 0)
        {
            return number;
        }

        throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
    }

    /// <summary>
    /// Assigns the value of the field at the given position.
    /// </summary>
    /// <param name="fieldPos">Field position; only 0 (<c>number</c>) is valid.</param>
    /// <param name="fieldValue">The new value, cast to <see cref="string"/> for position 0.</param>
    /// <exception cref="AvroRuntimeException">Thrown for any position other than 0.</exception>
    public virtual void Put(int fieldPos, object fieldValue)
    {
        // Reject unknown positions first; the record has exactly one field.
        if (fieldPos != 0)
        {
            throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }

        number = (string)fieldValue;
    }
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.Avro.Producer;
/// <summary>
/// Startup class of the Avro producer sample: registers Silverback and the
/// background service that publishes the messages.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback (Kafka + Confluent schema registry), the broker
    /// clients configurator and the producer hosted service.
    /// </summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverback = services.AddSilverback();

        // Use Apache Kafka as message broker and the Confluent schema registry
        var connected = silverback.WithConnectionToMessageBroker(
            broker => broker
                .AddKafka()
                .AddConfluentSchemaRegistry());

        // Delegate the broker clients configuration to a separate class
        connected.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Add the hosted service that produces the random sample messages
        services.AddHostedService<ProducerBackgroundService>();
    }

    /// <summary>
    /// Intentionally empty: this sample has nothing to configure here.
    /// </summary>
    public void Configure()
    {
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Producer
Consumer
The consumer processes the messages and outputs their value to the standard output.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.Avro.Consumer;
/// <summary>
/// Startup class of the Avro consumer sample: registers Silverback and the
/// subscriber that handles the consumed messages.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback (Kafka + Confluent schema registry), the broker
    /// clients configurator and the <c>AvroMessageSubscriber</c> singleton subscriber.
    /// </summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverback = services.AddSilverback();

        // Use Apache Kafka as message broker and the Confluent schema registry
        var connected = silverback.WithConnectionToMessageBroker(
            broker => broker
                .AddKafka()
                .AddConfluentSchemaRegistry());

        // Delegate the broker clients configuration to a separate class
        var configured = connected.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Register the subscribers
        configured.AddSingletonSubscriber<AvroMessageSubscriber>();
    }

    /// <summary>
    /// Intentionally empty: this sample has nothing to configure here.
    /// </summary>
    public void Configure()
    {
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Consumer