Binary Files
Serializing a binary file (a stream or a byte array) using the regular JsonMessageSerializer would mean to encode it in base64 and convert it to a UTF-8 encoded byte array. Beside not being very elegant this approach may cause you some trouble when integrating with other systems expecting the raw file content. This procedure would also result in the transferred byte array to be approximately a 30% bigger than the file itself.
In this page it's shown how to use an IBinaryFileMessage to more efficiently transfer raw binary files.
Producer configuration
The IBinaryFileMessage interface is meant to transfer files over the message broker and is natively supported by Silverback. This means that the raw file content will be transferred in its original form.
For convenience the BinaryFileMessage class already implements the IBinaryFileMessage interface. This class exposes a ContentType
property as well, resulting in the content-type
header to be produced.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<IBinaryFileMessage>(endpoint => endpoint
.ProduceTo("raw-files")));
}
Otherwise you can implement the interface yourself or extend the BinaryFileMessage (e.g. to add some additional headers, as explained in the Message Headers section).
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddOutbound<IBinaryFileMessage>(endpoint => endpoint
.ProduceTo("raw-files")));
}
Consumer configuration
You don't need to do anything special to consume a binary file, if all necessary headers are in place (ensured by Silverback, if it was used to produce the message). The message will be wrapped again in a BinaryFileMessage that can be subscribed like any other message.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddInbound(endpoint => endpoint
.ConsumeFrom("raw-files")
.Configure(config =>
{
config.GroupId = "my-consumer"
}));
}
If the message wasn't produced by Silverback chances are that the message type header is not there. In that case you need to explicitly configure the BinaryFileMessageSerializer in the inbound endpoint.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddInbound(endpoint => endpoint
.ConsumeFrom("raw-files")
.ConsumeBinaryFiles()
.Configure(config =>
{
config.GroupId = "my-consumer"
}));
}
If you need to read additional headers you can either extend the BinaryFileMessage (suggested approach) or subscribe to an IInboundEnvelope<TMessage> .
The following snippet assumes that the files aren't being streamed by a Silverback producer, otherwise it wouldn't be necessary to explicitly set the serializer and the type would be inferred from the x-message-type
header.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddKafkaEndpoints(endpoints => endpoints
.Configure(config =>
{
config.BootstrapServers = "PLAINTEXT://kafka:9092";
})
.AddInbound(endpoint => endpoint
.ConsumeFrom("raw-files")
.ConsumeBinaryFiles(serializer => serializer.UseModel<MyBinaryFileMessage>()));
}