Class Consumer<TIdentifier>

Namespace
Silverback.Messaging.Broker
Assembly
Silverback.Integration.dll

Consumes from one or more endpoints and pushes the received messages via the message bus.

public abstract class Consumer<TIdentifier> : IConsumer, IDisposable where TIdentifier : class, IBrokerMessageIdentifier

Type Parameters

TIdentifier

The type of the IBrokerMessageIdentifier used by the consumer implementation.

Inheritance
Consumer<TIdentifier>
Implements
Derived
Inherited Members

Constructors

Consumer(string, IBrokerClient, IReadOnlyCollection<ConsumerEndpointConfiguration>, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<IConsumer>)

Initializes a new instance of the Consumer<TIdentifier> class.

protected Consumer(string name, IBrokerClient client, IReadOnlyCollection<ConsumerEndpointConfiguration> endpointsConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<IConsumer> logger)

Parameters

name string

The consumer name.

client IBrokerClient

The IBrokerClient.

endpointsConfiguration IReadOnlyCollection<ConsumerEndpointConfiguration>

The endpoints' configuration.

behaviorsProvider IBrokerBehaviorsProvider<IConsumerBehavior>

The IBrokerBehaviorsProvider<TBehavior>.

serviceProvider IServiceProvider

The IServiceProvider to be used to resolve the necessary services.

logger ISilverbackLogger<IConsumer>

The ISilverbackLogger.

Properties

Client

Gets the related IBrokerClient.

public IBrokerClient Client { get; }

Property Value

IBrokerClient

DisplayName

Gets the name to be displayed in human-targeted output (e.g. logs, health check results).

public string DisplayName { get; }

Property Value

string

Remarks

The DisplayName currently returns the Name, but this might change in future implementations.

EndpointsConfiguration

Gets the endpoints configuration.

public IReadOnlyCollection<ConsumerEndpointConfiguration> EndpointsConfiguration { get; }

Property Value

IReadOnlyCollection<ConsumerEndpointConfiguration>

IsStarted

Gets a value indicating whether the consumer is started.

protected bool IsStarted { get; }

Property Value

bool

IsStarting

Gets a value indicating whether the consumer is starting (or connecting to start).

protected bool IsStarting { get; }

Property Value

bool

IsStopping

Gets a value indicating whether the consumer is being stopped.

protected bool IsStopping { get; }

Property Value

bool

Name

Gets the consumer name.

public string Name { get; }

Property Value

string

ServiceProvider

Gets the IServiceProvider to be used to resolve the necessary services.

protected IServiceProvider ServiceProvider { get; }

Property Value

IServiceProvider

StatusInfo

Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.

public IConsumerStatusInfo StatusInfo { get; }

Property Value

IConsumerStatusInfo

Methods

CommitAsync(IBrokerMessageIdentifier)

Confirms that the specified message has been successfully processed. The acknowledgement will be sent to the message broker and the message will never be consumed again (by the same logical consumer / consumer group).

public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)

Parameters

brokerMessageIdentifier IBrokerMessageIdentifier

The identifier of the message to be committed.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

Confirms that the specified messages have been successfully processed. The acknowledgement will be sent to the message broker and the messages will never be consumed again (by the same logical consumer / consumer group).

public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<IBrokerMessageIdentifier>

The identifiers of the messages to be committed.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

CommitCoreAsync(IReadOnlyCollection<TIdentifier>)

Commits the specified messages sending the acknowledgement to the message broker.

protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<TIdentifier>

The identifiers of the messages to be committed.

Returns

ValueTask

A ValueTask representing the asynchronous operation.
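
The relationship between the public CommitAsync overloads, which accept the general IBrokerMessageIdentifier, and the typed CommitCoreAsync can be illustrated with a minimal sketch. All types below (ConsumerSketch, OffsetIdentifier, InMemoryConsumer) are hypothetical stand-ins that only reuse the names from this reference; the forwarding and casting logic is an assumption and not necessarily how Silverback implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the Silverback interface of the same name.
public interface IBrokerMessageIdentifier { }

// Hypothetical broker-specific identifier.
public sealed record OffsetIdentifier(string Partition, long Offset) : IBrokerMessageIdentifier;

// Simplified model of the base class: only the commit path is shown.
public abstract class ConsumerSketch<TIdentifier>
    where TIdentifier : class, IBrokerMessageIdentifier
{
    // Single-message overload: wraps the identifier and reuses the batch overload.
    public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier) =>
        CommitAsync(new[] { brokerMessageIdentifier });

    // Batch overload: narrows the identifiers to TIdentifier before delegating.
    public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    {
        // Cast<T> throws InvalidCastException if an identifier has the wrong type.
        List<TIdentifier> typed = brokerMessageIdentifiers.Cast<TIdentifier>().ToList();
        return CommitCoreAsync(typed);
    }

    // The broker-specific part that a concrete consumer has to provide.
    protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers);
}

// Hypothetical concrete consumer that just records what was committed.
public sealed class InMemoryConsumer : ConsumerSketch<OffsetIdentifier>
{
    public List<OffsetIdentifier> Committed { get; } = new();

    protected override ValueTask CommitCoreAsync(IReadOnlyCollection<OffsetIdentifier> brokerMessageIdentifiers)
    {
        Committed.AddRange(brokerMessageIdentifiers);
        return ValueTask.CompletedTask;
    }
}
```

With this model, awaiting new InMemoryConsumer().CommitAsync(new OffsetIdentifier("p0", 42)) ends up in CommitCoreAsync with a single typed identifier, so the derived class never has to deal with the untyped interface.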

Dispose()

public void Dispose()

Dispose(bool)

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

protected virtual void Dispose(bool disposing)

Parameters

disposing bool

A value indicating whether the method has been called by the Dispose method and not from the finalizer.
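
A derived consumer that owns additional resources would typically override Dispose(bool) following the standard .NET dispose pattern. The sketch below uses a hypothetical stand-in base class (DisposableBaseSketch) instead of Consumer<TIdentifier>, and the ResourceReleased flag stands in for whatever a real consumer would release; it illustrates the pattern only.

```csharp
using System;

// Hypothetical stand-in for a base class exposing Dispose() and Dispose(bool).
public abstract class DisposableBaseSketch : IDisposable
{
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
    }
}

// Hypothetical derived consumer owning one managed resource.
public sealed class ResourceOwningConsumerSketch : DisposableBaseSketch
{
    private bool _disposed;

    // Stands in for a real resource such as a connection or a timer.
    public bool ResourceReleased { get; private set; }

    protected override void Dispose(bool disposing)
    {
        if (_disposed)
            return; // Makes repeated Dispose() calls harmless.

        if (disposing)
        {
            // Release managed resources only when called from Dispose(), not from a finalizer.
            ResourceReleased = true;
        }

        _disposed = true;
        base.Dispose(disposing);
    }
}
```

Calling the base implementation last lets the base class clean up its own state after the derived resources have been released.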

HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, ConsumerEndpoint, IBrokerMessageIdentifier, ISequenceStore)

Handles the consumed message invoking each IConsumerBehavior in the pipeline.

protected virtual ValueTask HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, ConsumerEndpoint endpoint, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)

Parameters

message byte[]

The body of the consumed message.

headers IReadOnlyCollection<MessageHeader>

The headers of the consumed message.

endpoint ConsumerEndpoint

The endpoint from which the message was consumed.

brokerMessageIdentifier IBrokerMessageIdentifier

The identifier of the consumed message.

sequenceStore ISequenceStore

The ISequenceStore to be used.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

IncrementFailedAttempts(IRawInboundEnvelope)

Increments the stored failed attempts count for the specified envelope.

public int IncrementFailedAttempts(IRawInboundEnvelope envelope)

Parameters

envelope IRawInboundEnvelope

The envelope.

Returns

int

The current failed attempts count after the increment.
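
The returned count is what a retry policy would compare against its limit. As a minimal sketch of that idea, the hypothetical FailedAttemptsCounterSketch below keeps the count per message key; the real method takes an IRawInboundEnvelope, and how Silverback stores the count is not described in this reference, so the in-memory dictionary is purely an assumption.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-in: the envelope is reduced to a string key identifying the message.
public sealed class FailedAttemptsCounterSketch
{
    private readonly ConcurrentDictionary<string, int> _attempts = new();

    // Returns the count after the increment, mirroring the documented return value.
    public int Increment(string messageKey) =>
        _attempts.AddOrUpdate(messageKey, 1, (_, current) => current + 1);
}

public static class RetryPolicySketch
{
    // Hypothetical helper: records the failure and reports whether the
    // message is still within its retry budget.
    public static bool ShouldRetry(FailedAttemptsCounterSketch counter, string messageKey, int maxFailedAttempts) =>
        counter.Increment(messageKey) <= maxFailedAttempts;
}
```

Each call to ShouldRetry counts as one failure, so it should be invoked exactly once per failed processing attempt.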

IsStartedAndNotStopping()

Gets a value indicating whether the consumer is started (or will start) and is not being stopped.

protected bool IsStartedAndNotStopping()

Returns

bool

A value indicating whether the consumer is started and not being stopped.

RevertConnectedStatus()

Called when the connection is lost to transition the consumer back to Started.

protected void RevertConnectedStatus()

RollbackAsync(IBrokerMessageIdentifier)

Notifies that an error occurred while processing the specified message. If necessary, the information will be sent to the message broker to ensure that the message will be consumed again.

public ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)

Parameters

brokerMessageIdentifier IBrokerMessageIdentifier

The identifier of the message to be rolled back.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

Notifies that an error occurred while processing the specified messages. If necessary, the information will be sent to the message broker to ensure that the messages will be consumed again.

public ValueTask RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<IBrokerMessageIdentifier>

The identifiers of the messages to be rolled back.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

RollbackCoreAsync(IReadOnlyCollection<TIdentifier>)

If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.

protected abstract ValueTask RollbackCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)

Parameters

brokerMessageIdentifiers IReadOnlyCollection<TIdentifier>

The identifiers of the messages to be rolled back.

Returns

ValueTask

A ValueTask representing the asynchronous operation.
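
The commit and rollback semantics described above can be summarized in a short sketch: commit when processing succeeds, roll back when it fails so that the message can be consumed again. The IAcknowledgingConsumer interface, the AcknowledgementSketch helper and the recording class are hypothetical and only mirror the CommitAsync and RollbackAsync signatures from this reference; this is not Silverback's own pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins mirroring the signatures documented on this page.
public interface IBrokerMessageIdentifier { }

public interface IAcknowledgingConsumer
{
    ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier);
    ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier);
}

public static class AcknowledgementSketch
{
    // Runs the handler, then commits on success or rolls back on failure.
    // Returns true if the message was committed.
    public static async ValueTask<bool> ProcessAsync(
        IAcknowledgingConsumer consumer,
        IBrokerMessageIdentifier identifier,
        Func<ValueTask> handler)
    {
        try
        {
            await handler();
        }
        catch (Exception)
        {
            // Processing failed: ask for the message to be delivered again.
            await consumer.RollbackAsync(identifier);
            return false;
        }

        // Commit outside the try block so a failing commit is not answered with a rollback.
        await consumer.CommitAsync(identifier);
        return true;
    }
}

// Hypothetical identifier and consumer used to observe the calls.
public sealed class SequenceNumberIdentifier : IBrokerMessageIdentifier { }

public sealed class RecordingAckConsumer : IAcknowledgingConsumer
{
    public List<string> Calls { get; } = new();

    public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    {
        Calls.Add("commit");
        return ValueTask.CompletedTask;
    }

    public ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    {
        Calls.Add("rollback");
        return ValueTask.CompletedTask;
    }
}
```

Swallowing the exception here is a simplification; a real handler would normally log it or pass it on to an error policy.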

SetConnectedStatus()

Called when fully connected to transition the consumer to Connected.

protected void SetConnectedStatus()

StartAsync()

Starts consuming. Used to resume consuming after StopAsync(bool) has been called.

public ValueTask StartAsync()

Returns

ValueTask

A Task representing the asynchronous operation.

StartCoreAsync()

Starts consuming. Called to resume consuming after StopAsync(bool) has been called.

protected abstract ValueTask StartCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.

StopAsync(bool)

Stops the consumer without disconnecting. Can be used to pause and resume consuming.

public ValueTask StopAsync(bool waitUntilStopped = true)

Parameters

waitUntilStopped bool

A value indicating whether the method should wait until the consumer has been effectively stopped.

Returns

ValueTask

A ValueTask representing the asynchronous operation. This ValueTask will complete as soon as the stopping signal has been sent.
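
Since StopAsync(bool) keeps the connection open, it can be paired with StartAsync() to pause consuming around a piece of work. The sketch below shows one possible shape for that; IPausableConsumer, ConsumerPauseSketch and RecordingPausableConsumer are hypothetical names that only mirror the StartAsync and StopAsync signatures documented here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in exposing the two documented public methods.
public interface IPausableConsumer
{
    ValueTask StartAsync();
    ValueTask StopAsync(bool waitUntilStopped = true);
}

public static class ConsumerPauseSketch
{
    // Pauses the consumer, runs the maintenance work, then resumes consuming
    // even if the maintenance work throws.
    public static async ValueTask RunWhilePausedAsync(IPausableConsumer consumer, Func<ValueTask> maintenance)
    {
        await consumer.StopAsync(waitUntilStopped: true);

        try
        {
            await maintenance();
        }
        finally
        {
            await consumer.StartAsync();
        }
    }
}

// Hypothetical consumer that records the order of the calls.
public sealed class RecordingPausableConsumer : IPausableConsumer
{
    public List<string> Calls { get; } = new();

    public ValueTask StartAsync()
    {
        Calls.Add("start");
        return ValueTask.CompletedTask;
    }

    public ValueTask StopAsync(bool waitUntilStopped = true)
    {
        Calls.Add("stop");
        return ValueTask.CompletedTask;
    }
}
```

The finally block is the main design choice: without it, an exception in the maintenance work would leave the consumer stopped.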

StopCoreAsync()

Stops consuming while staying connected to the message broker.

protected abstract ValueTask StopCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.

TriggerReconnectAsync()

Stops the consumer and starts an asynchronous Task to disconnect and reconnect it.

public ValueTask TriggerReconnectAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation. This ValueTask will complete as soon as the stopping signal has been sent, while the process will be completed in another asynchronous Task.

Remarks

This is used to recover when the consumer is stuck in a state where it can no longer roll back or commit.

WaitUntilConsumingStoppedCoreAsync()

Waits until consuming has stopped.

protected abstract ValueTask WaitUntilConsumingStoppedCoreAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.