Show / Hide Table of Contents

    Kafka - File Streaming

    This sample demonstrates how to handle raw binary content and large messages in order to transfer files through Kafka.

    See also: Serializing the Produced Messages, Deserializing the Consumed Messages, Producing Chunked Messages, Consuming Chunked Messages

    Producer

    The producer exposes two REST APIs that receive the path of a local file to be streamed. The second API uses a custom BinaryMessage to forward additional metadata (the file name in this example).

    • Startup
    • BrokerClientsConfigurator
    • CustomBinaryMessage
    • API Controller
    using Microsoft.AspNetCore.Builder;
    using Microsoft.Extensions.DependencyInjection;
    using Silverback.Configuration;
    using Silverback.Messaging.Configuration;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer;
    
    /// <summary>
    ///     Wires up the services and the HTTP pipeline of the producer sample.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Registers Silverback (connected to Kafka), the API controllers and SwaggerGen.
        /// </summary>
        /// <param name="services">The service collection to add the registrations to.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            // Silverback: Apache Kafka is the message broker and the clients
            // setup lives in the dedicated BrokerClientsConfigurator class
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                .AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

            // REST API controllers
            services.AddControllers();

            // Swagger document generation
            services.AddSwaggerGen();
        }

        /// <summary>
        ///     Adds the Swagger middlewares, the routing and the controller endpoints to the pipeline.
        /// </summary>
        /// <param name="app">The application builder whose pipeline is being configured.</param>
        public void Configure(IApplicationBuilder app)
        {
            // The endpoint is labelled with the full name of the assembly declaring this type
            string swaggerEndpointName = $"{GetType().Assembly.FullName} API";

            // Serve the generated Swagger JSON and the Swagger UI
            app.UseSwagger();
            app.UseSwaggerUI(
                swaggerUi => swaggerUi.SwaggerEndpoint(
                    "/swagger/v1/swagger.json",
                    swaggerEndpointName));

            // Route the incoming requests to the controllers
            app.UseRouting();
            app.UseEndpoints(routeBuilder => routeBuilder.MapControllers());
        }
    }
    
    using Silverback.Messaging.Configuration;
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer;
    
    /// <summary>
    ///     Configures the Kafka producer used to stream the binary files.
    /// </summary>
    public class BrokerClientsConfigurator : IBrokerClientsConfigurator
    {
        // Address of the bootstrap server, needed to connect
        private const string BootstrapServers = "PLAINTEXT://localhost:19092";

        // Topic the binary files are produced to
        private const string TopicName = "samples-binary-file-streaming";

        // Partition every message is written to.
        // (In the final solution this constant should be replaced by a
        // configuration setting.)
        private const int TargetPartition = 0;

        // Size of each chunk: 512 kB
        private const int ChunkSizeInBytes = 524288;

        /// <summary>
        ///     Adds a Kafka producer that writes the <see cref="BinaryMessage" /> in chunks to a fixed
        ///     partition of the target topic.
        /// </summary>
        /// <param name="builder">The builder the Kafka clients are added to.</param>
        public void Configure(BrokerClientsConfigurationBuilder builder)
        {
            builder.AddKafkaClients(
                kafka => kafka
                    .WithBootstrapServers(BootstrapServers)
                    .AddProducer(
                        kafkaProducer => kafkaProducer.Produce<BinaryMessage>(
                            outbound => outbound

                                // Producing to an explicit partition allows scaling to
                                // multiple producers writing to the same topic: giving
                                // each of them a different partition ensures that the
                                // chunks are always contiguous. This isn't mandatory
                                // and is necessary only when horizontally scaling the
                                // producer.
                                .ProduceTo(TopicName, TargetPartition)

                                // Split the binary files into chunks
                                .EnableChunking(ChunkSizeInBytes))));
        }
    }
    
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
    
    /// <summary>
    ///     A <see cref="BinaryMessage" /> that carries the file name as additional metadata.
    /// </summary>
    public class CustomBinaryMessage : BinaryMessage
    {
        /// <summary>
        ///     Gets or sets the name of the file, mapped to the <c>x-filename</c> header.
        /// </summary>
        [Header("x-filename")]
        public string? Filename { get; set; }
    }
    
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Silverback.Messaging.Messages;
    using Silverback.Messaging.Publishing;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Controllers;
    
    [ApiController]
    [Route("[controller]")]
    public class ProducerController : ControllerBase
    {
        private readonly IPublisher _publisher;
    
        public ProducerController(IPublisher publisher)
        {
            _publisher = publisher;
        }
    
        [HttpPost("binary-file")]
        public async Task<IActionResult> ProduceBinaryFileAsync(
            string filePath,
            string? contentType)
        {
            // Open specified file stream
            await using FileStream fileStream = System.IO.File.OpenRead(filePath);
    
            // Create a BinaryMessage that wraps the file stream
            BinaryMessage binaryMessage = new(fileStream);
    
            if (!string.IsNullOrEmpty(contentType))
                binaryMessage.ContentType = contentType;
    
            // Publish the BinaryMessage that will be routed to the outbound
            // endpoint. The FileStream will be read and produced chunk by chunk,
            // without the entire file being loaded into memory.
            await _publisher.PublishAsync(binaryMessage);
    
            return NoContent();
        }
    
        [HttpPost("custom-binary-file")]
        public async Task<IActionResult> ProduceBinaryFileWithCustomHeadersAsync(
            string filePath,
            string? contentType)
        {
            // Open specified file stream
            await using FileStream fileStream = System.IO.File.OpenRead(filePath);
    
            // Create a CustomBinaryMessage that wraps the file stream. The
            // CustomBinaryMessage extends the BinaryMessage adding an extra
            // Filename property that is also exported as header.
            CustomBinaryMessage binaryMessage = new()
            {
                Content = fileStream,
                Filename = Path.GetFileName(filePath)
            };
    
            if (!string.IsNullOrEmpty(contentType))
                binaryMessage.ContentType = contentType;
    
            // Publish the BinaryMessage that will be routed to the outbound
            // endpoint. The FileStream will be read and produced chunk by chunk,
            // without the entire file being loaded into memory.
            await _publisher.PublishAsync(binaryMessage);
    
            return NoContent();
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Producer

    Consumer

    The consumer simply streams the file to a temporary folder in the local file system.

    • Startup
    • BrokerClientsConfigurator
    • CustomBinaryMessage
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    using Silverback.Configuration;
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer;
    
    /// <summary>
    ///     Wires up the services of the consumer sample.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Registers Silverback (connected to Kafka) and the subscriber handling the binary files.
        /// </summary>
        /// <param name="services">The service collection to add the registrations to.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            // Silverback: Apache Kafka is the message broker, the clients setup
            // lives in the dedicated BrokerClientsConfigurator class and the
            // BinaryFileSubscriber is registered as singleton subscriber
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(brokerOptions => brokerOptions.AddKafka())
                .AddBrokerClientsConfigurator<BrokerClientsConfigurator>()
                .AddSingletonSubscriber<BinaryFileSubscriber>();
        }

        /// <summary>
        ///     Intentionally empty: this method adds nothing to the pipeline.
        /// </summary>
        public void Configure()
        {
        }
    }
    
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer;
    
    /// <summary>
    ///     Configures the Kafka consumer that receives the streamed binary files.
    /// </summary>
    public class BrokerClientsConfigurator : IBrokerClientsConfigurator
    {
        // Address of the bootstrap server, needed to connect
        private const string BootstrapServers = "PLAINTEXT://localhost:19092";

        // Consumer group id
        private const string ConsumerGroupId = "sample-consumer";

        // Topic the binary files are consumed from
        private const string TopicName = "samples-binary-file-streaming";

        // Value passed to the retry error policy
        private const int RetryCount = 5;

        /// <summary>
        ///     Adds a Kafka consumer that reads the <see cref="CustomBinaryMessage" /> from all the
        ///     partitions of the source topic and retries in case of an exception.
        /// </summary>
        /// <param name="builder">The builder the Kafka clients are added to.</param>
        public void Configure(BrokerClientsConfigurationBuilder builder)
        {
            builder.AddKafkaClients(
                kafka => kafka
                    .WithBootstrapServers(BootstrapServers)
                    .AddConsumer(
                        kafkaConsumer => kafkaConsumer
                            .WithGroupId(ConsumerGroupId)

                            // Start from the beginning of the topic when no
                            // offset was stored for this consumer group
                            // (AutoOffsetReset.Earliest)
                            .AutoResetOffsetToEarliest()

                            // The CustomBinaryMessage replaces the built-in
                            // BinaryMessage to be able to add the extra header
                            // containing the file name. Since it extends the
                            // BinaryMessage, the serializer is automatically
                            // switched to the BinaryMessageSerializer.
                            .Consume<CustomBinaryMessage>(
                                inbound => inbound

                                    // The partitions are assigned manually to prevent
                                    // the broker from rebalancing in the middle of a
                                    // potentially huge sequence of chunks. This is
                                    // just an optimization and isn't strictly
                                    // necessary. The resolver function returns the
                                    // collection untouched, thus assigning all
                                    // available partitions.
                                    .ConsumeFrom(
                                        TopicName,
                                        availablePartitions => availablePartitions)

                                    // Retry each chunks sequence in case of an exception
                                    .OnError(errorPolicy => errorPolicy.Retry(RetryCount)))));
        }
    }
    
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
    
    /// <summary>
    ///     A <see cref="BinaryMessage" /> extended with the extra header containing the file name.
    /// </summary>
    public class CustomBinaryMessage : BinaryMessage
    {
        /// <summary>
        ///     Gets or sets the name of the file, mapped to the <c>x-filename</c> header.
        /// </summary>
        [Header("x-filename")]
        public string? Filename { get; set; }
    }
    
    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
    
    /// <summary>
    ///     Saves the received binary files into a local output folder.
    /// </summary>
    public class BinaryFileSubscriber
    {
        // Folder (relative to the current directory) the received files are written to
        private const string OutputPath = "../../temp";

        private readonly ILogger<BinaryFileSubscriber> _logger;

        public BinaryFileSubscriber(ILogger<BinaryFileSubscriber> logger)
        {
            _logger = logger;
        }

        /// <summary>
        ///     Streams the content of the received message into a new file inside the output folder.
        /// </summary>
        /// <param name="binaryMessage">The message wrapping the file content and the file name.</param>
        /// <returns>A <see cref="Task" /> representing the asynchronous operation.</returns>
        public async Task OnBinaryMessageReceivedAsync(CustomBinaryMessage binaryMessage)
        {
            EnsureTargetFolderExists();

            // The file name is received via message header and cannot be trusted:
            // strip any directory information to make sure that the file is always
            // written inside the output folder. (A null file name stays null and
            // results in the GUID alone being used.)
            string? safeFilename = Path.GetFileName(binaryMessage.Filename);

            // The GUID prefix makes the target file name unique
            string filename = Guid.NewGuid().ToString("N") + safeFilename;

            _logger.LogInformation("Saving binary file as {Filename}...", filename);

            // Create a FileStream to save the file (disposed asynchronously, since
            // we are in an async method)
            await using FileStream fileStream =
                File.OpenWrite(Path.Combine(OutputPath, filename));

            if (binaryMessage.Content != null)
            {
                // Asynchronously copy the message content to the FileStream.
                // The message chunks are streamed directly and the entire file is
                // never loaded into memory.
                await binaryMessage.Content.CopyToAsync(fileStream);
            }

            _logger.LogInformation(
                "Written {FileStreamLength} bytes into {Filename}",
                fileStream.Length,
                filename);
        }

        // Creates the output folder. CreateDirectory does nothing when the folder
        // already exists, so no preliminary existence check is needed.
        private static void EnsureTargetFolderExists()
        {
            Directory.CreateDirectory(OutputPath);
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2026 Sergio Aquilini