Kafka - Avro
This sample implements a producer and a consumer that take advantage of the schema registry and serialize the messages as Avro.
Common
The message being exchanged is defined in a common project.
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.7.7.5
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
using Silverback.Messaging.Messages;
namespace Silverback.Examples.Messages
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;
/// <summary>
/// Avro specific record exposing a single string field named <c>number</c>.
/// </summary>
public partial class AvroMessage : ISpecificRecord
{
    /// <summary>
    /// Schema parsed from the JSON definition of the <c>AvroMessage</c> record
    /// (namespace <c>Silverback.Examples.Messages</c>, one string field <c>number</c>).
    /// </summary>
    public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"AvroMessage\",\"namespace\":\"Silverback.Examples.Messages\"," +
        "\"fields\":[{\"name\":\"number\",\"type\":\"string\"}]}");

    // Backing storage for the "number" field.
    private string _number;

    /// <summary>Gets the schema of this record (the shared static <see cref="_SCHEMA"/>).</summary>
    public virtual Schema Schema => _SCHEMA;

    /// <summary>Gets or sets the value of the <c>number</c> field.</summary>
    public string number
    {
        get => _number;
        set => _number = value;
    }

    /// <summary>Returns the value of the field at the given position.</summary>
    /// <param name="fieldPos">Field position; only 0 (<c>number</c>) is accepted.</param>
    /// <returns>The current value of the requested field.</returns>
    /// <exception cref="AvroRuntimeException">Thrown for any position other than 0.</exception>
    public virtual object Get(int fieldPos)
    {
        if (fieldPos == 0)
        {
            return number;
        }

        throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
    }

    /// <summary>Assigns the value of the field at the given position.</summary>
    /// <param name="fieldPos">Field position; only 0 (<c>number</c>) is accepted.</param>
    /// <param name="fieldValue">The new value; it is cast to <see cref="string"/> for position 0.</param>
    /// <exception cref="AvroRuntimeException">Thrown for any position other than 0.</exception>
    public virtual void Put(int fieldPos, object fieldValue)
    {
        // Reject unknown positions before touching the value, so no cast is attempted for them.
        if (fieldPos != 0)
        {
            throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }

        number = (string)fieldValue;
    }
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Kafka.Avro.Producer
{
/// <summary>
/// Startup class of the producer sample: registers Silverback, the Kafka broker connection,
/// the endpoints configurator and the producing hosted service.
/// </summary>
public class Startup
{
    /// <summary>Registers the services needed by the producer.</summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverbackBuilder = services.AddSilverback();

        // Use Apache Kafka as message broker
        var builderWithBroker = silverbackBuilder.WithConnectionToMessageBroker(
            brokerOptions => brokerOptions.AddKafka());

        // The inbound/outbound endpoints configuration lives in a separate class
        builderWithBroker.AddEndpointsConfigurator<EndpointsConfigurator>();

        // Hosted service producing the random sample messages
        services.AddHostedService<ProducerBackgroundService>();
    }

    /// <summary>Intentionally left empty.</summary>
    public void Configure()
    {
    }
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Producer
Consumer
The consumer processes the messages and outputs their value to the standard output.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Kafka.Avro.Consumer
{
/// <summary>
/// Startup class of the consumer sample: registers Silverback, the Kafka broker connection,
/// the endpoints configurator and the message subscriber.
/// </summary>
public class Startup
{
    /// <summary>Registers the services needed by the consumer.</summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverbackBuilder = services.AddSilverback();

        // Use Apache Kafka as message broker
        var builderWithBroker = silverbackBuilder.WithConnectionToMessageBroker(
            brokerOptions => brokerOptions.AddKafka());

        // The inbound/outbound endpoints configuration lives in a separate class
        var builderWithEndpoints = builderWithBroker.AddEndpointsConfigurator<EndpointsConfigurator>();

        // Register the subscribers
        builderWithEndpoints.AddSingletonSubscriber<AvroMessageSubscriber>();
    }

    /// <summary>Intentionally left empty.</summary>
    public void Configure()
    {
    }
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Consumer