    Multiple Consumer Groups (in same process)

    In some cases you may want to subscribe to the same consumed message multiple times, to perform independent tasks. Having multiple subscribers handle the very same message is risky, because a failure in any one of them causes the message to be consumed again and therefore reprocessed by all subscribers.

    A much safer approach is to bind multiple consumers to the same topic, each using a different consumer group id. The message is then consumed multiple times (once per consumer group) and committed independently for each group. The KafkaGroupIdFilterAttribute can be used to execute a subscribed method only for a given group id.

    EndpointsConfigurator (fluent)

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                .AddKafkaEndpoints(endpoints => endpoints
                    // Client settings shared by both inbound endpoints
                    .Configure(config =>
                        {
                            config.BootstrapServers = "PLAINTEXT://kafka:9092";
                        })
                    // First consumer of the topic, committing as "group1"
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("document-events")
                        .Configure(config =>
                            {
                                config.GroupId = "group1";
                            }))
                    // Second consumer of the same topic, committing as "group2"
                    .AddInbound(endpoint => endpoint
                        .ConsumeFrom("document-events")
                        .Configure(config =>
                            {
                                config.GroupId = "group2";
                            })));
    }

    EndpointsConfigurator (legacy)

    public class MyEndpointsConfigurator : IEndpointsConfigurator
    {
        public void Configure(IEndpointsConfigurationBuilder builder) =>
            builder
                // First consumer of the topic, committing as "group1"
                .AddInbound(
                    new KafkaConsumerEndpoint("document-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "group1"
                        }
                    })
                // Second consumer of the same topic, committing as "group2"
                .AddInbound(
                    new KafkaConsumerEndpoint("document-events")
                    {
                        Configuration = new KafkaConsumerConfig
                        {
                            BootstrapServers = "PLAINTEXT://kafka:9092",
                            GroupId = "group2"
                        }
                    });
    }

    Subscriber

    public class MySubscriber
    {
        [KafkaGroupIdFilter("group1")]
        public void PerformTask1(MyEvent @event) => ...
    
        [KafkaGroupIdFilter("group2")]
        public void PerformTask2(MyEvent @event) => ...
    }
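
    To make the reasoning behind separate group ids concrete, the following is a minimal, self-contained sketch of independent commits. It is a toy in-memory model, not Silverback or Kafka code: the ConsumerGroupSketch class, its Consume method, the offsets dictionary and the retry-until-success loop are all assumptions introduced for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of per-group offsets (hypothetical, not a Silverback or Kafka API).
public static class ConsumerGroupSketch
{
    // Delivers the topic to one group, starting from that group's committed offset.
    // The offset advances only after the handler succeeds; a failure leaves it
    // unchanged, so the same message is delivered again to this group only.
    // Returns the number of handler invocations.
    public static int Consume(
        string[] topic,
        Dictionary<string, int> committedOffsets,
        string groupId,
        Action<string> handler)
    {
        var calls = 0;

        while (committedOffsets[groupId] < topic.Length)
        {
            calls++;

            try
            {
                handler(topic[committedOffsets[groupId]]);
                committedOffsets[groupId]++; // commit for this group only
            }
            catch (Exception)
            {
                // No commit: the message is retried on the next iteration.
                // (A handler that always fails would loop forever in this sketch.)
            }
        }

        return calls;
    }

    public static void Main()
    {
        var topic = new[] { "doc-1", "doc-2", "doc-3" };
        var offsets = new Dictionary<string, int> { ["group1"] = 0, ["group2"] = 0 };

        // The group1 task fails once on "doc-2" and succeeds when retried.
        var failedOnce = false;
        var group1Calls = Consume(topic, offsets, "group1", message =>
        {
            if (message == "doc-2" && !failedOnce)
            {
                failedOnce = true;
                throw new InvalidOperationException("transient failure");
            }
        });

        // The group2 task never fails and never sees the group1 retry.
        var group2Calls = Consume(topic, offsets, "group2", _ => { });

        Console.WriteLine($"group1 handler calls: {group1Calls}"); // 4 (one retry)
        Console.WriteLine($"group2 handler calls: {group2Calls}"); // 3 (no reprocessing)
    }
}
```

    In this model the retry caused by the failing task is confined to group1, while group2 handles each message exactly once. With a single shared group, both tasks would sit behind the same offset and the retry would reach both of them.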
    
    Note

    The filters can also be added dynamically, using the overloads of AddSubscriber that accept a SubscriptionOptions or TypeSubscriptionOptions. This allows you to use a variable for the group id.

    .AddSingletonSubscriber<MySubscriber>(
        new TypeSubscriptionOptions
        {
            Filters = new[]
            {
                new KafkaGroupIdFilterAttribute("consumer1")
            }
        })
    

    Using the KafkaGroupIdFilterAttribute is the cleanest and easiest approach. Alternatively, you can always subscribe to the IInboundEnvelope<TMessage> and perform different tasks according to the GroupId value of the endpoint configuration.

    public class MySubscriber
    {
        public void OnMessageReceived(IInboundEnvelope<MyEvent> envelope)
        {
            switch (((KafkaConsumerEndpoint)envelope.Endpoint).Configuration.GroupId)
            {
                case "group1":
                    PerformTask1(envelope.Message);
                    break;
                case "group2":
                    PerformTask2(envelope.Message);
                    break;
            }
        }
    
        private void PerformTask1(MyEvent @event) => ...
    
        private void PerformTask2(MyEvent @event) => ...
    }
    
    © 2020 Sergio Aquilini