Kafka - File Streaming
This sample demonstrates how to handle raw binary content and large messages in order to transfer files through Kafka.
See also: Serializing the Produced Messages, Deserializing the Consumed Messages, Producing Chunked Messages, Consuming Chunked Messages
Producer
The producer exposes two REST endpoints that receive the path of a local file to be streamed. The second endpoint uses a custom BinaryMessage to forward additional metadata (the file name, in this example).
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer;
/// <summary>
/// Configures the services and the HTTP request pipeline of the producer sample.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback (connected to Kafka), the API controllers and the Swagger generator.
    /// </summary>
    /// <param name="services">The service collection to be configured.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        services.AddSilverback()
            // Use Apache Kafka as message broker
            .WithConnectionToMessageBroker(options => options.AddKafka())
            // Delegate the broker clients configuration to a separate class
            .AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Add API controllers and SwaggerGen
        services.AddControllers();
        services.AddSwaggerGen();
    }

    /// <summary>
    /// Sets up the Swagger JSON/UI middlewares and routes the requests to the API controllers.
    /// </summary>
    /// <param name="app">The application builder.</param>
    public void Configure(IApplicationBuilder app)
    {
        // Assembly.FullName is the full display name (including Version, Culture and
        // PublicKeyToken); only the simple assembly name is meant to label the API in the UI.
        string? apiName = GetType().Assembly.GetName().Name;

        // Enable middlewares to serve generated Swagger JSON and UI
        app.UseSwagger().UseSwaggerUI(
            uiOptions => uiOptions.SwaggerEndpoint(
                "/swagger/v1/swagger.json",
                $"{apiName} API"));

        // Enable routing and endpoints for controllers
        app.UseRouting();
        app.UseEndpoints(endpoints => endpoints.MapControllers());
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer;
/// <summary>
/// Configures the Kafka producer used to stream the binary files.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    // Address of the Kafka broker to connect to
    private const string BootstrapServers = "PLAINTEXT://localhost:19092";

    private const string TopicName = "samples-binary-file-streaming";

    // Every message is written to this fixed partition. Giving each producer instance its own
    // partition keeps the chunks of a file contiguous when the producer is scaled horizontally.
    // This is optional, and in a real solution the value should come from a configuration setting.
    private const int TargetPartition = 0;

    // The files are split into chunks of 512 kB
    private const int ChunkSize = 512 * 1024;

    /// <summary>
    /// Adds a Kafka producer that writes every <see cref="BinaryMessage" /> to the
    /// samples topic, splitting the content into chunks.
    /// </summary>
    /// <param name="builder">The broker clients configuration builder.</param>
    public void Configure(BrokerClientsConfigurationBuilder builder) =>
        builder.AddKafkaClients(
            clients => clients
                .WithBootstrapServers(BootstrapServers)
                .AddProducer(
                    producer => producer
                        .Produce<BinaryMessage>(
                            endpoint => endpoint
                                .ProduceTo(TopicName, TargetPartition)
                                .EnableChunking(ChunkSize))));
}
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
/// <summary>
/// A <see cref="BinaryMessage" /> that additionally carries the name of the streamed file,
/// mapped to the <c>x-filename</c> message header.
/// </summary>
public class CustomBinaryMessage : BinaryMessage
{
    private const string FilenameHeaderName = "x-filename";

    /// <summary>
    /// Gets or sets the name of the file being streamed.
    /// </summary>
    [Header(FilenameHeaderName)]
    public string? Filename { get; set; }
}
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Controllers;
/// <summary>
/// REST endpoints that stream a local file to Kafka, chunk by chunk, through the Silverback
/// <see cref="IPublisher" />.
/// </summary>
/// <remarks>
/// NOTE(review): the endpoints accept any path readable by the server process. That is fine for a
/// local sample, but real code should restrict the input to a known folder.
/// </remarks>
[ApiController]
[Route("[controller]")]
public class ProducerController : ControllerBase
{
    private readonly IPublisher _publisher;

    public ProducerController(IPublisher publisher)
    {
        _publisher = publisher;
    }

    /// <summary>
    /// Streams the specified local file as a plain <see cref="BinaryMessage" />.
    /// </summary>
    /// <param name="filePath">The path of the local file to be streamed.</param>
    /// <param name="contentType">The optional content type to be set on the message.</param>
    /// <returns>
    /// <c>204 No Content</c> once the file has been published, or <c>404 Not Found</c> if the
    /// file doesn't exist.
    /// </returns>
    [HttpPost("binary-file")]
    public async Task<IActionResult> ProduceBinaryFileAsync(
        string filePath,
        string? contentType)
    {
        // Answer 404 instead of letting File.OpenRead fail with an unhandled exception
        if (!System.IO.File.Exists(filePath))
            return NotFound();

        // Open specified file stream
        await using FileStream fileStream = System.IO.File.OpenRead(filePath);

        // Create a BinaryMessage that wraps the file stream
        BinaryMessage binaryMessage = new(fileStream);

        await PublishBinaryMessageAsync(binaryMessage, contentType);
        return NoContent();
    }

    /// <summary>
    /// Streams the specified local file as a <see cref="CustomBinaryMessage" />, forwarding the
    /// file name in a message header.
    /// </summary>
    /// <param name="filePath">The path of the local file to be streamed.</param>
    /// <param name="contentType">The optional content type to be set on the message.</param>
    /// <returns>
    /// <c>204 No Content</c> once the file has been published, or <c>404 Not Found</c> if the
    /// file doesn't exist.
    /// </returns>
    [HttpPost("custom-binary-file")]
    public async Task<IActionResult> ProduceBinaryFileWithCustomHeadersAsync(
        string filePath,
        string? contentType)
    {
        // Answer 404 instead of letting File.OpenRead fail with an unhandled exception
        if (!System.IO.File.Exists(filePath))
            return NotFound();

        // Open specified file stream
        await using FileStream fileStream = System.IO.File.OpenRead(filePath);

        // The CustomBinaryMessage extends the BinaryMessage adding an extra Filename property
        // that is also exported as header.
        CustomBinaryMessage binaryMessage = new()
        {
            Content = fileStream,
            Filename = Path.GetFileName(filePath)
        };

        await PublishBinaryMessageAsync(binaryMessage, contentType);
        return NoContent();
    }

    /// <summary>
    /// Applies the optional content type and publishes the message. The publisher routes it to
    /// the outbound endpoint, where the stream is read and produced chunk by chunk, without
    /// loading the entire file into memory.
    /// </summary>
    /// <typeparam name="TMessage">The concrete message type (kept to publish the same static type as before).</typeparam>
    /// <param name="binaryMessage">The message wrapping the file stream.</param>
    /// <param name="contentType">The optional content type; ignored when null or empty.</param>
    private async Task PublishBinaryMessageAsync<TMessage>(TMessage binaryMessage, string? contentType)
        where TMessage : BinaryMessage
    {
        if (!string.IsNullOrEmpty(contentType))
            binaryMessage.ContentType = contentType;

        await _publisher.PublishAsync(binaryMessage);
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Producer
Consumer
The consumer simply streams the file to a temporary folder in the local file system.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer;
/// <summary>
/// Configures the services of the consumer sample.
/// </summary>
public class Startup
{
    /// <summary>
    /// Enables Silverback with a Kafka connection, delegates the broker clients setup to
    /// <see cref="BrokerClientsConfigurator" /> and registers <see cref="BinaryFileSubscriber" />
    /// as a singleton subscriber.
    /// </summary>
    /// <param name="services">The service collection to be configured.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .WithConnectionToMessageBroker(options => options.AddKafka())
            .AddBrokerClientsConfigurator<BrokerClientsConfigurator>()
            .AddSingletonSubscriber<BinaryFileSubscriber>();
    }

    /// <summary>
    /// Intentionally empty: this sample registers no controllers and needs no HTTP middleware.
    /// </summary>
    public void Configure()
    {
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer;
/// <summary>
/// Configures the Kafka consumer that receives the streamed binary files.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    // Address of the Kafka broker to connect to
    private const string BootstrapServers = "PLAINTEXT://localhost:19092";

    private const string GroupId = "sample-consumer";

    private const string TopicName = "samples-binary-file-streaming";

    // How many times a failing chunk sequence is retried
    private const int RetryCount = 5;

    /// <summary>
    /// Adds a Kafka consumer that reads <see cref="CustomBinaryMessage" /> instances from the
    /// samples topic.
    /// </summary>
    /// <param name="builder">The broker clients configuration builder.</param>
    /// <remarks>
    /// The settings chosen here are:
    /// <list type="bullet">
    /// <item>When the group has no committed offset, consumption starts from the beginning of the
    /// topic (<c>AutoResetOffsetToEarliest</c>).</item>
    /// <item><see cref="CustomBinaryMessage" /> is consumed instead of the plain
    /// <c>BinaryMessage</c>, so that the extra file name header is mapped. Because it derives from
    /// <c>BinaryMessage</c>, the <c>BinaryMessageSerializer</c> is selected automatically.</item>
    /// <item>The partitions are assigned manually. The resolver returns all available partitions
    /// unchanged. This prevents a rebalance in the middle of a potentially huge chunk sequence.
    /// It is an optimization, not a requirement.</item>
    /// <item>A failing chunk sequence is retried up to <see cref="RetryCount" /> times.</item>
    /// </list>
    /// </remarks>
    public void Configure(BrokerClientsConfigurationBuilder builder) =>
        builder.AddKafkaClients(
            clients => clients
                .WithBootstrapServers(BootstrapServers)
                .AddConsumer(
                    consumer => consumer
                        .WithGroupId(GroupId)
                        .AutoResetOffsetToEarliest()
                        .Consume<CustomBinaryMessage>(
                            endpoint => endpoint
                                .ConsumeFrom(TopicName, partitions => partitions)
                                .OnError(policy => policy.Retry(RetryCount)))));
}
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
/// <summary>
/// A <see cref="BinaryMessage" /> that additionally carries the name of the streamed file,
/// read from the <c>x-filename</c> message header.
/// </summary>
public class CustomBinaryMessage : BinaryMessage
{
    private const string FilenameHeaderName = "x-filename";

    /// <summary>
    /// Gets or sets the name of the received file.
    /// </summary>
    [Header(FilenameHeaderName)]
    public string? Filename { get; set; }
}
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
/// <summary>
/// Saves each received binary file into a local temporary folder.
/// </summary>
public class BinaryFileSubscriber
{
    private const string OutputPath = "../../temp";

    private readonly ILogger<BinaryFileSubscriber> _logger;

    public BinaryFileSubscriber(ILogger<BinaryFileSubscriber> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Streams the content of the received message into a new file under
    /// <see cref="OutputPath" />. The file name is a random GUID followed by the sanitized
    /// original file name, if any.
    /// </summary>
    /// <param name="binaryMessage">The received message, whose content is the file stream.</param>
    public async Task OnBinaryMessageReceivedAsync(CustomBinaryMessage binaryMessage)
    {
        EnsureTargetFolderExists();

        // The file name comes from a message header, i.e. from outside this process. Keep only
        // its last path segment so that values like "../../x" cannot escape the output folder.
        string filename = Guid.NewGuid().ToString("N") + Path.GetFileName(binaryMessage.Filename);

        _logger.LogInformation("Saving binary file as {Filename}...", filename);

        // Create (or truncate) the target file. File.OpenWrite would not truncate an existing file.
        await using FileStream fileStream =
            File.Create(Path.Combine(OutputPath, filename));

        if (binaryMessage.Content != null)
        {
            // Asynchronously copy the message content to the FileStream.
            // The message chunks are streamed directly and the entire file is
            // never loaded into memory.
            await binaryMessage.Content.CopyToAsync(fileStream);
        }

        _logger.LogInformation(
            "Written {FileStreamLength} bytes into {Filename}",
            fileStream.Length,
            filename);
    }

    // Directory.CreateDirectory is a no-op when the folder already exists, so no check is needed
    private static void EnsureTargetFolderExists() => Directory.CreateDirectory(OutputPath);
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Consumer