Show / Hide Table of Contents

    Kafka - Batch Processing

    In this sample, the consumed messages are processed in batches.

    See also: Inbound Endpoint - Batch processing

    Common

    The message being exchanged is defined in a common project.

    namespace Silverback.Samples.Kafka.Batch.Common
    {
        /// <summary>
        ///     The message exchanged between the producer and the consumer of this sample.
        /// </summary>
        public class SampleMessage
        {
            /// <summary>
            ///     Gets or sets the integer value carried by the message.
            /// </summary>
            public int Number { get; set; }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Common

    Producer

    The producer uses a hosted service to publish some messages in the background.

    • Startup
    • EndpointsConfigurator
    • Background Service
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Kafka.Batch.Producer
    {
        /// <summary>
        ///     Registers the services used by the producer sample.
        /// </summary>
        public class Startup
        {
            /// <summary>
            ///     Adds Silverback with the Kafka broker, the endpoints configurator and
            ///     the hosted service that produces the sample messages.
            /// </summary>
            /// <param name="services">
            ///     The service collection receiving the registrations.
            /// </param>
            public void ConfigureServices(IServiceCollection services)
            {
                // Silverback setup: Apache Kafka is the message broker and the
                // endpoints are declared in the separate EndpointsConfigurator class
                services
                    .AddSilverback()
                    .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                    .AddEndpointsConfigurator<EndpointsConfigurator>();

                // Hosted service that publishes the sample messages in the background
                services.AddHostedService<ProducerBackgroundService>();
            }

            /// <summary>
            ///     Does nothing: the body is intentionally empty in this sample.
            /// </summary>
            public void Configure()
            {
            }
        }
    }
    
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Producer
    {
        /// <summary>
        ///     Declares the Kafka endpoints used by the producer sample.
        /// </summary>
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            // Address of the Kafka bootstrap server the client connects to
            private const string KafkaBootstrapServers = "PLAINTEXT://localhost:9092";

            // Topic the SampleMessage instances are produced to
            private const string TopicName = "samples-batch";

            /// <summary>
            ///     Sets the bootstrap server shared by all Kafka clients and adds the
            ///     outbound endpoint for <see cref="SampleMessage" />.
            /// </summary>
            /// <param name="builder">The endpoints configuration builder.</param>
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder.AddKafkaEndpoints(
                    kafka => kafka

                        // Settings shared by every consumer/producer
                        .Configure(
                            clientConfig =>
                            {
                                clientConfig.BootstrapServers = KafkaBootstrapServers;
                            })

                        // Outbound endpoint: SampleMessage -> samples-batch topic
                        .AddOutbound<SampleMessage>(
                            outbound => outbound.ProduceTo(TopicName)));
            }
        }
    }
    
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Silverback.Messaging.Broker;
    using Silverback.Messaging.Publishing;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Producer
    {
        /// <summary>
        ///     Background service that keeps publishing <see cref="SampleMessage" />
        ///     instances with an increasing number until it is stopped.
        /// </summary>
        public class ProducerBackgroundService : BackgroundService
        {
            private readonly IServiceScopeFactory _serviceScopeFactory;

            private readonly ILogger<ProducerBackgroundService> _logger;

            /// <summary>
            ///     Initializes a new instance of the service.
            /// </summary>
            /// <param name="serviceScopeFactory">
            ///     The factory used to create the scope the publisher is resolved from.
            /// </param>
            /// <param name="logger">The logger.</param>
            public ProducerBackgroundService(
                IServiceScopeFactory serviceScopeFactory,
                ILogger<ProducerBackgroundService> logger)
            {
                _serviceScopeFactory = serviceScopeFactory;
                _logger = logger;
            }

            /// <summary>
            ///     Publishes one message roughly every 50 milliseconds while the broker
            ///     is connected, polling every 100 milliseconds while it is not.
            /// </summary>
            /// <param name="stoppingToken">
            ///     The token signaling that the service must stop.
            /// </param>
            /// <returns>A task representing the background loop.</returns>
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                // The IPublisher cannot be resolved from the root scope (and so cannot
                // be injected into the BackgroundService directly): resolve it from a
                // dedicated scope that lives as long as the loop
                using var serviceScope = _serviceScopeFactory.CreateScope();
                var scopedProvider = serviceScope.ServiceProvider;
                var publisher = scopedProvider.GetRequiredService<IPublisher>();
                var broker = scopedProvider.GetRequiredService<IBroker>();

                var lastNumber = 0;

                while (!stoppingToken.IsCancellationRequested)
                {
                    if (broker.IsConnected)
                    {
                        lastNumber++;
                        await ProduceMessageAsync(publisher, lastNumber);

                        await Task.Delay(50, stoppingToken);
                    }
                    else
                    {
                        // The BackgroundService starts immediately, before the
                        // application is completely bootstrapped: wait for the
                        // connection to be established and check again
                        await Task.Delay(100, stoppingToken);
                    }
                }
            }

            /// <summary>
            ///     Publishes a single message and logs the outcome. Failures are logged
            ///     and not rethrown.
            /// </summary>
            /// <param name="publisher">The publisher used to send the message.</param>
            /// <param name="number">The number to be carried by the message.</param>
            /// <returns>A task representing the publish operation.</returns>
            private async Task ProduceMessageAsync(IPublisher publisher, int number)
            {
                try
                {
                    var message = new SampleMessage { Number = number };
                    await publisher.PublishAsync(message);

                    _logger.LogInformation("Produced {Number}", number);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to produce {Number}", number);
                }
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Producer

    Consumer

    The consumer processes the messages in batches and outputs the sum of each batch to the standard output.

    • Startup
    • EndpointsConfigurator
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer
    {
        /// <summary>
        ///     Registers the services used by the consumer sample.
        /// </summary>
        public class Startup
        {
            /// <summary>
            ///     Adds Silverback with the Kafka broker, the endpoints configurator and
            ///     the batch subscriber.
            /// </summary>
            /// <param name="services">
            ///     The service collection receiving the registrations.
            /// </param>
            public void ConfigureServices(IServiceCollection services)
            {
                // Silverback setup: Apache Kafka is the message broker, the endpoints
                // are declared in the separate EndpointsConfigurator class and the
                // batch subscriber is registered as a singleton
                services
                    .AddSilverback()
                    .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                    .AddEndpointsConfigurator<EndpointsConfigurator>()
                    .AddSingletonSubscriber<SampleMessageBatchSubscriber>();
            }

            /// <summary>
            ///     Does nothing: the body is intentionally empty in this sample.
            /// </summary>
            public void Configure()
            {
            }
        }
    }
    
    using System;
    using Confluent.Kafka;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer
    {
        /// <summary>
        ///     Declares the Kafka endpoints used by the consumer sample.
        /// </summary>
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            // Address of the Kafka bootstrap server the client connects to
            private const string KafkaBootstrapServers = "PLAINTEXT://localhost:9092";

            // Topic the messages are consumed from
            private const string TopicName = "samples-batch";

            // Consumer group id used by this sample
            private const string ConsumerGroupId = "sample-consumer";

            // Number of messages passed to EnableBatchProcessing
            private const int BatchSize = 100;

            // Max wait time passed to EnableBatchProcessing
            private static readonly TimeSpan BatchMaxWaitTime = TimeSpan.FromSeconds(5);

            /// <summary>
            ///     Sets the bootstrap server shared by all Kafka clients and adds the
            ///     inbound endpoint, with batch processing enabled.
            /// </summary>
            /// <param name="builder">The endpoints configuration builder.</param>
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder.AddKafkaEndpoints(
                    kafka => kafka

                        // Settings shared by every consumer/producer
                        .Configure(
                            clientConfig =>
                            {
                                clientConfig.BootstrapServers = KafkaBootstrapServers;
                            })

                        // Inbound endpoint reading the samples-batch topic
                        .AddInbound(
                            inbound => inbound
                                .ConsumeFrom(TopicName)
                                .Configure(
                                    consumerConfig =>
                                    {
                                        // Besides the bootstrap server address, the
                                        // consumer needs a group id to connect
                                        consumerConfig.GroupId = ConsumerGroupId;

                                        // Start from the beginning of the topic when
                                        // no offset is stored for this consumer group
                                        consumerConfig.AutoOffsetReset =
                                            AutoOffsetReset.Earliest;
                                    })

                                // Batches of 100 messages, with a max wait time of
                                // 5 seconds
                                .EnableBatchProcessing(BatchSize, BatchMaxWaitTime)));
            }
        }
    }
    
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer
    {
        /// <summary>
        ///     Subscriber that receives the consumed messages in batches and logs the
        ///     size and the sum of each batch.
        /// </summary>
        public class SampleMessageBatchSubscriber
        {
            private readonly ILogger<SampleMessageBatchSubscriber> _logger;

            /// <summary>
            ///     Initializes a new instance of the subscriber.
            /// </summary>
            /// <param name="logger">The logger the batch summary is written to.</param>
            public SampleMessageBatchSubscriber(
                ILogger<SampleMessageBatchSubscriber> logger)
            {
                _logger = logger;
            }

            /// <summary>
            ///     Enumerates the whole batch, accumulating the message count and the
            ///     sum of the <see cref="SampleMessage.Number" /> values, then logs both.
            /// </summary>
            /// <param name="batch">The stream of messages forming the batch.</param>
            /// <returns>A task that completes once the batch has been enumerated.</returns>
            public async Task OnBatchReceivedAsync(IAsyncEnumerable<SampleMessage> batch)
            {
                var total = 0;
                var messagesCount = 0;

                await foreach (SampleMessage item in batch)
                {
                    messagesCount += 1;
                    total += item.Number;
                }

                // Written only after the enumeration has completed
                _logger.LogInformation(
                    "Received batch of {Count} message -> sum: {Sum}",
                    messagesCount,
                    total);
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini