Class KafkaConsumer

Namespace
Silverback.Messaging.Broker
Assembly
Silverback.Integration.Kafka.dll

Consumes from one or more endpoints and pushes the received messages via the message bus.

public class KafkaConsumer : Consumer<KafkaOffset>, IDisposable, IKafkaConsumer, IConsumer
Inheritance
KafkaConsumer

Constructors

KafkaConsumer(string, IConfluentConsumerWrapper, KafkaConsumerConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior>, IBrokerClientCallbacksInvoker, IKafkaOffsetStoreFactory, IServiceProvider, ISilverbackLogger<KafkaConsumer>)

Initializes a new instance of the KafkaConsumer class.

public KafkaConsumer(string name, IConfluentConsumerWrapper client, KafkaConsumerConfiguration configuration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IBrokerClientCallbacksInvoker callbacksInvoker, IKafkaOffsetStoreFactory offsetStoreFactory, IServiceProvider serviceProvider, ISilverbackLogger<KafkaConsumer> logger)

Parameters

name string

The consumer identifier.

client IConfluentConsumerWrapper

The IConfluentConsumerWrapper to be used.

configuration KafkaConsumerConfiguration

The KafkaConsumerConfiguration.

behaviorsProvider IBrokerBehaviorsProvider<IConsumerBehavior>

The IBrokerBehaviorsProvider<TBehavior>.

callbacksInvoker IBrokerClientCallbacksInvoker

The IBrokerClientCallbacksInvoker.

offsetStoreFactory IKafkaOffsetStoreFactory

The IKafkaOffsetStoreFactory.

serviceProvider IServiceProvider

The IServiceProvider to be used to resolve the necessary services.

logger ISilverbackLogger<KafkaConsumer>

The ISilverbackLogger<TCategoryName>.

Properties

Client

Gets the related IBrokerClient.

public IConfluentConsumerWrapper Client { get; }

Property Value

IConfluentConsumerWrapper

Configuration

Gets the consumer configuration.

public KafkaConsumerConfiguration Configuration { get; }

Property Value

KafkaConsumerConfiguration

EndpointsConfiguration

Gets the endpoints configuration.

public IReadOnlyCollection<KafkaConsumerEndpointConfiguration> EndpointsConfiguration { get; }

Property Value

IReadOnlyCollection<KafkaConsumerEndpointConfiguration>

Methods

CommitCoreAsync(IReadOnlyCollection<KafkaOffset>)

Commits the specified messages, sending the acknowledgement to the message broker.

protected override ValueTask CommitCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<KafkaOffset>

The identifiers of the messages to be committed.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

Dispose(bool)

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

protected override void Dispose(bool disposing)

Parameters

disposing bool

A value indicating whether the method has been called by the Dispose method and not from the finalizer.

GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp>, TimeSpan)

Looks up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset for which the timestamp is greater than or equal to the given timestamp. If the provided timestamp exceeds that of the last message in the partition, a value of Confluent.Kafka.Offset.End will be returned.

public IReadOnlyList<TopicPartitionOffset> GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps, TimeSpan timeout)

Parameters

topicPartitionTimestamps IEnumerable<TopicPartitionTimestamp>

The mapping from partition to the timestamp to look up.

timeout TimeSpan

The maximum period of time the call may block.

Returns

IReadOnlyList<TopicPartitionOffset>

A mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp.

Remarks

The consumer does not need to be assigned to the requested partitions.
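The lookup described above can be sketched as follows. This is a minimal illustration, not the library's recommended usage: it assumes a KafkaConsumer instance is already available (how it is obtained is out of scope here) and that the Confluent.Kafka package is referenced. The class name OffsetLookupExample, the topic "my-topic", partition 0 and the 10-second timeout are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Silverback.Messaging.Broker;

public static class OffsetLookupExample
{
    // Hypothetical helper: returns the earliest offset in partition 0 of "my-topic"
    // whose timestamp is greater than or equal to the given instant.
    public static TopicPartitionOffset FindOffset(KafkaConsumer consumer, DateTime utcInstant)
    {
        // Placeholder topic and partition; replace with real values.
        var partition = new TopicPartition("my-topic", new Partition(0));

        var query = new[]
        {
            new TopicPartitionTimestamp(partition, new Timestamp(utcInstant))
        };

        // Blocks for at most the given timeout (10 seconds is an arbitrary choice).
        IReadOnlyList<TopicPartitionOffset> offsets =
            consumer.GetOffsetsForTimestamps(query, TimeSpan.FromSeconds(10));

        // One partition was requested, so this sketch assumes exactly one result.
        TopicPartitionOffset result = offsets[0];

        // Offset.End signals that the timestamp is later than the last message.
        if (result.Offset == Offset.End)
        {
            Console.WriteLine("No message at or after the requested timestamp.");
        }

        return result;
    }
}
```

Checking for Offset.End before using the result avoids treating "no matching message" as a real position in the partition.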

Pause(IEnumerable<TopicPartition>)

Pauses the consumption of the specified partitions.

public void Pause(IEnumerable<TopicPartition> partitions)

Parameters

partitions IEnumerable<TopicPartition>

The list of Confluent.Kafka.TopicPartition to be paused.

Resume(IEnumerable<TopicPartition>)

Resumes the consumption of the specified partitions.

public void Resume(IEnumerable<TopicPartition> partitions)

Parameters

partitions IEnumerable<TopicPartition>

The list of Confluent.Kafka.TopicPartition to be resumed.
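Pause and Resume are typically used as a pair. The sketch below shows one possible pattern, assuming the caller already holds a KafkaConsumer and knows which partitions to throttle. The class BackpressureExample, the method PauseWhileAsync and the work delegate are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Silverback.Messaging.Broker;

public static class BackpressureExample
{
    // Hypothetical helper: pauses the given partitions while some work runs,
    // then resumes them even if the work throws.
    public static async Task PauseWhileAsync(
        KafkaConsumer consumer,
        IReadOnlyCollection<TopicPartition> partitions,
        Func<Task> work)
    {
        consumer.Pause(partitions);

        try
        {
            await work();
        }
        finally
        {
            // Resume the same partitions that were paused above.
            consumer.Resume(partitions);
        }
    }
}
```

Putting Resume in a finally block keeps a failure in the delegate from leaving the partitions paused indefinitely.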

RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)

If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.

protected override ValueTask RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<KafkaOffset>

The identifiers of the messages to be rolled back.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

Seek(TopicPartitionOffset)

Seeks the specified partition to the specified offset.

public void Seek(TopicPartitionOffset topicPartitionOffset)

Parameters

topicPartitionOffset TopicPartitionOffset

The Confluent.Kafka.TopicPartitionOffset identifying the partition and the offset to seek to.
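A minimal sketch of a Seek call, assuming a KafkaConsumer instance is at hand and that the target partition is currently assigned to it. The class SeekExample and the method RewindTo are hypothetical names, and the topic, partition and offset values come from the caller.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Broker;

public static class SeekExample
{
    // Hypothetical helper: moves the consumer's position in one partition
    // to the given absolute offset.
    public static void RewindTo(KafkaConsumer consumer, string topic, int partition, long offset)
    {
        var target = new TopicPartitionOffset(
            topic,
            new Partition(partition),
            new Offset(offset));

        consumer.Seek(target);
    }
}
```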

StartCoreAsync()

Starts consuming. Called to resume consuming after StopAsync(bool) has been called.

protected override ValueTask StartCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.

StopCoreAsync()

Stops consuming while staying connected to the message broker.

protected override ValueTask StopCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.

WaitUntilConsumingStoppedCoreAsync()

Waits until consuming has stopped.

protected override ValueTask WaitUntilConsumingStoppedCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.