Broker Callbacks

Broker callbacks let you react to lifecycle events raised by broker clients (Kafka consumers/producers, MQTT clients, etc.).

Register a Callback Handler

To handle callbacks, implement one (or more) callback interfaces and register your handler as an IBrokerClientCallback.

You can register the handler with the desired lifetime:

  • AddSingletonBrokerClientCallback<THandler>()
  • AddScopedBrokerClientCallback<THandler>()
  • AddTransientBrokerClientCallback<THandler>()
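
As a rough illustration of the three registration methods above, the fragment below registers one handler per lifetime. The handler class names are hypothetical placeholders, and the sketch assumes that the registration methods can be chained on the same builder; treat it as a configuration sketch rather than a complete setup.

```csharp
// Sketch only: StartupCallbackHandler, TenantCallbackHandler and
// AuditCallbackHandler are hypothetical classes, each implementing
// at least one callback interface.
services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    // Singleton: one instance shared for the whole application lifetime.
    .AddSingletonBrokerClientCallback<StartupCallbackHandler>()
    // Scoped: one instance per dependency injection scope.
    .AddScopedBrokerClientCallback<TenantCallbackHandler>()
    // Transient: a new instance each time the handler is resolved.
    .AddTransientBrokerClientCallback<AuditCallbackHandler>();
```

A singleton is usually the simplest choice for a stateless handler; the scoped or transient variants may fit better when the handler depends on services that are themselves scoped or transient.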

Generic Callback

The following callback applies to all broker implementations:

  • IBrokerClientsConfiguredCallback

Example

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddProducer(...)
        .AddConsumer(...))
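    .AddSingletonBrokerClientCallback<MyCallbackHandler>();

public class MyCallbackHandler : IBrokerClientsConfiguredCallback
{
    // Nothing is awaited here, so the method returns a completed task
    // instead of being marked async.
    public Task OnBrokerClientsConfiguredAsync()
    {
        // Perform required initialization logic.
        return Task.CompletedTask;
    }
}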

Kafka-Specific Callbacks

Confluent.Kafka (used by the Kafka integration) exposes events for partition assignments, errors, statistics, and more. Silverback surfaces those events as callbacks.

Consumer callbacks

Available callbacks for Kafka consumers include IKafkaPartitionsAssignedCallback, which is used in the following example.

Example: reset offsets on partition assignment

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddKafka())
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer(...))
    .AddSingletonBrokerClientCallback<OffsetsResetCallbackHandler>();

public class OffsetsResetCallbackHandler : IKafkaPartitionsAssignedCallback
{
    public IEnumerable<TopicPartitionOffset>? OnPartitionsAssigned(
        IReadOnlyCollection<TopicPartition> topicPartitions,
        IKafkaConsumer consumer)
    {
        // Reset offset to beginning for each assigned partition.
        return topicPartitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
    }
}
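
Because a handler may implement more than one callback interface, the generic callback and a Kafka-specific callback can live in the same class. The sketch below combines the two interfaces already shown on this page; the class name CombinedCallbackHandler is hypothetical, and the Silverback namespaces in the using directives are assumptions that may differ between library versions.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
// Assumed namespaces: adjust them to match your Silverback version.
using Silverback.Messaging.Broker;
using Silverback.Messaging.Broker.Callbacks;

// Hypothetical handler that reacts to two different callbacks.
public class CombinedCallbackHandler :
    IBrokerClientsConfiguredCallback,
    IKafkaPartitionsAssignedCallback
{
    // Generic callback: nothing is awaited, so return a completed task.
    public Task OnBrokerClientsConfiguredAsync()
    {
        // Perform required initialization logic.
        return Task.CompletedTask;
    }

    // Kafka consumer callback: same signature as in the example above.
    public IEnumerable<TopicPartitionOffset>? OnPartitionsAssigned(
        IReadOnlyCollection<TopicPartition> topicPartitions,
        IKafkaConsumer consumer)
    {
        // Reset the offset to the beginning for each assigned partition.
        return topicPartitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
    }
}
```

The class is registered once, for example with AddSingletonBrokerClientCallback<CombinedCallbackHandler>(), since handlers are registered as IBrokerClientCallback regardless of how many callback interfaces they implement.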

Producer callbacks

Available callbacks for Kafka producers:

MQTT-Specific Callbacks

MQTT clients expose connect/disconnect lifecycle events.

Example: publish a message when the client connects

services.AddSilverback()
    .WithConnectionToMessageBroker(options => options.AddMqtt())
    .AddMqttClients(clients => clients
        .AddClient(...))
    .AddSingletonBrokerClientCallback<ConnectionCallbackHandler>();

public class ConnectionCallbackHandler : IMqttClientConnectedCallback
{
    private readonly IPublisher _publisher;

    public ConnectionCallbackHandler(IPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task OnClientConnectedAsync(MqttClientConfig config) =>
        await _publisher.PublishAsync(new ClientConnectedMessage());
}

Additional Resources