Show / Hide Table of Contents

    Kafka - Avro

    This sample implements a producer and consumer which take advantage of the schema registry and serializes the messages as Avro.

    Common

    The message being exchanged is defined in a common project.

    // ------------------------------------------------------------------------------
    // <auto-generated>
    //    Generated by avrogen, version 1.7.7.5
    //    Changes to this file may cause incorrect behavior and will be lost if code
    //    is regenerated
    // </auto-generated>
    // ------------------------------------------------------------------------------
    
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Examples.Messages
    {
        using System;
        using System.Collections.Generic;
        using System.Text;
        using global::Avro;
        using global::Avro.Specific;
        
        public partial class AvroMessage : ISpecificRecord
        {
            public static Schema _SCHEMA = Schema.Parse("{\"type\":\"record\",\"name\":\"AvroMessage\",\"namespace\":\"Silverback.Examples.Messages\"," +
                                                        "\"fields\":[{\"name\":\"number\",\"type\":\"string\"}]}");
            private string _number;
            public virtual Schema Schema
            {
                get
                {
                    return AvroMessage._SCHEMA;
                }
            }
            public string number
            {
                get
                {
                    return this._number;
                }
                set
                {
                    this._number = value;
                }
            }
            public virtual object Get(int fieldPos)
            {
                switch (fieldPos)
                {
                    case 0: return this.number;
                    default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
                }
            }
            public virtual void Put(int fieldPos, object fieldValue)
            {
                switch (fieldPos)
                {
                    case 0:
                        this.number = (System.String) fieldValue;
                        break;
                    default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
                }
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Common

    Producer

    The producer uses a hosted service to publish some messages in the background.

    • Startup
    • EndpointsConfigurator
    • Background Service
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Kafka.Avro.Producer
    {
        public class Startup
        {
            public void ConfigureServices(IServiceCollection services)
            {
                // Enable Silverback
                services
                    .AddSilverback()
    
                    // Use Apache Kafka as message broker
                    .WithConnectionToMessageBroker(
                        options => options
                            .AddKafka())
    
                    // Delegate the inbound/outbound endpoints configuration to a separate
                    // class.
                    .AddEndpointsConfigurator<EndpointsConfigurator>();
    
                // Add the hosted service that produces the random sample messages
                services.AddHostedService<ProducerBackgroundService>();
            }
    
            public void Configure()
            {
            }
        }
    }
    
    using Silverback.Examples.Messages;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.Avro.Producer
    {
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder
                    .AddKafkaEndpoints(
                        endpoints => endpoints
    
                            // Configure the properties needed by all consumers/producers
                            .Configure(
                                config =>
                                {
                                    // The bootstrap server address is needed to connect
                                    config.BootstrapServers =
                                        "PLAINTEXT://localhost:9092";
                                })
    
                            // Produce the AvroMessage to the samples-avro topic
                            .AddOutbound<AvroMessage>(
                                endpoint => endpoint
                                    .ProduceTo("samples-avro")
    
                                    // Configure Avro serialization
                                    .SerializeAsAvro(
                                        avro => avro.Configure(
                                            schemaRegistry =>
                                            {
                                                schemaRegistry.Url = "localhost:8081";
                                            },
                                            serializer =>
                                            {
                                                serializer.AutoRegisterSchemas = true;
                                            }))));
            }
        }
    }
    
    using System;
    using System.Globalization;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Silverback.Examples.Messages;
    using Silverback.Messaging.Broker;
    using Silverback.Messaging.Publishing;
    
    namespace Silverback.Samples.Kafka.Avro.Producer
    {
        public class ProducerBackgroundService : BackgroundService
        {
            private readonly IServiceScopeFactory _serviceScopeFactory;
    
            private readonly ILogger<ProducerBackgroundService> _logger;
    
            public ProducerBackgroundService(
                IServiceScopeFactory serviceScopeFactory,
                ILogger<ProducerBackgroundService> logger)
            {
                _serviceScopeFactory = serviceScopeFactory;
                _logger = logger;
            }
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                // Create a service scope and resolve the IPublisher
                // (the IPublisher cannot be resolved from the root scope and cannot
                // therefore be directly injected into the BackgroundService)
                using var scope = _serviceScopeFactory.CreateScope();
                var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();
                var broker = scope.ServiceProvider.GetRequiredService<IBroker>();
    
                int number = 0;
    
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Check whether the connection has been established, since the
                    // BackgroundService will start immediately, before the application
                    // is completely bootstrapped
                    if (!broker.IsConnected)
                    {
                        await Task.Delay(100, stoppingToken);
                        continue;
                    }
    
                    await ProduceMessageAsync(publisher, ++number);
    
                    await Task.Delay(100, stoppingToken);
                }
            }
    
            private async Task ProduceMessageAsync(IPublisher publisher, int number)
            {
                try
                {
                    await publisher.PublishAsync(
                        new AvroMessage
                        {
                            number = number.ToString(CultureInfo.InvariantCulture)
                        });
    
                    _logger.LogInformation("Produced {Number}", number);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to produce {Number}", number);
                }
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Producer

    Consumer

    The consumer processes the messages and outputs their value to the standard output.

    • Startup
    • EndpointsConfigurator
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Kafka.Avro.Consumer
    {
        public class Startup
        {
            public void ConfigureServices(IServiceCollection services)
            {
                // Enable Silverback
                services
                    .AddSilverback()
    
                    // Use Apache Kafka as message broker
                    .WithConnectionToMessageBroker(
                        options => options
                            .AddKafka())
    
                    // Delegate the inbound/outbound endpoints configuration to a separate
                    // class.
                    .AddEndpointsConfigurator<EndpointsConfigurator>()
    
                    // Register the subscribers
                    .AddSingletonSubscriber<AvroMessageSubscriber>();
            }
    
            public void Configure()
            {
            }
        }
    }
    
    using Confluent.Kafka;
    using Silverback.Examples.Messages;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.Avro.Consumer
    {
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder
                    .AddKafkaEndpoints(
                        endpoints => endpoints
    
                            // Configure the properties needed by all consumers/producers
                            .Configure(
                                config =>
                                {
                                    // The bootstrap server address is needed to connect
                                    config.BootstrapServers =
                                        "PLAINTEXT://localhost:9092";
                                })
    
                            // Consume the samples-avro topic
                            .AddInbound<AvroMessage>(
                                endpoint => endpoint
                                    .ConsumeFrom("samples-avro")
    
                                    // Configure Avro deserialization
                                    .DeserializeAvro(
                                        avro => avro.Configure(
                                            schemaRegistry =>
                                            {
                                                schemaRegistry.Url = "localhost:8081";
                                            }))
                                    .Configure(
                                        config =>
                                        {
                                            // The consumer needs at least the bootstrap
                                            // server address and a group id to be able
                                            // to connect
                                            config.GroupId = "sample-consumer";
    
                                            // AutoOffsetReset.Earliest means that the
                                            // consumer must start consuming from the
                                            // beginning of the topic, if no offset was
                                            // stored for this consumer group
                                            config.AutoOffsetReset =
                                                AutoOffsetReset.Earliest;
                                        })));
            }
        }
    }
    
    using Microsoft.Extensions.Logging;
    using Silverback.Examples.Messages;
    
    namespace Silverback.Samples.Kafka.Avro.Consumer
    {
        public class AvroMessageSubscriber
        {
            private readonly ILogger<AvroMessageSubscriber> _logger;
    
            public AvroMessageSubscriber(ILogger<AvroMessageSubscriber> logger)
            {
                _logger = logger;
            }
    
            public void OnMessageReceived(AvroMessage message) =>
                _logger.LogInformation("Received {MessageNumber}", message.number);
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Avro.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini