Class KafkaConsumer
Consumes an endpoint and invokes a callback delegate when a message is received.
Inherited Members
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.Kafka.dll
Syntax
public class KafkaConsumer : Consumer<KafkaBroker, KafkaConsumerEndpoint, KafkaOffset>, IConsumer, IDisposable
Constructors
| Improve this doc View sourceKafkaConsumer(KafkaBroker, KafkaConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IConfluentConsumerBuilder, IBrokerCallbacksInvoker, IServiceProvider, IInboundLogger<KafkaConsumer>)
Initializes a new instance of the KafkaConsumer class.
Declaration
public KafkaConsumer(KafkaBroker broker, KafkaConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IConfluentConsumerBuilder confluentConsumerBuilder, IBrokerCallbacksInvoker callbacksInvoker, IServiceProvider serviceProvider, IInboundLogger<KafkaConsumer> logger)
Parameters
Type | Name | Description |
---|---|---|
KafkaBroker | broker | The IBroker that is instantiating the consumer. |
KafkaConsumerEndpoint | endpoint | The endpoint to be consumed. |
IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | |
IConfluentConsumerBuilder | confluentConsumerBuilder | |
IBrokerCallbacksInvoker | callbacksInvoker | |
IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the needed services. |
IInboundLogger<KafkaConsumer> | logger |
Properties
| Improve this doc View sourceMemberId
Gets the (dynamic) group member id of this consumer (as set by the broker).
Declaration
public string MemberId { get; }
Property Value
Type | Description |
---|---|
string |
Methods
| Improve this doc View sourceCommitCoreAsync(IReadOnlyCollection<KafkaOffset>)
Commits the specified messages sending the acknowledgement to the message broker.
Declaration
protected override Task CommitCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of to message be committed. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceConnectCoreAsync()
Connects to the message broker.
Declaration
protected override Task ConnectCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceDisconnectCoreAsync()
Disconnects from the message broker.
Declaration
protected override Task DisconnectCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceGetCurrentSequenceStores()
Gets the ISequenceStore instances used by this consumer. Some brokers will require
multiple stores (e.g. the KafkaConsumer
will create a store per each assigned partition).
Declaration
public override IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
Returns
Type | Description |
---|---|
IReadOnlyList<ISequenceStore> | The list of ISequenceStore. |
Overrides
Remarks
Used only for testing and maintained to preserve backward compatibility.
GetOffsetsForTimestamp(IEnumerable<TopicPartitionTimestamp>, TimeSpan)
Look up the offsets for the given partitions by timestamp. The returned offset for each partition
is the earliest offset for which the timestamp is greater than or equal to the given timestamp.
If the provided timestamp exceeds that of the last message in the partition, a value of
Offset.End
(-1) will be returned.
Declaration
public IReadOnlyList<TopicPartitionOffset> GetOffsetsForTimestamp(IEnumerable<TopicPartitionTimestamp> topicPartitionTimestamps, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionTimestamp> | topicPartitionTimestamps | The list of partitions with the target timestamps. |
TimeSpan | timeout | The maximum amount of time to block waiting for the requested offsets. |
Returns
Type | Description |
---|---|
IReadOnlyList<TopicPartitionOffset> | The list of offsets for each partition. |
RollbackCoreAsync(IReadOnlyCollection<KafkaOffset>)
If necessary notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected override Task RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<KafkaOffset> | brokerMessageIdentifiers | The identifiers of to message be rolled back. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceStartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync() has been called.
Declaration
protected override Task StartCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceStopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected override Task StopCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Overrides
| Improve this doc View sourceWaitUntilConsumingStoppedCoreAsync()
Waits until the consuming is stopped.
Declaration
protected override Task WaitUntilConsumingStoppedCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |