Show / Hide Table of Contents

    Kafka - Files Streaming

    This sample demonstrates how to handle raw binary content and large messages in order to transfer files through Kafka.

    See also: Binary Files, Chunking

    Producer

    The producer exposes two REST APIs that receive the path of a local file to be streamed. The second API uses a custom BinaryFileMessage to forward additional metadata (the file name, in this example).

    • Startup
    • EndpointsConfigurator
    • CustomBinaryFileMessage
    • API Controller
    using Microsoft.AspNetCore.Builder;
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer
    {
        /// <summary>
        ///     Startup class of the producer application: wires Silverback (connected to Kafka) together
        ///     with the Web API controllers and the Swagger documentation.
        /// </summary>
        public class Startup
        {
            /// <summary>
            ///     Registers Silverback with the Kafka broker and the endpoints configurator, plus the API
            ///     controllers and the Swagger generator.
            /// </summary>
            /// <param name="services">The service collection to populate.</param>
            public void ConfigureServices(IServiceCollection services)
            {
                // Silverback connected to Apache Kafka; the endpoints themselves are declared in the
                // EndpointsConfigurator class
                var silverback = services.AddSilverback();
                silverback
                    .WithConnectionToMessageBroker(broker => broker.AddKafka())
                    .AddEndpointsConfigurator<EndpointsConfigurator>();

                // Web API controllers, documented through Swagger
                services.AddControllers();
                services.AddSwaggerGen();
            }

            /// <summary>
            ///     Configures the request pipeline: Swagger JSON + UI, routing and the controller endpoints.
            /// </summary>
            /// <param name="app">The application builder.</param>
            public void Configure(IApplicationBuilder app)
            {
                var apiTitle = $"{GetType().Assembly.FullName} API";

                // Serve the generated Swagger JSON document and the Swagger UI on top of it
                app.UseSwagger();
                app.UseSwaggerUI(ui => ui.SwaggerEndpoint("/swagger/v1/swagger.json", apiTitle));

                // Route the incoming requests to the API controllers
                app.UseRouting();
                app.UseEndpoints(routes => routes.MapControllers());
            }
        }
    }
    
    using Silverback.Messaging.Configuration;
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer
    {
        /// <summary>
        ///     Declares the Kafka outbound endpoint used to stream the binary files in chunks.
        /// </summary>
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            // Address of the Kafka bootstrap server, shared by all producers/consumers
            private const string BootstrapServersAddress = "PLAINTEXT://localhost:9092";

            private const string TopicName = "samples-binary-file-streaming";

            // Producing to a fixed partition lets multiple producers write to the same topic while
            // keeping the chunks of each file contiguous (one partition per producer instance).
            // This is only necessary when horizontally scaling the producer; in a real solution the
            // value should come from a configuration setting.
            private const int TargetPartition = 0;

            // Size of each chunk the binary files are split into (512 kB)
            private const int ChunkSize = 512 * 1024;

            /// <summary>
            ///     Registers the outbound endpoint that routes every <see cref="BinaryFileMessage" /> to
            ///     the samples-binary-file-streaming topic, split into chunks.
            /// </summary>
            /// <param name="builder">The endpoints configuration builder.</param>
            public void Configure(IEndpointsConfigurationBuilder builder) =>
                builder.AddKafkaEndpoints(
                    endpoints => endpoints
                        .Configure(
                            config =>
                            {
                                config.BootstrapServers = BootstrapServersAddress;
                            })
                        .AddOutbound<BinaryFileMessage>(
                            endpoint => endpoint
                                .ProduceTo(TopicName, TargetPartition)
                                .EnableChunking(ChunkSize)));
        }
    }
    
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages
    {
        /// <summary>
        ///     A <see cref="BinaryFileMessage" /> that additionally carries the original file name,
        ///     exported as a message header.
        /// </summary>
        public class CustomBinaryFileMessage : BinaryFileMessage
        {
            private const string FilenameHeaderName = "x-filename";

            /// <summary>
            ///     Gets or sets the name of the streamed file (mapped to the <c>x-filename</c> header).
            /// </summary>
            [Header(FilenameHeaderName)]
            public string? Filename { get; set; }
        }
    }
    
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Silverback.Messaging.Messages;
    using Silverback.Messaging.Publishing;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Producer.Controllers
    {
        /// <summary>
        ///     Exposes the REST endpoints that stream a local file to Kafka.
        /// </summary>
        /// <remarks>
        ///     NOTE(review): both endpoints read whatever path the caller supplies from the server's file
        ///     system. That is fine for a local sample but must not be exposed as-is.
        /// </remarks>
        [ApiController]
        [Route("[controller]")]
        public class ProducerController : ControllerBase
        {
            private readonly IPublisher _publisher;

            public ProducerController(IPublisher publisher)
            {
                _publisher = publisher;
            }

            /// <summary>
            ///     Streams the specified file to Kafka as a plain <see cref="BinaryFileMessage" />.
            /// </summary>
            /// <param name="filePath">The path of the local file to be streamed.</param>
            /// <param name="contentType">The optional content type to be set on the message.</param>
            /// <returns>204 No Content when published, 404 Not Found when the file doesn't exist.</returns>
            [HttpPost("binary-file")]
            public async Task<IActionResult> ProduceBinaryFileAsync(
                string filePath,
                string? contentType)
            {
                // Answer with 404 rather than letting File.OpenRead throw (which would surface as a 500).
                // File.Exists also returns false for null, empty or invalid paths.
                if (!System.IO.File.Exists(filePath))
                    return NotFound();

                // Open specified file stream
                using var fileStream = System.IO.File.OpenRead(filePath);

                // Create a BinaryFileMessage that wraps the file stream
                var binaryFileMessage = new BinaryFileMessage(fileStream);

                if (!string.IsNullOrEmpty(contentType))
                    binaryFileMessage.ContentType = contentType;

                // Publish the BinaryFileMessage that will be routed to the outbound
                // endpoint. The FileStream will be read and produced chunk by chunk,
                // without the entire file being loaded into memory.
                await _publisher.PublishAsync(binaryFileMessage);

                return NoContent();
            }

            /// <summary>
            ///     Streams the specified file to Kafka as a <see cref="CustomBinaryFileMessage" />, which
            ///     also forwards the file name as a header.
            /// </summary>
            /// <param name="filePath">The path of the local file to be streamed.</param>
            /// <param name="contentType">The optional content type to be set on the message.</param>
            /// <returns>204 No Content when published, 404 Not Found when the file doesn't exist.</returns>
            [HttpPost("custom-binary-file")]
            public async Task<IActionResult> ProduceBinaryFileWithCustomHeadersAsync(
                string filePath,
                string? contentType)
            {
                // Answer with 404 rather than letting File.OpenRead throw (which would surface as a 500)
                if (!System.IO.File.Exists(filePath))
                    return NotFound();

                // Open specified file stream
                using var fileStream = System.IO.File.OpenRead(filePath);

                // Create a CustomBinaryFileMessage that wraps the file stream. The
                // CustomBinaryFileMessage extends the BinaryFileMessage adding an extra
                // Filename property that is also exported as header.
                var binaryFileMessage = new CustomBinaryFileMessage
                {
                    Content = fileStream,
                    Filename = Path.GetFileName(filePath)
                };

                if (!string.IsNullOrEmpty(contentType))
                    binaryFileMessage.ContentType = contentType;

                // Publish the CustomBinaryFileMessage. Being a BinaryFileMessage, it is routed
                // to the same outbound endpoint. The FileStream will be read and produced
                // chunk by chunk, without the entire file being loaded into memory.
                await _publisher.PublishAsync(binaryFileMessage);

                return NoContent();
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Producer

    Consumer

    The consumer simply streams the file to a temporary folder in the local file system.

    • Startup
    • EndpointsConfigurator
    • CustomBinaryFileMessage
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer
    {
        /// <summary>
        ///     Startup class of the consumer application: wires Silverback (connected to Kafka), the
        ///     endpoints configurator and the subscriber that saves the received files.
        /// </summary>
        public class Startup
        {
            /// <summary>
            ///     Registers Silverback with the Kafka broker, the endpoints configurator and the
            ///     <see cref="BinaryFileSubscriber" /> (as a singleton).
            /// </summary>
            /// <param name="services">The service collection to populate.</param>
            public void ConfigureServices(IServiceCollection services)
            {
                var silverback = services.AddSilverback();

                // Kafka broker, endpoints declared in EndpointsConfigurator, and the subscriber
                // handling the consumed files
                silverback
                    .WithConnectionToMessageBroker(broker => broker.AddKafka())
                    .AddEndpointsConfigurator<EndpointsConfigurator>()
                    .AddSingletonSubscriber<BinaryFileSubscriber>();
            }

            // The request pipeline is intentionally left empty.
            public void Configure()
            {
            }
        }
    }
    
    using Confluent.Kafka;
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer
    {
        /// <summary>
        ///     Declares the Kafka inbound endpoint that consumes the chunked binary files.
        /// </summary>
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder
                    .AddKafkaEndpoints(
                        endpoints => endpoints

                            // Configure the properties needed by all consumers/producers
                            .Configure(
                                config =>
                                {
                                    // The bootstrap server address is needed to connect
                                    config.BootstrapServers =
                                        "PLAINTEXT://localhost:9092";
                                })

                            // Consume the samples-binary-file-streaming topic
                            .AddInbound(
                                endpoint => endpoint

                                    // Manually assign the partitions to prevent the
                                    // broker from rebalancing in the middle of a
                                    // potentially huge sequence of chunks. This is just
                                    // an optimization and isn't strictly necessary.
                                    // (The partitions resolver function returns the
                                    // untouched collection to assign all available
                                    // partitions.)
                                    .ConsumeFrom(
                                        "samples-binary-file-streaming",
                                        partitions => partitions)
                                    .Configure(
                                        config =>
                                        {
                                            // The consumer needs at least the bootstrap
                                            // server address and a group id to be able
                                            // to connect
                                            config.GroupId = "sample-consumer";

                                            // AutoOffsetReset.Earliest means that the
                                            // consumer must start consuming from the
                                            // beginning of the topic, if no offset was
                                            // stored for this consumer group
                                            config.AutoOffsetReset =
                                                AutoOffsetReset.Earliest;
                                        })

                                    // Force the consumer to use the
                                    // BinaryFileMessageSerializer: this is not strictly
                                    // necessary when the messages are produced by
                                    // Silverback, but it increases interoperability,
                                    // since it doesn't have to rely on the
                                    // 'x-message-type' header value to switch to the
                                    // BinaryFileMessageSerializer.
                                    //
                                    // In this example the BinaryFileMessageSerializer is
                                    // also set to return a CustomBinaryFileMessage
                                    // instead of the normal BinaryFileMessage. This is
                                    // only needed because we want to read the custom
                                    // 'x-filename' header; otherwise a plain
                                    // 'ConsumeBinaryFiles()' would work perfectly fine
                                    // (returning a basic BinaryFileMessage, without the
                                    // extra properties).
                                    .ConsumeBinaryFiles(
                                        serializer =>
                                            serializer
                                                .UseModel<CustomBinaryFileMessage>())

                                    // Retry each sequence of chunks up to 5 times in
                                    // case of an exception
                                    .OnError(policy => policy.Retry(5))));
            }
        }
    }
    
    using Silverback.Messaging.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages
    {
        /// <summary>
        ///     The model the consumed binary files are deserialized into: a
        ///     <see cref="BinaryFileMessage" /> that also exposes the file name read from the message
        ///     headers.
        /// </summary>
        public class CustomBinaryFileMessage : BinaryFileMessage
        {
            private const string FilenameHeaderName = "x-filename";

            /// <summary>
            ///     Gets or sets the file name (mapped to the <c>x-filename</c> header).
            /// </summary>
            [Header(FilenameHeaderName)]
            public string? Filename { get; set; }
        }
    }
    
    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Messages;
    
    namespace Silverback.Samples.Kafka.BinaryFileStreaming.Consumer.Subscribers
    {
        /// <summary>
        ///     Subscribes to the <see cref="CustomBinaryFileMessage" /> and streams each received file
        ///     into the local <c>../../temp</c> folder.
        /// </summary>
        public class BinaryFileSubscriber
        {
            private const string OutputPath = "../../temp";

            private readonly ILogger<BinaryFileSubscriber> _logger;

            public BinaryFileSubscriber(ILogger<BinaryFileSubscriber> logger)
            {
                _logger = logger;
            }

            /// <summary>
            ///     Saves the content of the received message into a new file under the output folder.
            ///     The file name is a random GUID followed by the name carried in the message header.
            /// </summary>
            /// <param name="binaryFileMessage">The consumed binary file message.</param>
            /// <returns>A <see cref="Task" /> completing once the whole content has been written.</returns>
            public async Task OnBinaryFileMessageReceivedAsync(
                CustomBinaryFileMessage binaryFileMessage)
            {
                EnsureTargetFolderExists();

                // The file name comes from a message header, i.e. from outside this process: strip
                // any directory component so that values like "/../../../x" cannot make the path
                // escape the output folder.
                var filename = Guid.NewGuid().ToString("N") +
                               Path.GetFileName(binaryFileMessage.Filename);

                _logger.LogInformation("Saving binary file as {Filename}...", filename);

                // Create a FileStream to save the file. File.Create truncates any pre-existing
                // file, unlike File.OpenWrite, which would leave stale trailing bytes.
                using var fileStream = File.Create(Path.Combine(OutputPath, filename));

                if (binaryFileMessage.Content != null)
                {
                    // Asynchronously copy the message content to the FileStream.
                    // The message chunks are streamed directly and the entire file is
                    // never loaded into memory.
                    await binaryFileMessage.Content.CopyToAsync(fileStream);
                }

                _logger.LogInformation(
                    "Written {FileStreamLength} bytes into {Filename}",
                    fileStream.Length,
                    filename);
            }

            // Creates the output folder on first use (no-op when it already exists)
            private static void EnsureTargetFolderExists()
            {
                if (!Directory.Exists(OutputPath))
                    Directory.CreateDirectory(OutputPath);
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Kafka/BinaryFileStreaming.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini