Class Consumer<TIdentifier>
- Namespace
- Silverback.Messaging.Broker
- Assembly
- Silverback.Integration.dll
Consumes from one or more endpoints and pushes the received messages via the message bus.
public abstract class Consumer<TIdentifier> : IConsumer, IDisposable where TIdentifier : class, IBrokerMessageIdentifier
Type Parameters
TIdentifierThe type of the IBrokerMessageIdentifier used by the consumer implementation.
- Inheritance
-
Consumer<TIdentifier>
- Implements
- Derived
- Inherited Members
Constructors
Consumer(string, IBrokerClient, IReadOnlyCollection<ConsumerEndpointConfiguration>, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<IConsumer>)
Initializes a new instance of the Consumer<TIdentifier> class.
protected Consumer(string name, IBrokerClient client, IReadOnlyCollection<ConsumerEndpointConfiguration> endpointsConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<IConsumer> logger)
Parameters
namestringThe consumer name.
clientIBrokerClientThe IBrokerClient.
endpointsConfigurationIReadOnlyCollection<ConsumerEndpointConfiguration>The endpoints' configuration.
behaviorsProviderIBrokerBehaviorsProvider<IConsumerBehavior>serviceProviderIServiceProviderThe IServiceProvider to be used to resolve the necessary services.
loggerISilverbackLogger<IConsumer>The ISilverbackLogger.
Properties
Client
Gets the related IBrokerClient.
public IBrokerClient Client { get; }
Property Value
DisplayName
Gets the name to be displayed in the human-targeted output (e.g. logs, health checks result, etc.).
public string DisplayName { get; }
Property Value
Remarks
The DisplayName is currently returning the Name but this might change in future implementations.
EndpointsConfiguration
Gets the endpoints configuration.
public IReadOnlyCollection<ConsumerEndpointConfiguration> EndpointsConfiguration { get; }
Property Value
IsStarted
Gets a value indicating whether the consumer is started.
protected bool IsStarted { get; }
Property Value
IsStarting
Gets a value indicating whether the consumer is starting (or connecting to start).
protected bool IsStarting { get; }
Property Value
IsStopping
Gets a value indicating whether the consumer is being stopped.
protected bool IsStopping { get; }
Property Value
Name
Gets the consumer name.
public string Name { get; }
Property Value
ServiceProvider
Gets the IServiceProvider to be used to resolve the necessary services.
protected IServiceProvider ServiceProvider { get; }
Property Value
StatusInfo
Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.
public IConsumerStatusInfo StatusInfo { get; }
Property Value
Methods
CommitAsync(IBrokerMessageIdentifier)
public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
brokerMessageIdentifierIBrokerMessageIdentifierThe identifier of the message to be committed.
Returns
CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
brokerMessageIdentifiersIReadOnlyCollection<IBrokerMessageIdentifier>The identifiers of to message be committed.
Returns
CommitCoreAsync(IReadOnlyCollection<TIdentifier>)
Commits the specified messages sending the acknowledgement to the message broker.
protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
Parameters
brokerMessageIdentifiersIReadOnlyCollection<TIdentifier>The identifiers of to message be committed.
Returns
Dispose()
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
protected virtual void Dispose(bool disposing)
Parameters
disposingboolA value indicating whether the method has been called by the
Disposemethod and not from the finalizer.
HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, ConsumerEndpoint, IBrokerMessageIdentifier, ISequenceStore)
Handles the consumed message invoking each IConsumerBehavior in the pipeline.
protected virtual ValueTask HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, ConsumerEndpoint endpoint, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
Parameters
messagebyte[]The body of the consumed message.
headersIReadOnlyCollection<MessageHeader>The headers of the consumed message.
endpointConsumerEndpointThe endpoint from which the message was consumed.
brokerMessageIdentifierIBrokerMessageIdentifierThe identifier of the consumed message.
sequenceStoreISequenceStoreThe ISequenceStore to be used.
Returns
IncrementFailedAttempts(IRawInboundEnvelope)
Increments the stored failed attempts count for the specified envelope.
public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
Parameters
envelopeIRawInboundEnvelopeThe envelope.
Returns
- int
The current failed attempts count after the increment.
IsStartedAndNotStopping()
Gets a value indicating whether the consumer is started (or will start) and is not being stopped.
protected bool IsStartedAndNotStopping()
Returns
- bool
A value indicating whether the consumer is started and not being stopped.
RevertConnectedStatus()
Called when the connection is lost to transitions the consumer back to Started.
protected void RevertConnectedStatus()
RollbackAsync(IBrokerMessageIdentifier)
public ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
Parameters
brokerMessageIdentifierIBrokerMessageIdentifierThe identifier of the message to be rolled back.
Returns
RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)
public ValueTask RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
Parameters
brokerMessageIdentifiersIReadOnlyCollection<IBrokerMessageIdentifier>The identifiers of to messages be rolled back.
Returns
RollbackCoreAsync(IReadOnlyCollection<TIdentifier>)
If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.
protected abstract ValueTask RollbackCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
Parameters
brokerMessageIdentifiersIReadOnlyCollection<TIdentifier>The identifiers of to message be rolled back.
Returns
SetConnectedStatus()
Called when fully connected to transitions the consumer to Connected.
protected void SetConnectedStatus()
StartAsync()
Starts consuming. Used after StopAsync(bool) has been called to resume consuming.
public ValueTask StartAsync()
Returns
StartCoreAsync()
Starts consuming. Called to resume consuming after StopAsync(bool) has been called.
protected abstract ValueTask StartCoreAsync()
Returns
StopAsync(bool)
Stops the consumer without disconnecting. Can be used to pause and resume consuming.
public ValueTask StopAsync(bool waitUntilStopped = true)
Parameters
waitUntilStoppedboolA value indicating whether the method should wait until the consumer has been effectively stopped.
Returns
- ValueTask
A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent.
StopCoreAsync()
Stops consuming while staying connected to the message broker.
protected abstract ValueTask StopCoreAsync()
Returns
TriggerReconnectAsync()
Stops the consumer and starts an asynchronous Task to disconnect and reconnect it.
public ValueTask TriggerReconnectAsync()
Returns
- ValueTask
A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent, while the process will be completed in another asynchronous Task.
Remarks
This is used to recover when the consumer is stuck in state where it's not able to rollback or commit anymore.
WaitUntilConsumingStoppedCoreAsync()
Waits until the consuming is stopped.
protected abstract ValueTask WaitUntilConsumingStoppedCoreAsync()