Show / Hide Table of Contents

    Binary Files

    Serializing a binary file (a stream or a byte array) using the regular JsonMessageSerializer would mean encoding it in base64 and converting it to a UTF-8 encoded byte array. Besides not being very elegant, this approach may cause you some trouble when integrating with other systems that expect the raw file content. It would also make the transferred byte array approximately 30% bigger than the file itself.

    This page shows how to use an IBinaryFileMessage to transfer raw binary files more efficiently.

    Producer configuration

    The IBinaryFileMessage interface is meant to transfer files over the message broker and is natively supported by Silverback. This means that the raw file content will be transferred in its original form.

    For convenience, the BinaryFileMessage class already implements the IBinaryFileMessage interface. This class also exposes a ContentType property, which results in the content-type header being produced.

    • EndpointsConfigurator (fluent)
    • EndpointsConfigurator (legacy)
    • Publisher
    /// <summary>
    /// Fluent endpoints configuration that produces every
    /// <see cref="IBinaryFileMessage"/> to the "raw-files" Kafka topic.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the Kafka client settings and the outbound endpoint.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder.AddKafkaEndpoints(
                kafka => kafka
                    .Configure(
                        client =>
                        {
                            client.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    .AddOutbound<IBinaryFileMessage>(
                        outbound => outbound.ProduceTo("raw-files")));
        }
    }
    
    /// <summary>
    /// Legacy (non-fluent) endpoints configuration that produces every
    /// <see cref="IBinaryFileMessage"/> to the "raw-files" Kafka topic.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the outbound endpoint for binary file messages.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddOutbound<IBinaryFileMessage>(
                    // Same topic as the equivalent fluent configuration.
                    new KafkaProducerEndpoint("raw-files")
                    {
                        Configuration = new KafkaProducerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        }
                    });
    }
    
    /// <summary>
    /// Publishes raw file content as a <see cref="BinaryFileMessage"/>.
    /// </summary>
    public class FileTransferService
    {
        private readonly IPublisher _publisher;

        /// <summary>
        /// Initializes the service with the publisher used to send the files.
        /// </summary>
        /// <param name="publisher">The publisher the messages are handed to.</param>
        public FileTransferService(IPublisher publisher)
        {
            _publisher = publisher;
        }

        /// <summary>
        /// Wraps the content in a <see cref="BinaryFileMessage"/> and publishes it.
        /// </summary>
        /// <param name="content">The raw file content.</param>
        /// <param name="contentType">The value passed as the message content type.</param>
        /// <returns>A task that completes when the publish call completes.</returns>
        public async Task TransferFile(byte[] content, string contentType)
        {
            await _publisher.PublishAsync(
                new BinaryFileMessage(content, contentType));
        }
    }
    

    Otherwise you can implement the interface yourself or extend the BinaryFileMessage (e.g. to add some additional headers, as explained in the Message Headers section).

    • EndpointsConfigurator (fluent)
    • EndpointsConfigurator (legacy)
    • Message
    • Publisher
    /// <summary>
    /// Fluent endpoints configuration that produces every
    /// <see cref="IBinaryFileMessage"/> to the "raw-files" Kafka topic.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the Kafka client settings and the outbound endpoint.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder.AddKafkaEndpoints(
                kafka => kafka
                    .Configure(
                        client =>
                        {
                            client.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    .AddOutbound<IBinaryFileMessage>(
                        outbound => outbound.ProduceTo("raw-files")));
        }
    }
    
    /// <summary>
    /// Legacy (non-fluent) endpoints configuration that produces every
    /// <see cref="IBinaryFileMessage"/> to the "raw-files" Kafka topic.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the outbound endpoint for binary file messages.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            var producerEndpoint = new KafkaProducerEndpoint("raw-files")
            {
                Configuration = new KafkaProducerConfig
                {
                    BootstrapServers = "PLAINTEXT://kafka:9092"
                }
            };

            builder.AddOutbound<IBinaryFileMessage>(producerEndpoint);
        }
    }
    
    /// <summary>
    /// A <see cref="BinaryFileMessage"/> extended with a user identifier
    /// that is bound to the "x-user-id" header.
    /// </summary>
    public class MyBinaryFileMessage : BinaryFileMessage
    {
        /// <summary>
        /// Gets or sets the user identifier associated with the "x-user-id" header.
        /// </summary>
        [Header("x-user-id")]
        public Guid UserId { get; set; }
    }
    
    /// <summary>
    /// Publishes raw file content as a <see cref="MyBinaryFileMessage"/>,
    /// including the identifier of the user the file belongs to.
    /// </summary>
    public class FileTransferService
    {
        private readonly IPublisher _publisher;

        /// <summary>
        /// Initializes the service with the publisher used to send the files.
        /// </summary>
        /// <param name="publisher">The publisher the messages are handed to.</param>
        public FileTransferService(IPublisher publisher)
        {
            _publisher = publisher;
        }

        /// <summary>
        /// Builds a <see cref="MyBinaryFileMessage"/> from the arguments and publishes it.
        /// </summary>
        /// <param name="content">The raw file content.</param>
        /// <param name="contentType">The value assigned to the message content type.</param>
        /// <param name="userId">The value assigned to the message <c>UserId</c> property.</param>
        /// <returns>A task that completes when the publish call completes.</returns>
        public async Task TransferFile(
            byte[] content,
            string contentType,
            Guid userId)
        {
            await _publisher.PublishAsync(
                new MyBinaryFileMessage
                {
                    Content = content,
                    ContentType = contentType,
                    UserId = userId
                });
        }
    }
    

    Consumer configuration

    You don't need to do anything special to consume a binary file, as long as all the necessary headers are in place (Silverback ensures this if it was used to produce the message). The message will be wrapped again in a BinaryFileMessage that can be subscribed to like any other message.

    • EndpointsConfigurator (fluent)
    • EndpointsConfigurator (legacy)
    • Subscriber
    /// <summary>
    /// Fluent endpoints configuration that consumes the "raw-files" Kafka topic
    /// with the "my-consumer" group id.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the Kafka client settings and the inbound endpoint.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config =>
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("raw-files")
                        .Configure(config =>
                            {
                                config.GroupId = "my-consumer";
                            })));
    }
    
    /// <summary>
    /// Legacy (non-fluent) endpoints configuration that consumes the "raw-files"
    /// Kafka topic with the "my-consumer" group id.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the inbound endpoint.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddInbound(
                    new KafkaConsumerEndpoint("raw-files")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            // Same consumer group as the equivalent fluent configuration.
                            GroupId = "my-consumer"
                        }
                    });
    }
    
    /// <summary>
    /// Sample subscriber that handles the consumed binary files.
    /// </summary>
    public class FileSubscriberService
    {
        /// <summary>
        /// Handles a received binary file message.
        /// </summary>
        /// <param name="message">The consumed file message.</param>
        /// <returns>A task representing the handling of the message.</returns>
        public async Task OnFileReceived(IBinaryFileMessage message)
        {
            // ...your file handling logic...
        }
    }
    

    If the message wasn't produced by Silverback, chances are that the message type header is missing. In that case you need to explicitly configure the BinaryFileMessageSerializer in the inbound endpoint.

    • EndpointsConfigurator (fluent)
    • EndpointsConfigurator (legacy)
    • Subscriber
    /// <summary>
    /// Fluent endpoints configuration that consumes the "raw-files" Kafka topic
    /// as binary files, with the "my-consumer" group id.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the Kafka client settings and the inbound endpoint,
        /// explicitly enabling binary file consumption.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config =>
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("raw-files")
                        .ConsumeBinaryFiles()
                        .Configure(config =>
                            {
                                config.GroupId = "my-consumer";
                            })));
    }
    
    /// <summary>
    /// Legacy (non-fluent) endpoints configuration that consumes the "raw-files"
    /// Kafka topic with the "my-consumer" group id, using the default
    /// <see cref="BinaryFileMessageSerializer"/>.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the inbound endpoint with the binary file serializer set explicitly.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddInbound(
                    new KafkaConsumerEndpoint("raw-files")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            // Same consumer group as the equivalent fluent configuration.
                            GroupId = "my-consumer"
                        },
                        Serializer = BinaryFileMessageSerializer.Default
                    });
    }
    
    /// <summary>
    /// Sample subscriber that handles the consumed binary files.
    /// </summary>
    public class FileSubscriberService
    {
        /// <summary>
        /// Handles a received binary file message.
        /// </summary>
        /// <param name="message">The consumed file message.</param>
        /// <returns>A task representing the handling of the message.</returns>
        public async Task OnFileReceived(IBinaryFileMessage message)
        {
            // ...your file handling logic...
        }
    }
    

    If you need to read additional headers, you can either extend the BinaryFileMessage (suggested approach) or subscribe to an IInboundEnvelope<TMessage>.

    The following snippet assumes that the files aren't being streamed by a Silverback producer, otherwise it wouldn't be necessary to explicitly set the serializer and the type would be inferred from the x-message-type header.

    • EndpointsConfigurator (fluent)
    • EndpointsConfigurator (legacy)
    • Message
    • Subscriber
    /// <summary>
    /// Fluent endpoints configuration that consumes the "raw-files" Kafka topic
    /// as binary files, using <see cref="MyBinaryFileMessage"/> as the model.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the Kafka client settings and the inbound endpoint,
        /// configuring the binary file serializer with the custom model.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    .Configure(config =>
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("raw-files")
                        .ConsumeBinaryFiles(serializer => serializer.UseModel<MyBinaryFileMessage>())));
    }
    
    /// <summary>
    /// Legacy (non-fluent) endpoints configuration that consumes the "raw-files"
    /// Kafka topic, using a binary file serializer bound to
    /// <see cref="MyBinaryFileMessage"/>.
    /// </summary>
    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        /// Registers the inbound endpoint with the typed binary file serializer.
        /// </summary>
        /// <param name="builder">The endpoints configuration builder to extend.</param>
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                // This is the consumer side, so the endpoint must be inbound,
                // mirroring the equivalent fluent AddInbound configuration.
                .AddInbound(
                    new KafkaConsumerEndpoint("raw-files")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092"
                        },
                        Serializer = new BinaryFileMessageSerializer<MyBinaryFileMessage>()
                    });
    }
    
    /// <summary>
    /// A <see cref="BinaryFileMessage"/> extended with a user identifier
    /// that is bound to the "x-user-id" header.
    /// </summary>
    public class MyBinaryFileMessage : BinaryFileMessage
    {
        /// <summary>
        /// Gets or sets the user identifier associated with the "x-user-id" header.
        /// </summary>
        [Header("x-user-id")]
        public Guid UserId { get; set; }
    }
    
    /// <summary>
    /// Sample subscriber that handles the consumed <see cref="MyBinaryFileMessage"/>.
    /// </summary>
    public class FileSubscriberService
    {
        /// <summary>
        /// Handles a received binary file message.
        /// </summary>
        /// <param name="message">The consumed file message.</param>
        /// <returns>A task representing the handling of the message.</returns>
        public async Task OnFileReceived(MyBinaryFileMessage message)
        {
            // ...your file handling logic...
        }
    }
    

    Samples

    • Kafka - Files Streaming
    • Improve this doc
    In this article
    • Producer configuration
    • Consumer configuration
    • Samples
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini