Class KafkaConsumer
Consumes from one or more endpoints and pushes the received messages via the message bus.
Inherited Members
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.Kafka.dll
Syntax
public class KafkaConsumer : Consumer<KafkaOffset>, IDisposable, IKafkaConsumer, IConsumer
Constructors
KafkaConsumer(string, IConfluentConsumerWrapper, KafkaConsumerConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior>, IBrokerClientCallbacksInvoker, IKafkaOffsetStoreFactory, IServiceProvider, ISilverbackLogger<KafkaConsumer>)
Initializes a new instance of the KafkaConsumer class.
Declaration
public KafkaConsumer(string name, IConfluentConsumerWrapper client, KafkaConsumerConfiguration configuration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IBrokerClientCallbacksInvoker callbacksInvoker, IKafkaOffsetStoreFactory offsetStoreFactory, IServiceProvider serviceProvider, ISilverbackLogger<KafkaConsumer> logger)
Parameters
| Type | Name | Description |
|---|---|---|
| string | name | The consumer identifier. |
| IConfluentConsumerWrapper | client | The IConfluentConsumerWrapper to be used. |
| KafkaConsumerConfiguration | configuration | |
| IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | |
| IBrokerClientCallbacksInvoker | callbacksInvoker | |
| IKafkaOffsetStoreFactory | offsetStoreFactory | |
| IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the necessary services. |
| ISilverbackLogger<KafkaConsumer> | logger |
Properties
Client
Gets the related IBrokerClient.
Declaration
public IConfluentConsumerWrapper Client { get; }
Property Value
| Type | Description |
|---|---|
| IConfluentConsumerWrapper |
Configuration
Gets the consumer configuration.
Declaration
public KafkaConsumerConfiguration Configuration { get; }
Property Value
| Type | Description |
|---|---|
| KafkaConsumerConfiguration |
EndpointsConfiguration
Gets the endpoints configuration.
Declaration
public IReadOnlyCollection<KafkaConsumerEndpointConfiguration> EndpointsConfiguration { get; }
Property Value
| Type | Description |
|---|---|
| IReadOnlyCollection<KafkaConsumerEndpointConfiguration> |
Methods
CommitCoreAsync(IReadOnlyCollection<KafkaOffset>)
Commits the specified messages sending the acknowledgement to the message broker.
Declaration
protected override ValueTask CommitCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of to message be committed. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
Overrides
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected override void Dispose(bool disposing)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | disposing | A value indicating whether the method has been called by the |
Overrides
GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp>, TimeSpan)
Looks up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset for which the timestamp is greater than or equal to the given timestamp. If the provided timestamp exceeds that of the last message in the partition, a value of Confluent.Kafka.Offset.End will be returned.
Declaration
public IReadOnlyList<TopicPartitionOffset> GetOffsetsForTimestamps(IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps, TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<TopicPartitionTimestamp> | topicPartitionTimestamps | The mapping from partition to the timestamp to look up. |
| TimeSpan | timeout | The maximum period of time the call may block. |
Returns
| Type | Description |
|---|---|
| IReadOnlyList<TopicPartitionOffset> | A mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp. |
Remarks
The consumer does not need to be assigned to the requested partitions.
Pause(IEnumerable<TopicPartition>)
Pauses the consumption of the specified partitions.
Declaration
public void Pause(IEnumerable<TopicPartition> partitions)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<TopicPartition> | partitions | The list of Confluent.Kafka.TopicPartition to be paused. |
Resume(IEnumerable<TopicPartition>)
Resumes the consumption of the specified partitions.
Declaration
public void Resume(IEnumerable<TopicPartition> partitions)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<TopicPartition> | partitions | The list of Confluent.Kafka.TopicPartition to be paused. |
RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected override ValueTask RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of to message be rolled back. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
Overrides
Seek(TopicPartitionOffset)
Seeks the specified partition to the specified offset.
Declaration
public void Seek(TopicPartitionOffset topicPartitionOffset)
Parameters
| Type | Name | Description |
|---|---|---|
| TopicPartitionOffset | topicPartitionOffset | The offset. |
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync(bool) has been called.
Declaration
protected override ValueTask StartCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
Overrides
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected override ValueTask StopCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
Overrides
WaitUntilConsumingStoppedCoreAsync()
Waits until the consuming is stopped.
Declaration
protected override ValueTask WaitUntilConsumingStoppedCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |