    Class KafkaConsumer

    Consumes from one or more endpoints and pushes the received messages via the message bus.

    Inheritance
    object
    Consumer<KafkaOffset>
    KafkaConsumer
    Implements
    IDisposable
    IKafkaConsumer
    IConsumer
    Inherited Members
    Consumer<KafkaOffset>.TriggerReconnectAsync()
    Consumer<KafkaOffset>.StartAsync()
    Consumer<KafkaOffset>.StopAsync(bool)
    Consumer<KafkaOffset>.CommitAsync(IBrokerMessageIdentifier)
    Consumer<KafkaOffset>.CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
    Consumer<KafkaOffset>.RollbackAsync(IBrokerMessageIdentifier)
    Consumer<KafkaOffset>.RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
    Consumer<KafkaOffset>.IncrementFailedAttempts(IRawInboundEnvelope)
    Consumer<KafkaOffset>.Dispose()
    Consumer<KafkaOffset>.HandleMessageAsync(byte[], IReadOnlyCollection<MessageHeader>, ConsumerEndpoint, IBrokerMessageIdentifier, ISequenceStore)
    Consumer<KafkaOffset>.SetConnectedStatus()
    Consumer<KafkaOffset>.RevertConnectedStatus()
    Consumer<KafkaOffset>.IsStartedAndNotStopping()
    Consumer<KafkaOffset>.Name
    Consumer<KafkaOffset>.DisplayName
    Consumer<KafkaOffset>.StatusInfo
    Consumer<KafkaOffset>.ServiceProvider
    Consumer<KafkaOffset>.IsStarting
    Consumer<KafkaOffset>.IsStarted
    Consumer<KafkaOffset>.IsStopping
    object.GetType()
    object.MemberwiseClone()
    object.ToString()
    object.Equals(object)
    object.Equals(object, object)
    object.ReferenceEquals(object, object)
    object.GetHashCode()
    Namespace: Silverback.Messaging.Broker
    Assembly: Silverback.Integration.Kafka.dll
    Syntax
    public class KafkaConsumer : Consumer<KafkaOffset>, IDisposable, IKafkaConsumer, IConsumer

    Constructors

    KafkaConsumer(string, IConfluentConsumerWrapper, KafkaConsumerConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior>, IBrokerClientCallbacksInvoker, IKafkaOffsetStoreFactory, IServiceProvider, ISilverbackLogger<KafkaConsumer>)

    Initializes a new instance of the KafkaConsumer class.

    Declaration
    public KafkaConsumer(string name, IConfluentConsumerWrapper client, KafkaConsumerConfiguration configuration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IBrokerClientCallbacksInvoker callbacksInvoker, IKafkaOffsetStoreFactory offsetStoreFactory, IServiceProvider serviceProvider, ISilverbackLogger<KafkaConsumer> logger)
    Parameters
    Type Name Description
    string name

    The consumer identifier.

    IConfluentConsumerWrapper client

    The IConfluentConsumerWrapper to be used.

    KafkaConsumerConfiguration configuration

    The KafkaConsumerConfiguration.

    IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider

    The IBrokerBehaviorsProvider<TBehavior>.

    IBrokerClientCallbacksInvoker callbacksInvoker

    The IBrokerClientCallbacksInvoker.

    IKafkaOffsetStoreFactory offsetStoreFactory

    The IKafkaOffsetStoreFactory.

    IServiceProvider serviceProvider

    The IServiceProvider to be used to resolve the necessary services.

    ISilverbackLogger<KafkaConsumer> logger

    The ISilverbackLogger<TCategoryName>.

    Properties

    Client

    Gets the related IBrokerClient.

    Declaration
    public IConfluentConsumerWrapper Client { get; }
    Property Value
    Type Description
    IConfluentConsumerWrapper

    Configuration

    Gets the consumer configuration.

    Declaration
    public KafkaConsumerConfiguration Configuration { get; }
    Property Value
    Type Description
    KafkaConsumerConfiguration

    EndpointsConfiguration

    Gets the endpoints configuration.

    Declaration
    public IReadOnlyCollection<KafkaConsumerEndpointConfiguration> EndpointsConfiguration { get; }
    Property Value
    Type Description
    IReadOnlyCollection<KafkaConsumerEndpointConfiguration>

    Methods

    CommitCoreAsync(IReadOnlyCollection<KafkaOffset>)

    Commits the specified messages sending the acknowledgement to the message broker.

    Declaration
    protected override ValueTask CommitCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers

    The identifiers of the messages to be committed.

    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Overrides
    Consumer<KafkaOffset>.CommitCoreAsync(IReadOnlyCollection<KafkaOffset>)

    Dispose(bool)

    Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

    Declaration
    protected override void Dispose(bool disposing)
    Parameters
    Type Name Description
    bool disposing

    A value indicating whether the method has been called by the Dispose method and not from the finalizer.

    Overrides
    Consumer<KafkaOffset>.Dispose(bool)

    GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp>, TimeSpan)

    Looks up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset for which the timestamp is greater than or equal to the given timestamp. If the provided timestamp exceeds that of the last message in the partition, a value of Confluent.Kafka.Offset.End will be returned.

    Declaration
    public IReadOnlyList<TopicPartitionOffset> GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps, TimeSpan timeout)
    Parameters
    Type Name Description
    IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps

    The mapping from partition to the timestamp to look up.

    TimeSpan timeout

    The maximum period of time the call may block.

    Returns
    Type Description
    IReadOnlyList<TopicPartitionOffset>

    A mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp.

    Remarks

    The consumer does not need to be assigned to the requested partitions.
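
    As an illustration of the lookup described above, the following is a minimal sketch rather than an official sample. The class OffsetLookupSketch, its method names, and the 10-second timeout are hypothetical; only GetOffsetsForTimestamps and the Confluent.Kafka types come from the libraries. It assumes an already resolved KafkaConsumer instance.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Silverback.Messaging.Broker;

// Hypothetical helper class, not part of Silverback.
public static class OffsetLookupSketch
{
    // Pairs each partition id of the topic with the same target timestamp.
    public static List<TopicPartitionTimestamp> BuildLookup(
        string topic, IEnumerable<int> partitionIds, DateTime utcTimestamp) =>
        partitionIds
            .Select(id => new TopicPartitionTimestamp(
                new TopicPartition(topic, new Partition(id)),
                new Timestamp(utcTimestamp)))
            .ToList();

    // Asks for the earliest offset at or after the timestamp in each partition.
    public static IReadOnlyList<TopicPartitionOffset> Lookup(
        KafkaConsumer consumer,
        string topic,
        IEnumerable<int> partitionIds,
        DateTime utcTimestamp) =>
        consumer.GetOffsetsForTimestamps(
            BuildLookup(topic, partitionIds, utcTimestamp),
            TimeSpan.FromSeconds(10)); // assumed timeout, tune as needed
}
```

    A caller would typically compare each returned Offset with Confluent.Kafka.Offset.End to detect partitions that hold no message at or after the requested timestamp.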

    Pause(IEnumerable<TopicPartition>)

    Pauses the consumption of the specified partitions.

    Declaration
    public void Pause(IEnumerable<TopicPartition> partitions)
    Parameters
    Type Name Description
    IEnumerable<TopicPartition> partitions

    The list of Confluent.Kafka.TopicPartition to be paused.

    Resume(IEnumerable<TopicPartition>)

    Resumes the consumption of the specified partitions.

    Declaration
    public void Resume(IEnumerable<TopicPartition> partitions)
    Parameters
    Type Name Description
    IEnumerable<TopicPartition> partitions

    The list of Confluent.Kafka.TopicPartition to be resumed.
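
    Pause and Resume are usually called as a pair. The sketch below shows one possible way to guarantee that, assuming an already resolved KafkaConsumer instance. PauseResumeSketch, PartitionsOf and RunWhilePaused are hypothetical names introduced for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Silverback.Messaging.Broker;

// Hypothetical helper class, not part of Silverback.
public static class PauseResumeSketch
{
    // Builds the TopicPartition list for the given partition ids of a topic.
    public static List<TopicPartition> PartitionsOf(string topic, params int[] partitionIds) =>
        partitionIds
            .Select(id => new TopicPartition(topic, new Partition(id)))
            .ToList();

    // Pauses the partitions, runs the work, and resumes even if the work throws.
    public static void RunWhilePaused(
        KafkaConsumer consumer,
        IReadOnlyCollection<TopicPartition> partitions,
        Action work)
    {
        consumer.Pause(partitions);
        try
        {
            work();
        }
        finally
        {
            consumer.Resume(partitions);
        }
    }
}
```

    The try/finally block is a design choice of this sketch: it keeps a failure inside the work delegate from leaving the partitions paused.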

    RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)

    If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.

    Declaration
    protected override ValueTask RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers

    The identifiers of the messages to be rolled back.

    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Overrides
    Consumer<KafkaOffset>.RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)

    Seek(TopicPartitionOffset)

    Seeks the specified partition to the specified offset.

    Declaration
    public void Seek(TopicPartitionOffset topicPartitionOffset)
    Parameters
    Type Name Description
    TopicPartitionOffset topicPartitionOffset

    The topic, partition, and offset to seek to.
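
    A minimal sketch of a Seek call follows, assuming an already resolved KafkaConsumer instance and, as a further assumption, that the target partition is currently assigned to it. SeekSketch, Target and Replay are hypothetical names. The TopicPartitionOffset, Partition and Offset types are from Confluent.Kafka.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Broker;

// Hypothetical helper class, not part of Silverback.
public static class SeekSketch
{
    // Builds the seek target from plain values.
    public static TopicPartitionOffset Target(string topic, int partitionId, long offset) =>
        new TopicPartitionOffset(topic, new Partition(partitionId), new Offset(offset));

    // Moves the consumer position of one partition to the given offset.
    public static void Replay(KafkaConsumer consumer, string topic, int partitionId, long offset) =>
        consumer.Seek(Target(topic, partitionId, offset));
}
```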

    StartCoreAsync()

    Starts consuming. Called to resume consuming after StopAsync(bool) has been called.

    Declaration
    protected override ValueTask StartCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Overrides
    Consumer<KafkaOffset>.StartCoreAsync()

    StopCoreAsync()

    Stops consuming while staying connected to the message broker.

    Declaration
    protected override ValueTask StopCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Overrides
    Consumer<KafkaOffset>.StopCoreAsync()

    WaitUntilConsumingStoppedCoreAsync()

    Waits until consuming has stopped.

    Declaration
    protected override ValueTask WaitUntilConsumingStoppedCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Overrides
    Consumer<KafkaOffset>.WaitUntilConsumingStoppedCoreAsync()

    Implements

    IDisposable
    IKafkaConsumer
    IConsumer
    © 2026 Sergio Aquilini