Kafka Partitioning and Message Key
Producer
Destination partition
If the destination topic contains multiple partitions, the destination partition is picked according to the hash of the message key. If no explicit message key is set, a random one is generated, so the messages are spread randomly across the partitions.
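As a rough illustration of key-based partition selection, the following minimal sketch hashes the key and takes the result modulo the partition count. The `SelectPartition` function is hypothetical and FNV-1a is only a stand-in hash; the Kafka client uses its own partitioner, so the actual partition numbers will differ.

```csharp
using System;
using System.Text;

// Hypothetical helper: FNV-1a stands in for the client's real partitioner hash.
static int SelectPartition(string key, int partitionCount)
{
    if (partitionCount <= 0)
        throw new ArgumentOutOfRangeException(nameof(partitionCount));

    uint hash = 2166136261; // FNV-1a 32-bit offset basis
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash *= 16777619; // FNV-1a 32-bit prime (wraps around on overflow)
    }

    return (int)(hash % (uint)partitionCount);
}

// The same key always maps to the same partition.
Console.WriteLine(SelectPartition("order-42", 6) == SelectPartition("order-42", 6)); // True
```

The point of the sketch is the property, not the hash itself: equal keys always land in the same partition, which is what makes per-key ordering possible.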
You can override this default behavior by explicitly setting the target partition in the endpoint. The endpoint can be statically defined, as in the following snippet, or resolved via dynamic routing.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddOutbound<IIntegrationEvent>(endpoint => endpoint
                    .ProduceTo("order-events", 2))); // <- partition 2
}
Producing to a fixed partition may be required when multiple producers write to the same topic and you have to prevent the messages from the different clients from being interleaved (e.g. because you are relying on sequences, like chunking).
Message key
Apache Kafka requires a message key for different purposes, such as:
- Partitioning: Kafka can guarantee ordering only inside the same partition, so it is important to be able to route correlated messages into the same partition. To do so, you need to specify a key for each message, and Kafka will put all messages with the same key in the same partition.
- Compacting topics: A topic can be configured with cleanup.policy=compact to instruct Kafka to keep only the latest message related to a certain object, identified by the message key. In other words, Kafka will retain only one message per key value.
Silverback will always generate a message key (same value as the x-message-id header), but you can also generate your own key, either by adding an enricher to the IProducerEndpoint or by decorating the properties that must be part of the key with KafkaKeyMemberAttribute.
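The effect of compaction can be pictured with a minimal sketch that replays a log into a dictionary, so that later records overwrite earlier ones with the same key. The sample keys and values are made up for illustration, and the real broker compacts in the background rather than at write time, so older records may remain visible for a while.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical log of (key, value) records, oldest first.
var log = new List<(string Key, string Value)>
{
    ("product-1", "stock=10"),
    ("product-2", "stock=5"),
    ("product-1", "stock=7"),
};

// Compaction keeps only the most recent value for each key.
var compacted = new Dictionary<string, string>();
foreach (var (key, value) in log)
    compacted[key] = value;

Console.WriteLine(compacted["product-1"]); // stock=7
Console.WriteLine(compacted.Count);        // 2
```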
Using enricher
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddOutbound<InventoryEvent>(endpoint => endpoint
                    .ProduceTo("inventory-events")
                    .WithKafkaKey<InventoryEvent>(
                        envelope => envelope.Message?.ProductId)));
}
Using KafkaKeyMemberAttribute
public class MultipleKeyMembersMessage : IIntegrationMessage
{
    public Guid Id { get; set; }

    [KafkaKeyMember]
    public string One { get; set; }

    [KafkaKeyMember]
    public string Two { get; set; }

    public string Three { get; set; }
}
Note
The message key will also be received as a header (see Message Headers for details).
Consumer
Partitions processing
While using a single poll loop, Silverback processes the messages consumed from each Kafka partition independently and concurrently.
By default, up to 10 messages/partitions are processed concurrently (per topic). This limit can be changed in the endpoint configuration, and concurrent processing can also be disabled completely.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events")
                    .LimitParallelism(2) // <- at most 2 partitions concurrently
                    .Configure(config =>
                    {
                        config.GroupId = "my-consumer";
                    }))
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("inventory-events")
                    .ProcessAllPartitionsTogether() // <- no per-partition concurrency
                    .Configure(config =>
                    {
                        config.GroupId = "my-consumer";
                    })));
}
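To make the idea of a parallelism limit more concrete, here is a minimal, library-independent sketch that uses a SemaphoreSlim to cap how many partitions are handled at once. It is not how Silverback is implemented; the names `ProcessPartitionAsync` and `maxParallelism` and the simulated delay are assumptions made for the example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxParallelism = 2; // hypothetical limit, mirroring LimitParallelism(2)
var gate = new SemaphoreSlim(maxParallelism);
var sync = new object();
int running = 0, peak = 0;

// Simulates handling one partition's messages while holding a slot.
async Task ProcessPartitionAsync(int partition)
{
    await gate.WaitAsync();
    try
    {
        lock (sync) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(20); // stand-in for real message handling
    }
    finally
    {
        lock (sync) { running--; }
        gate.Release();
    }
}

// Five partitions compete for two slots.
await Task.WhenAll(Enumerable.Range(0, 5).Select(ProcessPartitionAsync));
Console.WriteLine(peak <= maxParallelism); // True
```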
Manual partitions assignment
In some cases you don't want to let the broker randomly distribute the partitions among the consumers.
This might also be useful when dealing with large sequences (e.g. large messages/files being chunked, or batch processing), to prevent a rebalance from occurring in the middle of a sequence and forcing the consumer to abort and restart from the beginning.
The assignment can either be completely static or dynamic using a resolver function that will receive all available partitions as input (see IKafkaConsumerEndpointBuilder and KafkaConsumerEndpoint for details).
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom(
                        new TopicPartition("order-events", 0),
                        new TopicPartition("order-events", 1))
                    .Configure(config =>
                    {
                        config.GroupId = "my-consumer";
                    })));
}
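For the dynamic variant, the resolver is essentially a function from the available partitions to the subset this consumer should own. The sketch below shows one possible selection rule in plain C#, independent of the Silverback API: `ResolvePartitions`, `instanceIndex` and `instanceCount` are hypothetical names, and tuples stand in for the real TopicPartition type. See IKafkaConsumerEndpointBuilder for the actual signature to plug such logic into.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical resolver: instance `instanceIndex` out of `instanceCount`
// takes every partition whose number matches it modulo the instance count.
static IEnumerable<(string Topic, int Partition)> ResolvePartitions(
    IReadOnlyCollection<(string Topic, int Partition)> available,
    int instanceIndex,
    int instanceCount) =>
    available.Where(tp => tp.Partition % instanceCount == instanceIndex);

// Four partitions of the same topic, split between two consumer instances.
var all = Enumerable.Range(0, 4).Select(p => ("order-events", p)).ToList();
var mine = ResolvePartitions(all, instanceIndex: 1, instanceCount: 2);

Console.WriteLine(string.Join(", ", mine.Select(tp => tp.Partition))); // 1, 3
```

A deterministic rule like this keeps each instance on the same partitions across restarts, which is the property that manual assignment is meant to provide.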