Kafka - File Streaming
This sample demonstrates how to handle raw binary content and large messages in order to transfer files through Kafka.
See also: Binary Files, Chunking
Producer
The producer exposes two REST APIs that receive the path of a local file to be streamed. The second API uses a custom BinaryFileMessage
to forward additional metadata (the file name, in this example).
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer
{
    /// <summary>
    ///     Startup class of the producer web application: registers Silverback (connected to
    ///     Kafka), the API controllers and the Swagger generator, and builds the request pipeline.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Registers the services used by the producer.
        /// </summary>
        /// <param name="services">The service collection to populate.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            // Silverback with the Kafka broker; the inbound/outbound endpoints are declared
            // in the EndpointsConfigurator class.
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                .AddEndpointsConfigurator<EndpointsConfigurator>();

            // MVC controllers plus the Swagger document generator.
            services.AddControllers();
            services.AddSwaggerGen();
        }

        /// <summary>
        ///     Builds the HTTP request pipeline.
        /// </summary>
        /// <param name="app">The application builder.</param>
        public void Configure(IApplicationBuilder app)
        {
            // Serve the generated Swagger JSON and the Swagger UI on top of it.
            app.UseSwagger();
            app.UseSwaggerUI(
                swaggerUi => swaggerUi.SwaggerEndpoint(
                    "/swagger/v1/swagger.json",
                    $"{GetType().Assembly.FullName} API"));

            // Route the incoming requests to the controller actions.
            app.UseRouting();
            app.UseEndpoints(routes => routes.MapControllers());
        }
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer
{
    /// <summary>
    ///     Declares the Kafka outbound endpoint used by the producer to stream the files.
    /// </summary>
    public class EndpointsConfigurator : IEndpointsConfigurator
    {
        // Address of the Kafka broker shared by all clients.
        private const string KafkaBootstrapServers = "PLAINTEXT://localhost:9092";

        // Destination topic of the streamed files.
        private const string TopicName = "samples-binary-file-streaming";

        // Fixed target partition. Giving each producer instance its own partition keeps
        // the chunks of every file contiguous when the producer is scaled horizontally.
        // This isn't mandatory; in a real solution the value should come from configuration.
        private const int TargetPartition = 0;

        // Size of each chunk the binary files are split into (512 kB).
        private const int ChunkSizeInBytes = 512 * 1024;

        /// <summary>
        ///     Registers the Kafka endpoints.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder.AddKafkaEndpoints(
                kafka => kafka
                    .Configure(
                        clientConfig =>
                        {
                            clientConfig.BootstrapServers = KafkaBootstrapServers;
                        })
                    .AddOutbound<BinaryFileMessage>(
                        outbound => outbound
                            .ProduceTo(TopicName, TargetPartition)
                            .EnableChunking(ChunkSizeInBytes)));
    }
}
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages
{
    /// <summary>
    ///     A <see cref="BinaryFileMessage" /> that additionally carries the name of the file,
    ///     exported as the <c>x-filename</c> message header.
    /// </summary>
    public class CustomBinaryFileMessage : BinaryFileMessage
    {
        /// <summary>
        ///     Gets or sets the file name, mapped to the <c>x-filename</c> header.
        /// </summary>
        [Header("x-filename")]
        public string? Filename { get; set; }
    }
}
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Controllers
{
    /// <summary>
    ///     Exposes the REST endpoints that stream a local file to Kafka.
    /// </summary>
    [ApiController]
    [Route("[controller]")]
    public class ProducerController : ControllerBase
    {
        private readonly IPublisher _publisher;

        public ProducerController(IPublisher publisher)
        {
            _publisher = publisher;
        }

        /// <summary>
        ///     Streams the specified file as a plain <see cref="BinaryFileMessage" />.
        /// </summary>
        /// <param name="filePath">The path of the local file to be produced.</param>
        /// <param name="contentType">
        ///     The optional content type; the message default is kept when null or empty.
        /// </param>
        /// <returns>
        ///     204 when the file has been published, 400 if no path is specified, 404 if the
        ///     file doesn't exist.
        /// </returns>
        [HttpPost("binary-file")]
        public async Task<IActionResult> ProduceBinaryFileAsync(
            string filePath,
            string? contentType) =>
            await ProduceFileAsync(
                filePath,
                contentType,
                fileStream => new BinaryFileMessage(fileStream));

        /// <summary>
        ///     Streams the specified file as a <see cref="CustomBinaryFileMessage" />. This extends
        ///     the <see cref="BinaryFileMessage" /> with a Filename property that is also exported
        ///     as a header.
        /// </summary>
        /// <param name="filePath">The path of the local file to be produced.</param>
        /// <param name="contentType">
        ///     The optional content type; the message default is kept when null or empty.
        /// </param>
        /// <returns>
        ///     204 when the file has been published, 400 if no path is specified, 404 if the
        ///     file doesn't exist.
        /// </returns>
        [HttpPost("custom-binary-file")]
        public async Task<IActionResult> ProduceBinaryFileWithCustomHeadersAsync(
            string filePath,
            string? contentType) =>
            await ProduceFileAsync(
                filePath,
                contentType,
                fileStream => new CustomBinaryFileMessage
                {
                    Content = fileStream,
                    Filename = Path.GetFileName(filePath)
                });

        // Shared implementation of both actions. It validates the path, opens the file and
        // lets the caller wrap the stream into the desired message type. It then publishes
        // the message, which is routed to the outbound endpoint.
        private async Task<IActionResult> ProduceFileAsync(
            string filePath,
            string? contentType,
            System.Func<Stream, BinaryFileMessage> createMessage)
        {
            if (string.IsNullOrWhiteSpace(filePath))
            {
                return BadRequest("The file path must be specified.");
            }

            // NOTE: filePath comes straight from the HTTP request and any file readable by
            // the process can be streamed. A real application should restrict it to an
            // allow-listed folder.
            if (!System.IO.File.Exists(filePath))
            {
                return NotFound();
            }

            using var fileStream = System.IO.File.OpenRead(filePath);

            var message = createMessage(fileStream);

            if (!string.IsNullOrEmpty(contentType))
            {
                message.ContentType = contentType;
            }

            // The FileStream will be read and produced chunk by chunk, without the entire
            // file being loaded into memory.
            await _publisher.PublishAsync(message);

            return NoContent();
        }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Producer
Consumer
The consumer simply streams the file to a temporary folder in the local file system.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer
{
    /// <summary>
    ///     Startup class of the consumer: registers Silverback (connected to Kafka), its
    ///     endpoints and the subscriber that receives the streamed files.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Registers the services used by the consumer.
        /// </summary>
        /// <param name="services">The service collection to populate.</param>
        public void ConfigureServices(IServiceCollection services) =>
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                // Endpoints are declared in the EndpointsConfigurator class.
                .AddEndpointsConfigurator<EndpointsConfigurator>()
                // A single BinaryFileSubscriber instance handles all incoming files.
                .AddSingletonSubscriber<BinaryFileSubscriber>();

        /// <summary>
        ///     Intentionally empty: the consumer doesn't add anything to the request pipeline.
        /// </summary>
        public void Configure()
        {
        }
    }
}
using Confluent.Kafka;
using Silverback.Messaging.Configuration;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer
{
    /// <summary>
    ///     Declares the Kafka inbound endpoint from which the consumer receives the streamed files.
    /// </summary>
    public class EndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder
                .AddKafkaEndpoints(
                    endpoints => endpoints
                        // Configure the properties needed by all consumers/producers
                        .Configure(
                            config =>
                            {
                                // The bootstrap server address is needed to connect
                                config.BootstrapServers =
                                    "PLAINTEXT://localhost:9092";
                            })
                        // Consume the samples-binary-file-streaming topic
                        .AddInbound(
                            endpoint => endpoint
                                // Manually assign the partitions to prevent the
                                // broker from rebalancing in the middle of a
                                // potentially huge sequence of chunks. This is just
                                // an optimization and isn't strictly necessary.
                                // (The partitions resolver function returns the
                                // untouched collection to assign all available
                                // partitions.)
                                .ConsumeFrom(
                                    "samples-binary-file-streaming",
                                    partitions => partitions)
                                .Configure(
                                    config =>
                                    {
                                        // The consumer needs at least the bootstrap
                                        // server address and a group id to be able
                                        // to connect
                                        config.GroupId = "sample-consumer";

                                        // AutoOffsetReset.Earliest means that the
                                        // consumer must start consuming from the
                                        // beginning of the topic, if no offset was
                                        // stored for this consumer group
                                        config.AutoOffsetReset =
                                            AutoOffsetReset.Earliest;
                                    })
                                // Force the consumer to use the
                                // BinaryFileMessageSerializer. This is not strictly
                                // necessary when the messages are produced by
                                // Silverback, but it increases interoperability,
                                // since it doesn't have to rely on the
                                // 'x-message-type' header value to switch to the
                                // BinaryFileMessageSerializer.
                                //
                                // In this example the BinaryFileMessageSerializer is
                                // also set to return a CustomBinaryFileMessage
                                // instead of the normal BinaryFileMessage. This is
                                // only needed because we want to read the custom
                                // 'x-filename' header; otherwise
                                // 'ConsumeBinaryFiles()' would work perfectly fine
                                // (returning a basic BinaryFileMessage, without the
                                // extra properties).
                                .ConsumeBinaryFiles(
                                    serializer =>
                                        serializer
                                            .UseModel<CustomBinaryFileMessage>())
                                // Retry each chunk sequence 5 times in case of an
                                // exception
                                .OnError(policy => policy.Retry(5))));
        }
    }
}
using Silverback.Messaging.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages
{
    /// <summary>
    ///     The consumer-side model of the streamed file. It is a <see cref="BinaryFileMessage" />
    ///     whose Filename is populated from the <c>x-filename</c> message header.
    /// </summary>
    public class CustomBinaryFileMessage : BinaryFileMessage
    {
        /// <summary>
        ///     Gets or sets the file name read from the <c>x-filename</c> header.
        /// </summary>
        [Header("x-filename")]
        public string? Filename { get; set; }
    }
}
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers
{
    /// <summary>
    ///     Receives the <see cref="CustomBinaryFileMessage" /> and streams each file into the
    ///     local output folder.
    /// </summary>
    public class BinaryFileSubscriber
    {
        private const string OutputPath = "../../temp";

        private readonly ILogger<BinaryFileSubscriber> _logger;

        public BinaryFileSubscriber(ILogger<BinaryFileSubscriber> logger)
        {
            _logger = logger;
        }

        /// <summary>
        ///     Saves the content of the received message into a new file under the output
        ///     folder. The file is named with a random GUID followed by the original file name,
        ///     if any.
        /// </summary>
        /// <param name="binaryFileMessage">The received message.</param>
        public async Task OnBinaryFileMessageReceivedAsync(
            CustomBinaryFileMessage binaryFileMessage)
        {
            EnsureTargetFolderExists();

            // The x-filename header is untrusted input: keep only its file-name part so that
            // values like "../../etc/passwd" can't escape OutputPath. A null header yields
            // null here, which the concatenation turns into an empty suffix.
            var originalName = Path.GetFileName(binaryFileMessage.Filename);
            var filename = Guid.NewGuid().ToString("N") + originalName;

            _logger.LogInformation("Saving binary file as {Filename}...", filename);

            // File.Create truncates any pre-existing file (File.OpenWrite wouldn't), so no
            // stale trailing bytes can survive.
            using var fileStream = File.Create(Path.Combine(OutputPath, filename));

            if (binaryFileMessage.Content != null)
            {
                // Asynchronously copy the message content to the FileStream.
                // The message chunks are streamed directly and the entire file is
                // never loaded into memory.
                await binaryFileMessage.Content.CopyToAsync(fileStream);
            }

            _logger.LogInformation(
                "Written {FileStreamLength} bytes into {Filename}",
                fileStream.Length,
                filename);
        }

        // Directory.CreateDirectory is a no-op when the folder already exists.
        private static void EnsureTargetFolderExists() => Directory.CreateDirectory(OutputPath);
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Consumer