Binary Files
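Serializing a binary file (a stream or a byte array) using the regular JSON serializer would mean encoding it in base64, which inflates the payload and gets in the way of systems that expect the bare file.
This page shows how to use an IBinaryFileMessage to transfer raw binary files instead.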
Producer configuration
The IBinaryFileMessage interface represents a binary file to be transferred over the message broker.
For convenience, the BinaryFileMessage class exposes a ContentType property as well, which results in the content-type header being produced.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddOutbound<IBinaryFileMessage>(endpoint => endpoint
                    .ProduceTo("raw-files")));
}
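Otherwise, you can implement the interface yourself or extend BinaryFileMessage. The endpoint configuration stays the same, since the outbound endpoint is registered for IBinaryFileMessage.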
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddOutbound<IBinaryFileMessage>(endpoint => endpoint
                    .ProduceTo("raw-files")));
}
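As a rough illustration of extending the built-in message, the sketch below declares a custom model carrying one extra header. It is a minimal sketch under stated assumptions, not the library's reference implementation. The class name MyBinaryFileMessage matches the one used later on this page. The UserId property and the "x-user-id" header name are hypothetical. The sketch also assumes that Silverback's Header attribute and the BinaryFileMessage base class live in the Silverback.Messaging.Messages namespace.

```csharp
using System;
using Silverback.Messaging.Messages;

// Hypothetical model: the UserId property and the "x-user-id" header name
// are made up for illustration and are not part of the original page.
public class MyBinaryFileMessage : BinaryFileMessage
{
    // Assumed to be mapped to a message header via Silverback's Header attribute.
    [Header("x-user-id")]
    public Guid? UserId { get; set; }
}
```

Because the model derives from BinaryFileMessage, it still implements IBinaryFileMessage and should be routed by the AddOutbound<IBinaryFileMessage> registration shown above.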
Consumer configuration
You don't need to do anything special to consume a binary file, provided that all necessary headers are in place (Silverback ensures this if it was used to produce the message). The message will be wrapped again in a BinaryFileMessage.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("raw-files")
                    .Configure(config =>
                    {
                        config.GroupId = "my-consumer";
                    })));
}
If the message wasn't produced by Silverback, chances are that the message type header is missing. In that case you need to explicitly configure the binary file serializer on the inbound endpoint, by calling ConsumeBinaryFiles.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("raw-files")
                    .ConsumeBinaryFiles()
                    .Configure(config =>
                    {
                        config.GroupId = "my-consumer";
                    })));
}
If you need to read additional headers, you can either extend BinaryFileMessage or implement IBinaryFileMessage yourself, and then configure your model on the inbound endpoint.
The following snippet assumes that the files aren't being streamed by a Silverback producer; otherwise it wouldn't be necessary to explicitly set the serializer, and the type would be inferred from the x-message-type header.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("raw-files")
                    .ConsumeBinaryFiles(serializer => serializer.UseModel<MyBinaryFileMessage>())));
}
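Once the inbound endpoint is configured, the file still has to be handled somewhere. The following is a minimal sketch of a subscriber that writes the received content to a temporary file. The class name, method name and target path are hypothetical. It also assumes that IBinaryFileMessage exposes the payload as a nullable Stream property named Content, which may differ between Silverback versions.

```csharp
using System.IO;
using System.Threading.Tasks;
using Silverback.Messaging.Messages;

// Hypothetical subscriber: names and the target path are illustrative only.
public class FileSubscriber
{
    public async Task OnFileReceivedAsync(IBinaryFileMessage message)
    {
        // Content is assumed to be a Stream holding the raw file; skip empty messages.
        if (message.Content == null)
            return;

        // Stream the payload to a temporary file instead of buffering it in memory.
        string path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
        await using FileStream file = File.Create(path);
        await message.Content.CopyToAsync(file);
    }
}
```

The subscriber still has to be registered with Silverback for the method to be invoked, which is not shown here. Accepting the interface rather than a concrete class keeps the handler usable for both the built-in message and custom models.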