Kafka - Batch Processing
In this sample the consumed messages are processed in batch.
See also: Inbound Endpoint - Batch processing
Common
The message being exchanged is defined in a common project.
namespace Silverback.Samples.Kafka.Batch.Common
{
public class SampleMessage
{
public int Number { get; set; }
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Kafka.Batch.Producer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// Enable Silverback
services
.AddSilverback()
// Use Apache Kafka as message broker
.WithConnectionToMessageBroker(
options => options
.AddKafka())
// Delegate the inbound/outbound endpoints configuration to a separate
// class.
.AddEndpointsConfigurator<EndpointsConfigurator>();
// Add the hosted service that produces the random sample messages
services.AddHostedService<ProducerBackgroundService>();
}
public void Configure()
{
}
}
}
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Producer
{
public class EndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder)
{
builder
.AddKafkaEndpoints(
endpoints => endpoints
// Configure the properties needed by all consumers/producers
.Configure(
config =>
{
// The bootstrap server address is needed to connect
config.BootstrapServers =
"PLAINTEXT://localhost:9092";
})
// Produce the SampleMessage to the samples-batch topic
.AddOutbound<SampleMessage>(
endpoint => endpoint
.ProduceTo("samples-batch")));
}
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Producer
{
public class ProducerBackgroundService : BackgroundService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<ProducerBackgroundService> _logger;
public ProducerBackgroundService(
IServiceScopeFactory serviceScopeFactory,
ILogger<ProducerBackgroundService> logger)
{
_serviceScopeFactory = serviceScopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Create a service scope and resolve the IPublisher
// (the IPublisher cannot be resolved from the root scope and cannot
// therefore be directly injected into the BackgroundService)
using var scope = _serviceScopeFactory.CreateScope();
var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();
var broker = scope.ServiceProvider.GetRequiredService<IBroker>();
int number = 0;
while (!stoppingToken.IsCancellationRequested)
{
// Check whether the connection has been established, since the
// BackgroundService will start immediately, before the application
// is completely bootstrapped
if (!broker.IsConnected)
{
await Task.Delay(100, stoppingToken);
continue;
}
await ProduceMessageAsync(publisher, ++number);
await Task.Delay(50, stoppingToken);
}
}
private async Task ProduceMessageAsync(IPublisher publisher, int number)
{
try
{
await publisher.PublishAsync(
new SampleMessage
{
Number = number
});
_logger.LogInformation("Produced {Number}", number);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to produce {Number}", number);
}
}
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Producer
Consumer
The consumer processes the messages in batch and outputs the batch sum to the standard output.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Kafka.Batch.Consumer
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// Enable Silverback
services
.AddSilverback()
// Use Apache Kafka as message broker
.WithConnectionToMessageBroker(
options => options
.AddKafka())
// Delegate the inbound/outbound endpoints configuration to a separate
// class.
.AddEndpointsConfigurator<EndpointsConfigurator>()
// Register the subscribers
.AddSingletonSubscriber<SampleMessageBatchSubscriber>();
}
public void Configure()
{
}
}
}
using System;
using Confluent.Kafka;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.Batch.Consumer
{
public class EndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder)
{
builder
.AddKafkaEndpoints(
endpoints => endpoints
// Configure the properties needed by all consumers/producers
.Configure(
config =>
{
// The bootstrap server address is needed to connect
config.BootstrapServers =
"PLAINTEXT://localhost:9092";
})
// Consume the samples-batch topic
.AddInbound(
endpoint => endpoint
.ConsumeFrom("samples-batch")
.Configure(
config =>
{
// The consumer needs at least the bootstrap
// server address and a group id to be able
// to connect
config.GroupId = "sample-consumer";
// AutoOffsetReset.Earliest means that the
// consumer must start consuming from the
// beginning of the topic, if no offset was
// stored for this consumer group
config.AutoOffsetReset =
AutoOffsetReset.Earliest;
})
// Configure processing in batches of 100 messages,
// with a max wait time of 5 seconds
.EnableBatchProcessing(100, TimeSpan.FromSeconds(5))));
}
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Samples.Kafka.Batch.Common;
namespace Silverback.Samples.Kafka.Batch.Consumer
{
public class SampleMessageBatchSubscriber
{
private readonly ILogger<SampleMessageBatchSubscriber> _logger;
public SampleMessageBatchSubscriber(
ILogger<SampleMessageBatchSubscriber> logger)
{
_logger = logger;
}
public async Task OnBatchReceivedAsync(IAsyncEnumerable<SampleMessage> batch)
{
int sum = 0;
int count = 0;
await foreach (var message in batch)
{
sum += message.Number;
count++;
}
_logger.LogInformation(
"Received batch of {Count} message -> sum: {Sum}",
count,
sum);
}
}
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/Batch.Consumer