Class KafkaConsumer
Consumes an endpoint and invokes a callback delegate when a message is received.
Inherited Members
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.Kafka.dll
Syntax
public class KafkaConsumer : Consumer<KafkaBroker, KafkaConsumerEndpoint, KafkaOffset>, IConsumer, IDisposable
Constructors
KafkaConsumer(KafkaBroker, KafkaConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IConfluentConsumerBuilder, IBrokerCallbacksInvoker, IServiceProvider, IInboundLogger<KafkaConsumer>)
Initializes a new instance of the KafkaConsumer class.
Declaration
public KafkaConsumer(KafkaBroker broker, KafkaConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IConfluentConsumerBuilder confluentConsumerBuilder, IBrokerCallbacksInvoker callbacksInvoker, IServiceProvider serviceProvider, IInboundLogger<KafkaConsumer> logger)
Parameters
| Type | Name | Description |
|---|---|---|
| KafkaBroker | broker | The IBroker that is instantiating the consumer. |
| KafkaConsumerEndpoint | endpoint | The endpoint to be consumed. |
| IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | The IBrokerBehaviorsProvider<IConsumerBehavior>. |
| IConfluentConsumerBuilder | confluentConsumerBuilder | The IConfluentConsumerBuilder. |
| IBrokerCallbacksInvoker | callbacksInvoker | The IBrokerCallbacksInvoker. |
| IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the needed services. |
| IInboundLogger<KafkaConsumer> | logger | The IInboundLogger<KafkaConsumer>. |
Properties
MemberId
Gets the (dynamic) group member id of this consumer (as set by the broker).
Declaration
public string MemberId { get; }
Property Value
| Type | Description |
|---|---|
| string | The group member id assigned to this consumer by the broker. |
Methods
CommitCoreAsync(IReadOnlyCollection<KafkaOffset>)
Commits the specified messages, sending the acknowledgement to the message broker.
Declaration
protected override Task CommitCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
ConnectCoreAsync()
Connects to the message broker.
Declaration
protected override Task ConnectCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
DisconnectCoreAsync()
Disconnects from the message broker.
Declaration
protected override Task DisconnectCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
GetCurrentSequenceStores()
Gets the ISequenceStore instances used by this consumer. Some brokers require multiple stores (e.g. the KafkaConsumer creates one store per assigned partition).
Declaration
public override IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
Returns
| Type | Description |
|---|---|
| IReadOnlyList<ISequenceStore> | The list of ISequenceStore instances. |
Overrides
Remarks
Used only for testing and maintained to preserve backward compatibility.
GetOffsetsForTimestamp(IEnumerable<TopicPartitionTimestamp>, TimeSpan)
Looks up the offsets for the given partitions by timestamp. The offset returned for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp. If the provided timestamp is later than that of the last message in the partition, Offset.End (-1) is returned.
Declaration
public IReadOnlyList<TopicPartitionOffset> GetOffsetsForTimestamp(IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps, TimeSpan timeout)
Parameters
| Type | Name | Description |
|---|---|---|
| IEnumerable<TopicPartitionTimestamp> | topicPartitionTimestamps | The list of partitions with the target timestamps. |
| TimeSpan | timeout | The maximum amount of time to block waiting for the requested offsets. |
Returns
| Type | Description |
|---|---|
| IReadOnlyList<TopicPartitionOffset> | The list of offsets for each partition. |
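The lookup described above can be sketched as follows. This is a minimal illustration, not code taken from the library: the `OffsetLookupExample` class, the `PrintOffsetsSince` helper, its parameters and the 10-second timeout are all hypothetical, and the `KafkaConsumer` instance is assumed to be already connected and supplied by the caller. The `TopicPartitionTimestamp`, `Timestamp`, `TopicPartitionOffset` and `Offset` types come from the Confluent.Kafka package.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Silverback.Messaging.Broker;

public static class OffsetLookupExample
{
    // Hypothetical helper: the consumer, topic name, partition number and
    // timestamp are all supplied by the caller.
    public static void PrintOffsetsSince(
        KafkaConsumer consumer,
        string topic,
        int partition,
        DateTime sinceUtc)
    {
        // One entry per partition to look up, each paired with the target timestamp.
        var request = new List<TopicPartitionTimestamp>
        {
            new TopicPartitionTimestamp(
                new TopicPartition(topic, new Partition(partition)),
                new Timestamp(sinceUtc))
        };

        // Blocks for at most the given timeout (10 seconds is an arbitrary choice).
        IReadOnlyList<TopicPartitionOffset> offsets =
            consumer.GetOffsetsForTimestamp(request, TimeSpan.FromSeconds(10));

        foreach (TopicPartitionOffset result in offsets)
        {
            if (result.Offset == Offset.End)
            {
                // The timestamp is later than the last message in the partition.
                Console.WriteLine($"{result.TopicPartition}: no message at or after {sinceUtc:O}");
            }
            else
            {
                Console.WriteLine($"{result.TopicPartition}: earliest matching offset is {result.Offset.Value}");
            }
        }
    }
}
```

Checking for Offset.End before using the returned value avoids treating the special -1 marker as a real message position.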
RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected override Task RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync() has been called.
Declaration
protected override Task StartCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected override Task StopCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Overrides
WaitUntilConsumingStoppedCoreAsync()
Waits until consuming has stopped.
Declaration
protected override Task WaitUntilConsumingStoppedCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |