Class Consumer
Consumes an endpoint and invokes a callback delegate when a message is received.
Inherited Members
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.dll
Syntax
public abstract class Consumer : IConsumer, IDisposable
Constructors
| Improve this doc View sourceConsumer(IBroker, IConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<Consumer>)
Initializes a new instance of the Consumer class.
Declaration
protected Consumer(IBroker broker, IConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<Consumer> logger)
Parameters
Type | Name | Description |
---|---|---|
IBroker | broker | The IBroker that is instantiating the consumer. |
IConsumerEndpoint | endpoint | The endpoint to be consumed. |
IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | |
IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the needed services. |
ISilverbackLogger<Consumer> | logger | The ISilverbackLogger. |
Properties
| Improve this doc View sourceBroker
Gets the IBroker that owns this consumer.
Declaration
public IBroker Broker { get; }
Property Value
Type | Description |
---|---|
IBroker |
Endpoint
Gets the IConsumerEndpoint representing the endpoint that is being consumed.
Declaration
public IConsumerEndpoint Endpoint { get; }
Property Value
Type | Description |
---|---|
IConsumerEndpoint |
Id
Gets the InstanceIdentifier uniquely identifying the consumer instance.
Declaration
public InstanceIdentifier Id { get; }
Property Value
Type | Description |
---|---|
InstanceIdentifier |
IsConnected
Gets a value indicating whether this consumer has successfully initialized the connection to the message broker.
Declaration
public bool IsConnected { get; }
Property Value
Type | Description |
---|---|
bool |
Remarks
This doesn't necessary mean that it is connected and ready to consume. The underlying library might handle the connection process asynchronously in the background or the protocol might require extra steps (e.g. Kafka might require the partitions to be assigned).
IsConnecting
Gets a value indicating whether this consumer is initializing the connection to the message broker.
Declaration
public bool IsConnecting { get; }
Property Value
Type | Description |
---|---|
bool |
IsConsuming
Gets a value indicating whether this consumer is connected and consuming (started).
Declaration
public bool IsConsuming { get; protected set; }
Property Value
Type | Description |
---|---|
bool |
IsDisconnecting
Gets a value indicating whether the consumer is being disconnected.
Declaration
public bool IsDisconnecting { get; }
Property Value
Type | Description |
---|---|
bool |
IsStopping
Gets a value indicating whether the consumer is being stopped.
Declaration
protected bool IsStopping { get; }
Property Value
Type | Description |
---|---|
bool |
ServiceProvider
Gets the IServiceProvider to be used to resolve the required services.
Declaration
protected IServiceProvider ServiceProvider { get; }
Property Value
Type | Description |
---|---|
IServiceProvider |
StatusInfo
Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.
Declaration
public IConsumerStatusInfo StatusInfo { get; }
Property Value
Type | Description |
---|---|
IConsumerStatusInfo |
Methods
| Improve this doc View sourceCommitAsync(IBrokerMessageIdentifier)
Declaration
public Task CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
Type | Name | Description |
---|---|---|
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be committed. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of to message be committed. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Commits the specified messages sending the acknowledgement to the message broker.
Declaration
protected abstract Task CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of to message be committed. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
ConnectAsync()
Connects and starts consuming.
Declaration
public Task ConnectAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
ConnectCoreAsync()
Connects to the message broker.
Declaration
protected abstract Task ConnectCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
DisconnectAsync()
Disconnects and stops consuming.
Declaration
public Task DisconnectAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
DisconnectCoreAsync()
Disconnects from the message broker.
Declaration
protected abstract Task DisconnectCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
Type | Name | Description |
---|---|---|
bool | disposing | A value indicating whether the method has been called by the |
GetCurrentSequenceStores()
Gets the ISequenceStore instances used by this consumer. Some brokers will require
multiple stores (e.g. the KafkaConsumer
will create a store per each assigned partition).
Declaration
public abstract IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
Returns
Type | Description |
---|---|
IReadOnlyList<ISequenceStore> | The list of ISequenceStore. |
Remarks
Used only for testing and maintained to preserve backward compatibility.
HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, string, IBrokerMessageIdentifier, ISequenceStore)
Handles the consumed message invoking each IConsumerBehavior in the pipeline.
Declaration
protected virtual Task HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, string sourceEndpointName, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
Parameters
Type | Name | Description |
---|---|---|
byte[] | message | The body of the consumed message. |
IReadOnlyCollection<MessageHeader> | headers | The headers of the consumed message. |
string | sourceEndpointName | The name of the actual endpoint (topic) where the message has been delivered. |
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the consumed message. |
ISequenceStore | sequenceStore | The ISequenceStore to be used. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
IncrementFailedAttempts(IRawInboundEnvelope)
Increments the stored failed attempts count for the specified envelope.
Declaration
public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
Parameters
Type | Name | Description |
---|---|---|
IRawInboundEnvelope | envelope | The envelope. |
Returns
Type | Description |
---|---|
int | The current failed attempts count after the increment. |
RevertReadyStatus()
Called when the connection is lost to transitions the consumer back to Connected.
Declaration
protected void RevertReadyStatus()
RollbackAsync(IBrokerMessageIdentifier)
Declaration
public Task RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
Type | Name | Description |
---|---|---|
IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be rolled back. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public Task RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of to message be rolled back. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
If necessary notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected abstract Task RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of to message be rolled back. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
SetReadyStatus()
Called when fully connected to transitions the consumer to Ready.
Declaration
protected void SetReadyStatus()
StartAsync()
Starts consuming. Used after StopAsync() has been called to resume consuming.
Declaration
public Task StartAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync() has been called.
Declaration
protected abstract Task StartCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
StopAsync()
Stops the consumer without disconnecting. Can be used to pause and resume consuming.
Declaration
public Task StopAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent. |
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected abstract Task StopCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
TriggerReconnectAsync()
Disconnects and stops consuming.
Declaration
public Task TriggerReconnectAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
WaitUntilConsumingStoppedCoreAsync()
Waits until the consuming is stopped.
Declaration
protected abstract Task WaitUntilConsumingStoppedCoreAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |