Class Consumer
Consumes an endpoint and invokes a callback delegate when a message is received.
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.dll
Syntax
public abstract class Consumer : IConsumer, IDisposable
Constructors
Consumer(IBroker, IConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<Consumer>)
Initializes a new instance of the Consumer class.
Declaration
protected Consumer(IBroker broker, IConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<Consumer> logger)
Parameters
Type | Name | Description |
---|---|---|
IBroker | broker | The IBroker that is instantiating the consumer. |
IConsumerEndpoint | endpoint | The endpoint to be consumed. |
IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | |
IServiceProvider | serviceProvider | The IServiceProvider. |
ISilverbackLogger<Consumer> | logger | The ISilverbackLogger. |
Properties
Broker
Gets the IBroker that owns this consumer.
Declaration
public IBroker Broker { get; }
Property Value
Type | Description |
---|---|
IBroker |
Endpoint
Gets the IConsumerEndpoint representing the endpoint being consumed.
Declaration
public IConsumerEndpoint Endpoint { get; }
Property Value
Type | Description |
---|---|
IConsumerEndpoint |
Id
Gets the InstanceIdentifier of this consumer.
Declaration
public InstanceIdentifier Id { get; }
Property Value
Type | Description |
---|---|
InstanceIdentifier |
IsConnected
Gets a value indicating whether this consumer has successfully initialized the connection to the message broker.
Declaration
public bool IsConnected { get; }
Property Value
Type | Description |
---|---|
bool |
Remarks
This doesn't necessarily mean that it is connected and ready to consume. The underlying library might handle the connection process asynchronously in the background, or the protocol might require extra steps (e.g. Kafka might require the partitions to be assigned).
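Because the flag can turn true before the consumer is actually ready, callers that need to wait usually poll a status property. The following is a minimal sketch using only the base class library; `StatusPolling.WaitUntilAsync` is a hypothetical helper and not part of Silverback, and which property best expresses "ready" for a given broker is an assumption left to the caller.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class StatusPolling
{
    // Hypothetical helper (not part of Silverback): polls a condition until it
    // becomes true or the timeout elapses. Returns true if the condition was met.
    public static async Task<bool> WaitUntilAsync(
        Func<bool> condition,
        TimeSpan timeout,
        TimeSpan? pollInterval = null)
    {
        TimeSpan interval = pollInterval ?? TimeSpan.FromMilliseconds(50);
        Stopwatch stopwatch = Stopwatch.StartNew();

        while (!condition())
        {
            // Give up once the timeout has elapsed without the condition being met.
            if (stopwatch.Elapsed >= timeout)
                return false;

            await Task.Delay(interval).ConfigureAwait(false);
        }

        return true;
    }
}
```

With a consumer instance at hand, it could be called as `await StatusPolling.WaitUntilAsync(() => consumer.IsConsuming, TimeSpan.FromSeconds(10))`.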
IsConnecting
Gets a value indicating whether this consumer is initializing the connection to the message broker.
Declaration
public bool IsConnecting { get; }
Property Value
Type | Description |
---|---|
bool |
IsConsuming
Gets a value indicating whether this consumer is connected and consuming (started).
Declaration
public bool IsConsuming { get; protected set; }
Property Value
Type | Description |
---|---|
bool |
IsDisconnecting
Gets a value indicating whether the consumer is being disconnected.
Declaration
public bool IsDisconnecting { get; }
Property Value
Type | Description |
---|---|
bool |
IsStopping
Gets a value indicating whether the consumer is being stopped.
Declaration
protected bool IsStopping { get; }
Property Value
Type | Description |
---|---|
bool |
ServiceProvider
Gets the IServiceProvider.
Declaration
protected IServiceProvider ServiceProvider { get; }
Property Value
Type | Description |
---|---|
IServiceProvider |
StatusInfo
Gets the IConsumerStatusInfo of this consumer.
Declaration
public IConsumerStatusInfo StatusInfo { get; }
Property Value
Type | Description |
---|---|
IConsumerStatusInfo |
Methods
CommitAsync(IBrokerMessageIdentifier)
Declaration
public Task CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
Type | Name | Description |
---|---|---|
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be committed. |
Returns
Type | Description |
---|---|
Task |
CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
Type | Description |
---|---|
Task |
CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Commits the specified messages, sending the acknowledgement to the message broker.
Declaration
protected abstract Task CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
Type | Description |
---|---|
Task |
ConnectAsync()
Connects and starts consuming.
Declaration
public Task ConnectAsync()
Returns
Type | Description |
---|---|
Task |
ConnectCoreAsync()
Connects to the message broker.
Declaration
protected abstract Task ConnectCoreAsync()
Returns
Type | Description |
---|---|
Task |
DisconnectAsync()
Disconnects and stops consuming.
Declaration
public Task DisconnectAsync()
Returns
Type | Description |
---|---|
Task |
DisconnectCoreAsync()
Disconnects from the message broker.
Declaration
protected abstract Task DisconnectCoreAsync()
Returns
Type | Description |
---|---|
Task |
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
Type | Name | Description |
---|---|---|
bool | disposing | A value indicating whether the method has been called by the Dispose() method. |
GetCurrentSequenceStores()
Gets the ISequenceStore instances used by this consumer. There can be more than one (e.g. the KafkaConsumer will create a store per each assigned partition).
Declaration
public abstract IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
Returns
Type | Description |
---|---|
IReadOnlyList<ISequenceStore> | The list of ISequenceStore. |
Remarks
Used only for testing and maintained to preserve backward compatibility.
HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, string, IBrokerMessageIdentifier, ISequenceStore)
Handles the consumed message, invoking each IConsumerBehavior.
Declaration
protected virtual Task HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, string sourceEndpointName, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
Parameters
Type | Name | Description |
---|---|---|
byte[] | message | The body of the consumed message. |
IReadOnlyCollection<MessageHeader> | headers | The headers of the consumed message. |
string | sourceEndpointName | The name of the actual endpoint (topic) where the message has been delivered. |
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the consumed message. |
ISequenceStore | sequenceStore | The ISequenceStore. |
Returns
Type | Description |
---|---|
Task |
IncrementFailedAttempts(IRawInboundEnvelope)
Increments the stored failed attempts count for the specified envelope.
Declaration
public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
Parameters
Type | Name | Description |
---|---|---|
IRawInboundEnvelope | envelope | The envelope. |
Returns
Type | Description |
---|---|
int | The current failed attempts count after the increment. |
RevertReadyStatus()
Called when the connection is lost to transition the consumer back to Connected.
Declaration
protected void RevertReadyStatus()
RollbackAsync(IBrokerMessageIdentifier)
Declaration
public Task RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
Type | Name | Description |
---|---|---|
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be rolled back. |
Returns
Type | Description |
---|---|
Task |
RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
Type | Description |
---|---|
Task |
RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected abstract Task RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
Type | Description |
---|---|
Task |
SetReadyStatus()
Called when fully connected to transition the consumer to Ready.
Declaration
protected void SetReadyStatus()
StartAsync()
Starts consuming. Used after StopAsync() to resume consuming.
Declaration
public Task StartAsync()
Returns
Type | Description |
---|---|
Task |
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync().
Declaration
protected abstract Task StartCoreAsync()
Returns
Type | Description |
---|---|
Task |
StopAsync()
Stops the consumer without disconnecting. Can be used to pause and resume consuming.
Declaration
public Task StopAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent. |
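The pause and resume usage mentioned above can be sketched as follows. This is an illustrative helper under stated assumptions, not part of Silverback: `ConsumerPause.RunWhilePausedAsync` is a hypothetical name, and it takes plain delegates so that the example stays self-contained. Since the returned Task completes as soon as the stopping signal has been sent, the work may still overlap with messages that are already being processed.

```csharp
using System;
using System.Threading.Tasks;

public static class ConsumerPause
{
    // Hypothetical helper (not part of Silverback): stops consuming, runs the
    // given work, and always resumes afterwards, even if the work throws.
    public static async Task RunWhilePausedAsync(
        Func<Task> stopAsync,
        Func<Task> startAsync,
        Func<Task> work)
    {
        await stopAsync().ConfigureAwait(false);

        try
        {
            await work().ConfigureAwait(false);
        }
        finally
        {
            // Resume consuming regardless of the outcome of the work.
            await startAsync().ConfigureAwait(false);
        }
    }
}
```

With a consumer instance, the call could look like `await ConsumerPause.RunWhilePausedAsync(consumer.StopAsync, consumer.StartAsync, RunMaintenanceAsync)`, where `RunMaintenanceAsync` stands for whatever the application needs to do while paused.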
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected abstract Task StopCoreAsync()
Returns
Type | Description |
---|---|
Task |
TriggerReconnectAsync()
Disconnects the consumer and triggers a reconnection.
Declaration
public Task TriggerReconnectAsync()
Returns
Type | Description |
---|---|
Task |
WaitUntilConsumingStoppedCoreAsync()
Waits until consuming has stopped.
Declaration
protected abstract Task WaitUntilConsumingStoppedCoreAsync()
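Several public methods on this page (ConnectAsync, DisconnectAsync, StartAsync, StopAsync, CommitAsync, RollbackAsync) have a protected abstract counterpart with the Core suffix that a broker-specific consumer implements. The sketch below illustrates that general pattern with a simplified stand-in. `SketchConsumer` and `InMemorySketchConsumer` are hypothetical types, the state handling is an assumption made for illustration, and none of this is the actual Silverback implementation.

```csharp
using System;
using System.Threading.Tasks;

// Simplified stand-in, NOT the Silverback implementation: it only illustrates
// how a public method can wrap a protected abstract *Core* method.
public abstract class SketchConsumer
{
    public bool IsConnected { get; private set; }

    public bool IsConnecting { get; private set; }

    public async Task ConnectAsync()
    {
        // Assumed behavior: ignore the call if already connected or connecting.
        if (IsConnected || IsConnecting)
            return;

        IsConnecting = true;

        try
        {
            await ConnectCoreAsync().ConfigureAwait(false);
            IsConnected = true;
        }
        finally
        {
            IsConnecting = false;
        }
    }

    // Broker-specific connection logic goes here.
    protected abstract Task ConnectCoreAsync();
}

// Hypothetical broker-specific consumer used only for this example.
public sealed class InMemorySketchConsumer : SketchConsumer
{
    public int ConnectCalls { get; private set; }

    protected override Task ConnectCoreAsync()
    {
        ConnectCalls++;
        return Task.CompletedTask;
    }
}
```

The sketch is not thread-safe; it only shows why the shared state transitions can live in the base class while the derived class supplies the transport-specific part.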