Show / Hide Table of Contents

    Kafka - Batch Processing

    In this sample the consumed messages are processed in batch.

    See also: Consuming Messages

    Common

    The message being exchanged is defined in a common project.

    namespace Silverback.Samples.Kafka.Batch.Common;
    
    public class SampleMessage
    {
        public int Number { get; set; }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Common

    Producer

    The producer uses a hosted service to publish some messages in the background.

    • Startup
    • BrokerClientsConfigurator
    • Background Service
    using Microsoft.Extensions.DependencyInjection;
    using Silverback.Configuration;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.Batch.Producer;
    
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            // Enable Silverback
            services.AddSilverback()
    
                // Use Apache Kafka as message broker
                .WithConnectionToMessageBroker(options => options.AddKafka())
    
                // Delegate the broker clients configuration to a separate class
                .AddBrokerClientsConfigurator<BrokerClientsConfigurator>();
    
            // Add the hosted service that produces the random sample messages
            services.AddHostedService<ProducerBackgroundService>();
        }
    
        public void Configure()
        {
        }
    }
    
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Producer;
    
    public class BrokerClientsConfigurator : IBrokerClientsConfigurator
    {
        public void Configure(BrokerClientsConfigurationBuilder builder)
        {
            builder
                .AddKafkaClients(
                    clients => clients
    
                        // The bootstrap server address is needed to connect
                        .WithBootstrapServers("PLAINTEXT://localhost:19092")
    
                        // Add a producer
                        .AddProducer(
                            producer => producer
    
                                // Produce the SampleMessage to the samples-batch topic
                                .Produce<SampleMessage>(
                                    endpoint => endpoint
                                        .ProduceTo("samples-batch"))));
        }
    }
    
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Silverback.Messaging.Messages;
    using Silverback.Messaging.Publishing;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Producer;
    
    public class ProducerBackgroundService : BackgroundService
    {
        private readonly IPublisher _publisher;
    
        private readonly ILogger<ProducerBackgroundService> _logger;
    
        public ProducerBackgroundService(
            IPublisher publisher,
            ILogger<ProducerBackgroundService> logger)
        {
            _publisher = publisher;
            _logger = logger;
        }
    
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            int number = 0;
    
            while (!stoppingToken.IsCancellationRequested)
            {
                await ProduceMessagesAsync(++number + 100);
    
                await Task.Delay(50, stoppingToken);
            }
        }
    
        private async Task ProduceMessagesAsync(int number)
        {
            try
            {
                List<SampleMessage> messages = [];
                for (int i = 0; i < 100; i++)
                {
                    messages.Add(new SampleMessage { Number = number + i });
                }
    
                await _publisher.WrapAndPublishBatchAsync(
                    messages,
                    envelope => envelope.SetKafkaKey($"N{envelope.Message?.Number}"));
    
                _logger.LogInformation("Produced {FirstNumber}-{LastNumber}", number, number + 99);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to produce {FirstNumber}-{LastNumber}", number, number + 99);
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Producer

    Consumer

    The consumer processes the messages in batch and outputs the batch sum to the standard output.

    • Startup
    • BrokerClientsConfigurator
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    using Silverback.Configuration;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer;
    
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            // Enable Silverback
            services.AddSilverback()
    
                // Use Apache Kafka as message broker
                .WithConnectionToMessageBroker(options => options.AddKafka())
    
                // Delegate the broker clients configuration to a separate class
                .AddBrokerClientsConfigurator<BrokerClientsConfigurator>()
    
                // Register the subscribers
                .AddSingletonSubscriber<SampleMessageBatchSubscriber>();
        }
    
        public void Configure()
        {
        }
    }
    
    using System;
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer;
    
    public class BrokerClientsConfigurator : IBrokerClientsConfigurator
    {
        public void Configure(BrokerClientsConfigurationBuilder builder)
        {
            builder
                .AddKafkaClients(
                    clients => clients
    
                        // The bootstrap server address is needed to connect
                        .WithBootstrapServers("PLAINTEXT://localhost:19092")
    
                        // Add a consumer
                        .AddConsumer(
                            consumer => consumer
    
                                // Set the consumer group id
                                .WithGroupId("sample-consumer")
    
                                // AutoOffsetReset.Earliest means that the consumer
                                // must start consuming from the beginning of the topic,
                                // if no offset was stored for this consumer group
                                .AutoResetOffsetToEarliest()
    
                                // Consume the SampleMessage from the samples-batch topic
                                .Consume<SampleMessage>(
                                    endpoint => endpoint
                                        .ConsumeFrom("samples-batch")
    
                                        // Configure processing in batches of 100 messages,
                                        // with a max wait time of 5 seconds
                                        .EnableBatchProcessing(100, TimeSpan.FromSeconds(5)))));
        }
    }
    
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Silverback.Samples.Kafka.Batch.Common;
    
    namespace Silverback.Samples.Kafka.Batch.Consumer;
    
    public class SampleMessageBatchSubscriber
    {
        private readonly ILogger<SampleMessageBatchSubscriber> _logger;
    
        public SampleMessageBatchSubscriber(ILogger<SampleMessageBatchSubscriber> logger)
        {
            _logger = logger;
        }
    
        public async Task OnBatchReceivedAsync(IAsyncEnumerable<SampleMessage> batch)
        {
            int sum = 0;
            int count = 0;
    
            await foreach (SampleMessage message in batch)
            {
                sum += message.Number;
                count++;
            }
    
            _logger.LogInformation(
                "Received batch of {Count} message -> sum: {Sum}",
                count,
                sum);
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2026 Sergio Aquilini