Show / Hide Table of Contents

    Broker Callbacks

    Broker callbacks let you react to lifecycle events raised by broker clients (Kafka consumers/producers, MQTT clients, etc.).

    Register a Callback Handler

    To handle callbacks, implement one (or more) callback interfaces and register your handler as an IBrokerClientCallback.

    You can register the handler with the desired lifetime:

    • AddSingletonBrokerClientCallback<THandler>()
    • AddScopedBrokerClientCallback<THandler>()
    • AddTransientBrokerClientCallback<THandler>()

    Generic Callback

    The following callback applies to all broker implementations:

    • IBrokerClientsConfiguredCallback
      Invoked immediately after clients and endpoints have been configured.

    Example

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddProducer(...)
            .AddConsumer(...))
        .AddSingletonBrokerClientCallback<MyCallbackHandler>();
    
    public class MyCallbackHandler : IBrokerClientsConfiguredCallback
    {
        public async Task OnBrokerClientsConfiguredAsync()
        {
            // Perform required initialization logic.
        }
    }
    

    Kafka-Specific Callbacks

    Confluent.Kafka (used by the Kafka integration) exposes events for partition assignments, errors, statistics, and more. Silverback surfaces those events as callbacks.

    Consumer callbacks

    Available callbacks for Kafka consumers:

    • IKafkaPartitionsAssignedCallback
      Triggered when the consumer receives a new partition assignment.
    • IKafkaPartitionsRevokedCallback
      Triggered when partition assignments are revoked.
    • IKafkaOffsetCommittedCallback
      Triggered when the consumer commits offsets.
    • IKafkaConsumerErrorCallback
      Triggered when an error occurs.
    • IKafkaConsumerStatisticsCallback
      Triggered when consumer statistics are received (requires StatisticsIntervalMs > 0).
    • IKafkaConsumerLogCallback
      Triggered when the consumer logs a message.
    • IKafkaPartitionEofCallback
      Triggered when the consumer reaches the end of a partition (requires EnablePartitionEof = true).

    Example: reset offsets on partition assignment

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddKafka())
        .AddKafkaClients(clients => clients
            .WithBootstrapServers("PLAINTEXT://localhost:9092")
            .AddConsumer(...))
        .AddSingletonBrokerClientCallback<OffsetsResetCallbackHandler>();
    
    public class OffsetsResetCallbackHandler : IKafkaPartitionsAssignedCallback
    {
        public IEnumerable<TopicPartitionOffset>? OnPartitionsAssigned(
            IReadOnlyCollection<TopicPartition> topicPartitions,
            IKafkaConsumer consumer)
        {
            // Reset offset to beginning for each assigned partition.
            return topicPartitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
        }
    }
    

    Producer callbacks

    Available callbacks for Kafka producers:

    • IKafkaProducerStatisticsCallback
      Triggered when producer statistics are received (requires StatisticsIntervalMs > 0).
    • IKafkaProducerLogCallback
      Triggered when the producer logs a message.

    MQTT-Specific Callbacks

    MQTT clients expose connect/disconnect lifecycle events.

    • IMqttClientConnectedCallback
      Triggered when the client successfully connects to the broker.
    • IMqttClientDisconnectingCallback
      Triggered when the client is about to disconnect (not called if the connection is lost unexpectedly).

    Example: publish a message when the client connects

    services.AddSilverback()
        .WithConnectionToMessageBroker(options => options.AddMqtt())
        .AddMqttClients(clients => clients
            .AddClient(...))
        .AddSingletonBrokerClientCallback<ConnectionCallbackHandler>();
    
    public class ConnectionCallbackHandler : IMqttClientConnectedCallback
    {
        private readonly IPublisher _publisher;
    
        public ConnectionCallbackHandler(IPublisher publisher)
        {
            _publisher = publisher;
        }
    
        public async Task OnClientConnectedAsync(MqttClientConfig config) =>
            await _publisher.PublishAsync(new ClientConnectedMessage());
    }
    

    Additional Resources

    • API Reference
    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2026 Sergio Aquilini