Class Consumer<TIdentifier>
Consumes from one or more endpoints and pushes the received messages via the message bus.
Namespace: Silverback.Messaging.Broker
Assembly: Silverback.Integration.dll
Syntax
public abstract class Consumer<TIdentifier> : IConsumer, IDisposable where TIdentifier : class, IBrokerMessageIdentifier
Type Parameters
| Name | Description |
|---|---|
| TIdentifier | The type of the IBrokerMessageIdentifier used by the consumer implementation. |
Constructors
Consumer(string, IBrokerClient, IReadOnlyCollection<ConsumerEndpointConfiguration>, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<IConsumer>)
Initializes a new instance of the Consumer<TIdentifier> class.
Declaration
protected Consumer(string name, IBrokerClient client, IReadOnlyCollection<ConsumerEndpointConfiguration> endpointsConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<IConsumer> logger)
Parameters
| Type | Name | Description |
|---|---|---|
| string | name | The consumer name. |
| IBrokerClient | client | The IBrokerClient. |
| IReadOnlyCollection<ConsumerEndpointConfiguration> | endpointsConfiguration | The endpoints' configuration. |
| IBrokerBehaviorsProvider<IConsumerBehavior> | behaviorsProvider | The IBrokerBehaviorsProvider<IConsumerBehavior>. |
| IServiceProvider | serviceProvider | The IServiceProvider to be used to resolve the necessary services. |
| ISilverbackLogger<IConsumer> | logger | The ISilverbackLogger. |
Properties
Client
Gets the related IBrokerClient.
Declaration
public IBrokerClient Client { get; }
Property Value
| Type | Description |
|---|---|
| IBrokerClient |
DisplayName
Gets the name to be displayed in the human-targeted output (e.g. logs, health checks result, etc.).
Declaration
public string DisplayName { get; }
Property Value
| Type | Description |
|---|---|
| string |
Remarks
DisplayName currently returns the Name, but this might change in future implementations.
EndpointsConfiguration
Gets the endpoints configuration.
Declaration
public IReadOnlyCollection<ConsumerEndpointConfiguration> EndpointsConfiguration { get; }
Property Value
| Type | Description |
|---|---|
| IReadOnlyCollection<ConsumerEndpointConfiguration> |
IsStarted
Gets a value indicating whether the consumer is started.
Declaration
protected bool IsStarted { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsStarting
Gets a value indicating whether the consumer is starting (or connecting to start).
Declaration
protected bool IsStarting { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsStopping
Gets a value indicating whether the consumer is being stopped.
Declaration
protected bool IsStopping { get; }
Property Value
| Type | Description |
|---|---|
| bool |
Name
Gets the consumer name.
Declaration
public string Name { get; }
Property Value
| Type | Description |
|---|---|
| string |
ServiceProvider
Gets the IServiceProvider to be used to resolve the necessary services.
Declaration
protected IServiceProvider ServiceProvider { get; }
Property Value
| Type | Description |
|---|---|
| IServiceProvider |
StatusInfo
Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.
Declaration
public IConsumerStatusInfo StatusInfo { get; }
Property Value
| Type | Description |
|---|---|
| IConsumerStatusInfo |
Methods
CommitAsync(IBrokerMessageIdentifier)
Declaration
public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
| Type | Name | Description |
|---|---|---|
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be committed. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
CommitCoreAsync(IReadOnlyCollection<TIdentifier>)
Commits the specified messages by sending the acknowledgement to the message broker.
Declaration
protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<TIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be committed. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
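The split between the public `CommitAsync` overloads and the protected, typed `CommitCoreAsync` resembles a template-method arrangement. The following is a minimal sketch of how such a split could work, not Silverback's actual implementation: `SketchConsumer<TIdentifier>`, `DemoConsumer`, `DemoOffset`, and the local `IBrokerMessageIdentifier` are hypothetical stand-ins, and the cast from the untyped identifiers to `TIdentifier` is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the Silverback interface; only the shape matters here.
public interface IBrokerMessageIdentifier { }

// Hypothetical identifier type for an imaginary broker.
public sealed class DemoOffset : IBrokerMessageIdentifier
{
    public DemoOffset(long value) => Value = value;

    public long Value { get; }
}

public abstract class SketchConsumer<TIdentifier>
    where TIdentifier : class, IBrokerMessageIdentifier
{
    // Untyped public entry point: wraps the single identifier in a collection.
    public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier) =>
        CommitAsync(new[] { brokerMessageIdentifier });

    // Untyped public entry point: casts to TIdentifier (assumption) and delegates.
    public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers) =>
        CommitCoreAsync(brokerMessageIdentifiers.Cast<TIdentifier>().ToList());

    // Broker-specific acknowledgement, supplied by the derived class.
    protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers);
}

public sealed class DemoConsumer : SketchConsumer<DemoOffset>
{
    // Records the committed offsets instead of talking to a real broker.
    public List<long> Committed { get; } = new();

    protected override ValueTask CommitCoreAsync(IReadOnlyCollection<DemoOffset> brokerMessageIdentifiers)
    {
        Committed.AddRange(brokerMessageIdentifiers.Select(offset => offset.Value));
        return default; // already-completed ValueTask
    }
}
```

With this arrangement, callers work against the untyped identifier interface while each broker implementation only ever sees its own identifier type.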
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | disposing | A value indicating whether the method has been called by the Dispose() method rather than by the finalizer. |
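The `Dispose()` / `Dispose(bool)` pair follows the shape of the standard .NET dispose pattern. As a minimal sketch of that general pattern (the class `DisposableSketch` and its `ManagedResourcesReleased` property are hypothetical and are not part of Silverback), a derived type would typically override `Dispose(bool)` along these lines:

```csharp
using System;

// Hypothetical class illustrating the standard .NET dispose pattern.
public class DisposableSketch : IDisposable
{
    private bool _disposed;

    // Exposed only so the effect of disposing can be observed in this sketch.
    public bool ManagedResourcesReleased { get; private set; }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        // Guard against repeated calls.
        if (_disposed)
            return;

        if (disposing)
        {
            // Called from Dispose(): managed resources can be released safely here.
            ManagedResourcesReleased = true;
        }

        // Unmanaged resources, if any, would be released here in both cases.
        _disposed = true;
    }
}
```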
HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, ConsumerEndpoint, IBrokerMessageIdentifier, ISequenceStore)
Handles the consumed message invoking each IConsumerBehavior in the pipeline.
Declaration
protected virtual ValueTask HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, ConsumerEndpoint endpoint, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
Parameters
| Type | Name | Description |
|---|---|---|
| byte[]? | message | The body of the consumed message. |
| IReadOnlyCollection<MessageHeader> | headers | The headers of the consumed message. |
| ConsumerEndpoint | endpoint | The endpoint from which the message was consumed. |
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the consumed message. |
| ISequenceStore | sequenceStore | The ISequenceStore to be used. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
IncrementFailedAttempts(IRawInboundEnvelope)
Increments the stored failed attempts count for the specified envelope.
Declaration
public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
Parameters
| Type | Name | Description |
|---|---|---|
| IRawInboundEnvelope | envelope | The envelope. |
Returns
| Type | Description |
|---|---|
| int | The current failed attempts count after the increment. |
IsStartedAndNotStopping()
Gets a value indicating whether the consumer is started (or will start) and is not being stopped.
Declaration
protected bool IsStartedAndNotStopping()
Returns
| Type | Description |
|---|---|
| bool | A value indicating whether the consumer is started and not being stopped. |
RevertConnectedStatus()
Called when the connection is lost to transition the consumer back to Started.
Declaration
protected void RevertConnectedStatus()
RollbackAsync(IBrokerMessageIdentifier)
Declaration
public ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
| Type | Name | Description |
|---|---|---|
| IBrokerMessageIdentifier | brokerMessageIdentifier | The identifier of the message to be rolled back. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
Declaration
public ValueTask RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
RollbackCoreAsync(IReadOnlyCollection<TIdentifier>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
Declaration
protected abstract ValueTask RollbackCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<TIdentifier> | brokerMessageIdentifiers | The identifiers of the messages to be rolled back. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
SetConnectedStatus()
Called when fully connected to transition the consumer to Connected.
Declaration
protected void SetConnectedStatus()
StartAsync()
Starts consuming. Used after StopAsync(bool) has been called to resume consuming.
Declaration
public ValueTask StartAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync(bool) has been called.
Declaration
protected abstract ValueTask StartCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
StopAsync(bool)
Stops the consumer without disconnecting. Can be used to pause and resume consuming.
Declaration
public ValueTask StopAsync(bool waitUntilStopped = true)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | waitUntilStopped | A value indicating whether the method should wait until the consumer has been effectively stopped. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. This ValueTask will complete as soon as the stopping signal has been sent. |
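Because `StopAsync(bool)` stops consuming without disconnecting and `StartAsync()` resumes it, the two can be paired to pause a consumer around some other work. The sketch below illustrates that usage under stated assumptions: `IPausable`, `ConsumerMaintenance`, and `RecordingConsumer` are hypothetical stand-ins that mirror only the two method signatures documented here, so the example compiles without referencing Silverback.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in exposing only the two members used below.
public interface IPausable
{
    ValueTask StartAsync();

    ValueTask StopAsync(bool waitUntilStopped = true);
}

public static class ConsumerMaintenance
{
    // Pauses the consumer, runs the given work, then resumes consuming,
    // even if the work throws.
    public static async Task RunPausedAsync(IPausable consumer, Func<Task> work)
    {
        await consumer.StopAsync();
        try
        {
            await work();
        }
        finally
        {
            await consumer.StartAsync();
        }
    }
}

// Hypothetical fake that records the calls instead of touching a broker.
public sealed class RecordingConsumer : IPausable
{
    public List<string> Calls { get; } = new();

    public ValueTask StartAsync()
    {
        Calls.Add("start");
        return default;
    }

    public ValueTask StopAsync(bool waitUntilStopped = true)
    {
        Calls.Add("stop");
        return default;
    }
}
```

The `try`/`finally` is a design choice of this sketch: it ensures the consumer is resumed even when the maintenance work fails.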
StopCoreAsync()
Stops consuming while staying connected to the message broker.
Declaration
protected abstract ValueTask StopCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
TriggerReconnectAsync()
Stops the consumer and starts an asynchronous Task to disconnect and reconnect it.
Declaration
public ValueTask TriggerReconnectAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. This ValueTask will complete as soon as the stopping signal has been sent, while the process will be completed in another asynchronous Task. |
Remarks
This is used to recover when the consumer is stuck in a state where it can no longer roll back or commit.
WaitUntilConsumingStoppedCoreAsync()
Waits until consuming has stopped.
Declaration
protected abstract ValueTask WaitUntilConsumingStoppedCoreAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |