Multiple Consumer Groups (in same process)
In some cases you may want to subscribe to the same consumed message multiple times, to perform independent tasks. Having multiple subscribers handle the very same message is not a good idea, since a failure in one of them will cause the message to be consumed again and thus reprocessed by all subscribers.
A much safer approach is to bind multiple consumers to the same topic, each using a different consumer group id. The message will then be consumed multiple times (once per consumer group) and committed independently by each group. The KafkaGroupIdFilterAttribute can be used to execute a subscribed method only for the messages consumed with the specified group id.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = "PLAINTEXT://kafka:9092";
                })
                // First consumer: commits its offsets under "group1"
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("document-events")
                    .Configure(config =>
                    {
                        config.GroupId = "group1";
                    }))
                // Second consumer: same topic, independent offsets under "group2"
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("document-events")
                    .Configure(config =>
                    {
                        config.GroupId = "group2";
                    })));
}
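On the subscriber side, the filter attribute could be applied as in the following minimal sketch. It assumes the attribute is written as [KafkaGroupIdFilter(...)] and lives in the Silverback.Messaging.Subscribers namespace. The Content property of MyEvent and the method bodies are placeholders added for illustration only.

```csharp
using System;
using Silverback.Messaging.Subscribers; // assumed namespace of KafkaGroupIdFilterAttribute

// Placeholder message type: the real MyEvent is defined by your application.
public class MyEvent
{
    public string Content { get; set; } = string.Empty;
}

public class MySubscriber
{
    // Invoked only for messages consumed by the consumer configured with "group1".
    [KafkaGroupIdFilter("group1")]
    public void PerformTask1(MyEvent message) =>
        Console.WriteLine($"Task 1: {message.Content}");

    // Invoked only for messages consumed by the consumer configured with "group2".
    [KafkaGroupIdFilter("group2")]
    public void PerformTask2(MyEvent message) =>
        Console.WriteLine($"Task 2: {message.Content}");
}
```

With this arrangement a failure in PerformTask1 only affects the offsets of group1, so the message is retried for that group without PerformTask2 being executed again.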
Note
The filters can also be added dynamically, using the overloads of AddSubscriber that accept a SubscriptionOptions or TypeSubscriptionOptions. This allows you to use a variable for the group id.
.AddSingletonSubscriber<MySubscriber>(
    new TypeSubscriptionOptions
    {
        Filters = new[]
        {
            new KafkaGroupIdFilterAttribute("consumer1")
        }
    })
Using the KafkaGroupIdFilterAttribute is the cleanest and easiest approach, but alternatively you can always subscribe to the IInboundEnvelope<TMessage> and perform different tasks according to the GroupId value.
public class MySubscriber
{
    public void OnMessageReceived(IInboundEnvelope<MyEvent> envelope)
    {
        // The envelope exposes the endpoint it was consumed from; cast it to the
        // Kafka endpoint type to read the group id of the consumer.
        switch (((KafkaConsumerEndpoint)envelope.Endpoint).Configuration.GroupId)
        {
            case "group1":
                PerformTask1(envelope.Message);
                break;
            case "group2":
                PerformTask2(envelope.Message);
                break;
        }
    }

    private void PerformTask1(MyEvent @event) => ...

    private void PerformTask2(MyEvent @event) => ...
}