Table of Contents

Kafka - Avro

This sample implements a producer and consumer which take advantage of the schema registry and serializes the messages as Avro.

Common

The message being exchanged is defined in a common project.

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.7.7.5
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------

using Silverback.Messaging.Messages;

namespace Silverback.Examples.Messages
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using global::Avro;
    using global::Avro.Specific;
    
    public partial class AvroMessage : ISpecificRecord
    {
        public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"AvroMessage\",\"namespace\":\"Silverback.Examples.Messages\"," +
                                                    "\"fields\":[{\"name\":\"number\",\"type\":\"string\"}]}");
        private string _number;
        public virtual Schema Schema
        {
            get
            {
                return AvroMessage._SCHEMA;
            }
        }
        public string number
        {
            get
            {
                return this._number;
            }
            set
            {
                this._number = value;
            }
        }
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return this.number;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            }
        }
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0:
                    this.number = (System.String) fieldValue;
                    break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }
}

Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Common

Producer

The producer uses a hosted service to publish some messages in the background.

using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;

namespace Silverback.Samples.Kafka.Avro.Producer;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        services.AddSilverback()

            // Use Apache Kafka as message broker and the Confluent schema registry
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddConfluentSchemaRegistry())

            // Delegate the broker clients configuration to a separate class
            .AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Add the hosted service that produces the random sample messages
        services.AddHostedService<ProducerBackgroundService>();
    }

    public void Configure()
    {
    }
}

Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Producer

Consumer

The consumer processes the messages and outputs their value to the standard output.

using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;

namespace Silverback.Samples.Kafka.Avro.Consumer;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        services.AddSilverback()

            // Use Apache Kafka as message broker and the Confluent schema registry
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddConfluentSchemaRegistry())

            // Delegate the broker clients configuration to a separate class
            .AddBrokerClientsConfigurator<BrokerClientsConfigurator>()

            // Register the subscribers
            .AddSingletonSubscriber<AvroMessageSubscriber>();
    }

    public void Configure()
    {
    }
}

Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Consumer