Kafka - Batch Processing
In this sample, the consumed messages are processed in batches.
See also: Consuming Messages
Common
The message being exchanged is defined in a common project.
namespace Silverback.Samples.Kafka.Batch.Common;
/// <summary>
/// The message exchanged between the producer and the consumer of this sample.
/// </summary>
public class SampleMessage
{
    /// <summary>
    /// Gets or sets the number carried by the message.
    /// </summary>
    public int Number
    {
        get;
        set;
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.Batch.Producer;
/// <summary>
/// Wires up the services of the producer application.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback with the Kafka integration, the broker clients configurator
    /// and the hosted service that produces the sample messages.
    /// </summary>
    /// <param name="services">The collection the registrations are added to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Step 1: enable Silverback
        var silverback = services.AddSilverback();

        // Step 2: use Apache Kafka as message broker
        var silverbackWithKafka = silverback.WithConnectionToMessageBroker(
            brokerOptions => brokerOptions.AddKafka());

        // Step 3: the broker clients are configured by a dedicated class
        silverbackWithKafka.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Step 4: register the background service publishing the sample messages
        // (kept after the Silverback registrations, as in the fluent version)
        services.AddHostedService<ProducerBackgroundService>();
    }

    /// <summary>
    /// Intentionally empty: this sample has nothing to configure here.
    /// </summary>
    public void Configure()
    {
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Producer;
/// <summary>
/// Configures the Kafka producer used by the sample producer application.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    // Address of the bootstrap server the client connects to
    private const string BootstrapServers = "PLAINTEXT://localhost:19092";

    // Topic the SampleMessage instances are produced to
    private const string TopicName = "samples-batch";

    /// <summary>
    /// Adds a Kafka producer that writes <see cref="SampleMessage"/> to the sample topic.
    /// </summary>
    /// <param name="builder">The builder the Kafka clients are added to.</param>
    public void Configure(BrokerClientsConfigurationBuilder builder) =>
        builder.AddKafkaClients(
            kafkaClients => kafkaClients
                .WithBootstrapServers(BootstrapServers)
                .AddProducer(
                    producerBuilder => producerBuilder
                        .Produce<SampleMessage>(
                            endpointBuilder => endpointBuilder.ProduceTo(TopicName))));
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Producer;
/// <summary>
/// Hosted service that keeps publishing batches of <see cref="SampleMessage"/> until the host stops.
/// </summary>
public class ProducerBackgroundService : BackgroundService
{
    // Number of messages published per iteration. It is also the step between the first
    // numbers of two consecutive batches, which keeps the produced ranges contiguous.
    private const int BatchSize = 100;

    private readonly IPublisher _publisher;

    private readonly ILogger<ProducerBackgroundService> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="ProducerBackgroundService"/> class.
    /// </summary>
    /// <param name="publisher">The publisher used to publish the message batches.</param>
    /// <param name="logger">The logger used to report produced and failed batches.</param>
    public ProducerBackgroundService(
        IPublisher publisher,
        ILogger<ProducerBackgroundService> logger)
    {
        _publisher = publisher;
        _logger = logger;
    }

    /// <summary>
    /// Publishes one batch, waits 50 milliseconds and repeats until cancellation is requested.
    /// </summary>
    /// <param name="stoppingToken">Signals that the service must stop.</param>
    /// <returns>A task representing the background loop.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        int firstNumber = 1;

        while (!stoppingToken.IsCancellationRequested)
        {
            await ProduceMessagesAsync(firstNumber);

            // Advance by a whole batch: stepping by 1 would make every batch overlap
            // the previous one and publish the same numbers (and keys) again.
            firstNumber += BatchSize;

            await Task.Delay(50, stoppingToken);
        }
    }

    /// <summary>
    /// Publishes <see cref="BatchSize"/> messages with consecutive numbers starting at
    /// <paramref name="number"/>, using <c>N{Number}</c> as the Kafka key of each message.
    /// </summary>
    /// <param name="number">The number assigned to the first message of the batch.</param>
    /// <returns>A task that completes when the batch was published or the failure was logged.</returns>
    private async Task ProduceMessagesAsync(int number)
    {
        int lastNumber = number + BatchSize - 1;

        try
        {
            List<SampleMessage> messages = [];

            for (int i = 0; i < BatchSize; i++)
            {
                messages.Add(new SampleMessage { Number = number + i });
            }

            await _publisher.WrapAndPublishBatchAsync(
                messages,
                envelope => envelope.SetKafkaKey($"N{envelope.Message?.Number}"));

            _logger.LogInformation("Produced {FirstNumber}-{LastNumber}", number, lastNumber);
        }
        catch (Exception ex)
        {
            // Best effort: a failed batch is logged and not rethrown, so the loop goes on
            _logger.LogError(ex, "Failed to produce {FirstNumber}-{LastNumber}", number, lastNumber);
        }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Producer
Consumer
The consumer processes the messages in batches and logs the message count and the sum of each batch.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.Batch.Consumer;
/// <summary>
/// Wires up the services of the consumer application.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback with the Kafka integration, the broker clients configurator
    /// and the subscriber that handles the message batches.
    /// </summary>
    /// <param name="services">The collection the registrations are added to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Step 1: enable Silverback
        var silverback = services.AddSilverback();

        // Step 2: use Apache Kafka as message broker
        var silverbackWithKafka = silverback.WithConnectionToMessageBroker(
            brokerOptions => brokerOptions.AddKafka());

        // Step 3: the broker clients are configured by a dedicated class
        var silverbackWithClients =
            silverbackWithKafka.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Step 4: register the batch subscriber as a singleton
        silverbackWithClients.AddSingletonSubscriber<SampleMessageBatchSubscriber>();
    }

    /// <summary>
    /// Intentionally empty: this sample has nothing to configure here.
    /// </summary>
    public void Configure()
    {
    }
}
using System;
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Consumer;
/// <summary>
/// Configures the Kafka consumer used by the sample consumer application.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    // Address of the bootstrap server the client connects to
    private const string BootstrapServers = "PLAINTEXT://localhost:19092";

    // Consumer group id
    private const string GroupId = "sample-consumer";

    // Topic the SampleMessage instances are consumed from
    private const string TopicName = "samples-batch";

    // Batch processing settings: 100 messages per batch, max wait time of 5 seconds
    private const int BatchSize = 100;

    private const int BatchMaxWaitSeconds = 5;

    /// <summary>
    /// Adds a Kafka consumer that reads <see cref="SampleMessage"/> from the sample topic
    /// and hands the messages over in batches.
    /// </summary>
    /// <param name="builder">The builder the Kafka clients are added to.</param>
    public void Configure(BrokerClientsConfigurationBuilder builder) =>
        builder.AddKafkaClients(
            kafkaClients => kafkaClients
                .WithBootstrapServers(BootstrapServers)
                .AddConsumer(
                    consumerBuilder => consumerBuilder
                        .WithGroupId(GroupId)

                        // Start from the beginning of the topic when no offset
                        // was stored for this consumer group
                        .AutoResetOffsetToEarliest()
                        .Consume<SampleMessage>(
                            endpointBuilder => endpointBuilder
                                .ConsumeFrom(TopicName)
                                .EnableBatchProcessing(
                                    BatchSize,
                                    TimeSpan.FromSeconds(BatchMaxWaitSeconds)))));
}
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Consumer;
/// <summary>
/// Subscriber that receives the <see cref="SampleMessage"/> batches and logs their size and sum.
/// </summary>
public class SampleMessageBatchSubscriber
{
    private readonly ILogger<SampleMessageBatchSubscriber> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="SampleMessageBatchSubscriber"/> class.
    /// </summary>
    /// <param name="logger">The logger the batch summary is written to.</param>
    public SampleMessageBatchSubscriber(ILogger<SampleMessageBatchSubscriber> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Enumerates the batch, then logs the number of messages and the sum of their numbers.
    /// </summary>
    /// <param name="batch">The asynchronous stream of messages making up the batch.</param>
    /// <returns>A task that completes once the whole batch was enumerated and logged.</returns>
    public async Task OnBatchReceivedAsync(IAsyncEnumerable<SampleMessage> batch)
    {
        // Accumulate in a long: adding up many large int values in an int would
        // silently wrap around and log a wrong (possibly negative) sum.
        long sum = 0;
        int count = 0;

        await foreach (SampleMessage message in batch)
        {
            sum += message.Number;
            count++;
        }

        _logger.LogInformation(
            "Received batch of {Count} message -> sum: {Sum}",
            count,
            sum);
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Consumer