Class Consumer
Consumes an endpoint and invokes a callback delegate when a message is received.
Inherited Members
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.dll
Syntax
public abstract class Consumer : IConsumer, IDisposable
Constructors
Consumer(IBroker, IConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<Consumer>)
Initializes a new instance of the Consumer class.
Declaration
protected Consumer(IBroker broker, IConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<Consumer> logger)
Parameters
| Type | Name | Description |
|---|---|---|
| IBroker | broker | The IBroker that is instantiating the consumer. |
| IConsumerEndpoint | endpoint | The endpoint to be consumed. |
| IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | The IBrokerBehaviorsProvider<IConsumerBehavior>. |
| IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the needed services. |
| ISilverbackLogger<Consumer> | logger | The ISilverbackLogger. |
Properties
Broker
Gets the IBroker that owns this consumer.
Declaration
public IBroker Broker { get; }
Property Value
| Type | Description |
|---|---|
| IBroker |
Endpoint
Gets the IConsumerEndpoint representing the endpoint that is being consumed.
Declaration
public IConsumerEndpoint Endpoint { get; }
Property Value
| Type | Description |
|---|---|
| IConsumerEndpoint |
Id
Gets the InstanceIdentifier uniquely identifying the consumer instance.
Declaration
public InstanceIdentifier Id { get; }
Property Value
| Type | Description |
|---|---|
| InstanceIdentifier |
IsConnected
Gets a value indicating whether this consumer has successfully initialized the connection to the message broker.
Declaration
public bool IsConnected { get; }
Property Value
| Type | Description |
|---|---|
| bool |
Remarks
This doesn't necessarily mean that it is connected and ready to consume. The underlying library might handle the connection process asynchronously in the background, or the protocol might require extra steps (e.g. Kafka might require the partitions to be assigned).
IsConnecting
Gets a value indicating whether this consumer is initializing the connection to the message broker.
Declaration
public bool IsConnecting { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsConsuming
Gets a value indicating whether this consumer is connected and consuming (started).
Declaration
public bool IsConsuming { get; protected set; }
Property Value
| Type | Description |
|---|---|
| bool |
IsDisconnecting
Gets a value indicating whether the consumer is being disconnected.
Declaration
public bool IsDisconnecting { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsStopping
Gets a value indicating whether the consumer is being stopped.
Declaration
protected bool IsStopping { get; }
Property Value
| Type | Description |
|---|---|
| bool |
ServiceProvider
Gets the IServiceProvider to be used to resolve the required services.
Declaration
protected IServiceProvider ServiceProvider { get; }
Property Value
| Type | Description |
|---|---|
| IServiceProvider |
StatusInfo
Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.
Declaration
public IConsumerStatusInfo StatusInfo { get; }
Property Value
| Type | Description |
|---|---|
| IConsumerStatusInfo |
Methods
CommitAsync(IBrokerMessageIdentifier)
Declaration
public Task CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
| Type | Name | Description |
|---|---|---|
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be committed. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Commits the specified messages, sending the acknowledgement to the message broker.
Declaration
protected abstract Task CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
ConnectAsync()
Connects and starts consuming.
Declaration
public Task ConnectAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
ConnectCoreAsync()
Connects to the message broker.
Declaration
protected abstract Task ConnectCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
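The public operations on this page (ConnectAsync, DisconnectAsync, StartAsync, StopAsync, CommitAsync, RollbackAsync) each have a protected abstract counterpart with a Core suffix that a broker-specific subclass implements. The following is a minimal sketch of that split for the connect operation only. `SketchConsumer` and `InMemorySketchConsumer` are hypothetical types written for illustration; they are not Silverback's Consumer, and the way the status flags are updated here is an assumption rather than the library's actual implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical base class illustrating the public/Core split only.
// It is NOT Silverback's Consumer: behaviors, logging and error
// handling are omitted, and the flag handling is an assumption.
public abstract class SketchConsumer
{
    public bool IsConnecting { get; private set; }

    public bool IsConnected { get; private set; }

    // Public entry point: owns the status flags and delegates the
    // broker-specific work to the Core method.
    public async Task ConnectAsync()
    {
        if (IsConnected || IsConnecting)
            return;

        IsConnecting = true;
        try
        {
            await ConnectCoreAsync().ConfigureAwait(false);
            IsConnected = true;
        }
        finally
        {
            IsConnecting = false;
        }
    }

    // Broker-specific connection logic supplied by the subclass.
    protected abstract Task ConnectCoreAsync();
}

// Hypothetical subclass standing in for a broker-specific consumer.
public sealed class InMemorySketchConsumer : SketchConsumer
{
    public int ConnectCalls { get; private set; }

    protected override Task ConnectCoreAsync()
    {
        ConnectCalls++;
        return Task.CompletedTask;
    }
}
```

In this sketch the caller only ever sees ConnectAsync(), while the subclass only supplies the broker-specific step, which mirrors the visibility of the members documented here (public versus protected abstract).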
DisconnectAsync()
Disconnects and stops consuming.
Declaration
public Task DisconnectAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
DisconnectCoreAsync()
Disconnects from the message broker.
Declaration
protected abstract Task DisconnectCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | disposing | A value indicating whether the method has been called by the Dispose method. |
GetCurrentSequenceStores()
Gets the ISequenceStore instances used by this consumer. Some brokers will require multiple stores (e.g. the KafkaConsumer will create a store for each assigned partition).
Declaration
public abstract IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
Returns
| Type | Description |
|---|---|
| IReadOnlyList<ISequenceStore> | The list of ISequenceStore. |
Remarks
Used only for testing and maintained to preserve backward compatibility.
HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, string, IBrokerMessageIdentifier, ISequenceStore)
Handles the consumed message invoking each IConsumerBehavior in the pipeline.
Declaration
protected virtual Task HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, string sourceEndpointName, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
Parameters
| Type | Name | Description |
|---|---|---|
| byte[] | message | The body of the consumed message. |
| IReadOnlyCollection<MessageHeader> | headers | The headers of the consumed message. |
| string | sourceEndpointName | The name of the actual endpoint (topic) where the message has been delivered. |
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the consumed message. |
| ISequenceStore | sequenceStore | The ISequenceStore to be used. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
IncrementFailedAttempts(IRawInboundEnvelope)
Increments the stored failed attempts count for the specified envelope.
Declaration
public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
Parameters
| Type | Name | Description |
|---|---|---|
| IRawInboundEnvelope | envelope | The envelope. |
Returns
| Type | Description |
|---|---|
| int | The current failed attempts count after the increment. |
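Because IncrementFailedAttempts returns the count after the increment, the return value can be compared directly against a retry limit. A minimal sketch follows; `RetryDecision`, `ShouldRetry` and `maxFailedAttempts` are hypothetical names introduced for illustration and are not part of Silverback. The increment is passed as a delegate so that the snippet compiles without a Silverback reference.

```csharp
using System;

public static class RetryDecision
{
    // Invokes the increment and returns true while the failure count
    // reported after the increment is still within the allowed limit.
    public static bool ShouldRetry(Func<int> incrementFailedAttempts, int maxFailedAttempts)
    {
        if (incrementFailedAttempts == null)
            throw new ArgumentNullException(nameof(incrementFailedAttempts));

        int failedAttempts = incrementFailedAttempts();
        return failedAttempts <= maxFailedAttempts;
    }
}
```

With a real consumer the delegate would wrap the documented method, for example `RetryDecision.ShouldRetry(() => consumer.IncrementFailedAttempts(envelope), 3)`.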
RevertReadyStatus()
Called when the connection is lost to transition the consumer back to Connected.
Declaration
protected void RevertReadyStatus()
RollbackAsync(IBrokerMessageIdentifier)
Declaration
public Task RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
| Type | Name | Description |
|---|---|---|
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be rolled back. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
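The CommitAsync and RollbackAsync overloads above form a pair: one acknowledges messages, the other reports that they could not be processed. A minimal sketch of how the two relate is shown here. `IAcknowledgingConsumer<TIdentifier>` and `AcknowledgementHelper` are hypothetical stand-ins that mirror only the collection overloads documented on this page, with `TIdentifier` taking the place of IBrokerMessageIdentifier so that the snippet compiles without a Silverback reference. It illustrates the semantics only and does not describe how Silverback's own pipeline invokes these methods.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in mirroring the collection overloads documented above.
// TIdentifier takes the place of IBrokerMessageIdentifier.
public interface IAcknowledgingConsumer<TIdentifier>
{
    Task CommitAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers);

    Task RollbackAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers);
}

public static class AcknowledgementHelper
{
    // Runs the processing delegate, then commits the batch on success
    // or rolls it back on failure. Returns true if the batch was committed.
    public static async Task<bool> ProcessAndAcknowledgeAsync<TIdentifier>(
        IAcknowledgingConsumer<TIdentifier> consumer,
        IReadOnlyCollection<TIdentifier> identifiers,
        Func<Task> process)
    {
        try
        {
            await process().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // Processing failed: roll back so the messages can be consumed again.
            await consumer.RollbackAsync(identifiers).ConfigureAwait(false);
            return false;
        }

        // Processing succeeded: acknowledge the whole batch.
        await consumer.CommitAsync(identifiers).ConfigureAwait(false);
        return true;
    }
}
```

The sketch swallows the processing exception after rolling back; a real application would usually log it or rethrow it.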
RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected abstract Task RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
SetReadyStatus()
Called when fully connected to transition the consumer to Ready.
Declaration
protected void SetReadyStatus()
StartAsync()
Starts consuming. Used after StopAsync() has been called to resume consuming.
Declaration
public Task StartAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync() has been called.
Declaration
protected abstract Task StartCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
StopAsync()
Stops the consumer without disconnecting. Can be used to pause and resume consuming.
Declaration
public Task StopAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent. |
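StopAsync() and StartAsync() together allow consuming to be paused and resumed without dropping the connection. A minimal sketch of that pattern follows. `IPausableConsumer` and `ConsumerPauseHelper` are hypothetical stand-ins declared only so the snippet compiles without a Silverback reference; they mirror the StopAsync() and StartAsync() signatures documented on this page. Since the Task returned by StopAsync() completes as soon as the stopping signal has been sent, messages already in flight may still be processing when the maintenance delegate starts.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in: declares only members documented on this page,
// so that the sketch compiles without referencing Silverback.
public interface IPausableConsumer
{
    Task StopAsync();

    Task StartAsync();
}

public static class ConsumerPauseHelper
{
    // Pauses consuming, runs the maintenance work, then resumes consuming
    // even if the maintenance work throws.
    public static async Task RunWhilePausedAsync(
        IPausableConsumer consumer,
        Func<Task> maintenance)
    {
        if (consumer == null)
            throw new ArgumentNullException(nameof(consumer));
        if (maintenance == null)
            throw new ArgumentNullException(nameof(maintenance));

        await consumer.StopAsync().ConfigureAwait(false);
        try
        {
            await maintenance().ConfigureAwait(false);
        }
        finally
        {
            await consumer.StartAsync().ConfigureAwait(false);
        }
    }
}
```

The try/finally ensures that a failure in the maintenance work does not leave the consumer paused.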
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected abstract Task StopCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
TriggerReconnectAsync()
Disconnects the consumer and triggers a reconnection to the message broker.
Declaration
public Task TriggerReconnectAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |
WaitUntilConsumingStoppedCoreAsync()
Waits until consuming has stopped.
Declaration
protected abstract Task WaitUntilConsumingStoppedCoreAsync()
Returns
| Type | Description |
|---|---|
| Task | A Task representing the asynchronous operation. |