Show / Hide Table of Contents

    Class Consumer<TIdentifier>

    Consumes from one or more endpoints and pushes the received messages via the message bus.

    Inheritance
    object
    Consumer<TIdentifier>
    KafkaConsumer
    MqttConsumer
    Implements
    IConsumer
    IDisposable
    Inherited Members
    object.GetType()
    object.MemberwiseClone()
    object.ToString()
    object.Equals(object)
    object.Equals(object, object)
    object.ReferenceEquals(object, object)
    object.GetHashCode()
    Namespace: Silverback.Messaging.Broker
    Assembly: Silverback.Integration.dll
    Syntax
    public abstract class Consumer<TIdentifier> : IConsumer, IDisposable where TIdentifier : class, IBrokerMessageIdentifier
    Type Parameters
    Name Description
    TIdentifier

    The type of the IBrokerMessageIdentifier used by the consumer implementation.

    Constructors

    Consumer(string, IBrokerClient, IReadOnlyCollection<ConsumerEndpointConfiguration>, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<IConsumer>)

    Initializes a new instance of the Consumer<TIdentifier> class.

    Declaration
    protected Consumer(string name, IBrokerClient client, IReadOnlyCollection<ConsumerEndpointConfiguration> endpointsConfiguration, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<IConsumer> logger)
    Parameters
    Type Name Description
    string name

    The consumer name.

    IBrokerClient client

    The IBrokerClient.

    IReadOnlyCollection<ConsumerEndpointConfiguration> endpointsConfiguration

    The endpoints' configuration.

    IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider

    The IBrokerBehaviorsProvider<TBehavior>.

    IServiceProvider serviceProvider

    The IServiceProvider to be used to resolve the necessary services.

    ISilverbackLogger<IConsumer> logger

    The ISilverbackLogger.

    Properties

    Client

    Gets the related IBrokerClient.

    Declaration
    public IBrokerClient Client { get; }
    Property Value
    Type Description
    IBrokerClient

    DisplayName

    Gets the name to be displayed in the human-targeted output (e.g. logs, health checks result, etc.).

    Declaration
    public string DisplayName { get; }
    Property Value
    Type Description
    string
    Remarks

    The DisplayName is currently returning the Name but this might change in future implementations.

    EndpointsConfiguration

    Gets the endpoints configuration.

    Declaration
    public IReadOnlyCollection<ConsumerEndpointConfiguration> EndpointsConfiguration { get; }
    Property Value
    Type Description
    IReadOnlyCollection<ConsumerEndpointConfiguration>

    IsStarted

    Gets a value indicating whether the consumer is started.

    Declaration
    protected bool IsStarted { get; }
    Property Value
    Type Description
    bool

    IsStarting

    Gets a value indicating whether the consumer is starting (or connecting to start).

    Declaration
    protected bool IsStarting { get; }
    Property Value
    Type Description
    bool

    IsStopping

    Gets a value indicating whether the consumer is being stopped.

    Declaration
    protected bool IsStopping { get; }
    Property Value
    Type Description
    bool

    Name

    Gets the consumer name.

    Declaration
    public string Name { get; }
    Property Value
    Type Description
    string

    ServiceProvider

    Gets the IServiceProvider to be used to resolve the necessary services.

    Declaration
    protected IServiceProvider ServiceProvider { get; }
    Property Value
    Type Description
    IServiceProvider

    StatusInfo

    Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.

    Declaration
    public IConsumerStatusInfo StatusInfo { get; }
    Property Value
    Type Description
    IConsumerStatusInfo

    Methods

    CommitAsync(IBrokerMessageIdentifier)

    Confirms that the specified message has been successfully processed. The acknowledgement will be sent to the message broker and the message will never be consumed again (by the same logical consumer / consumer group).
    Declaration
    public ValueTask CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    Parameters
    Type Name Description
    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the message to be committed.

    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation.

    CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    Confirms that the specified messages have been successfully processed. The acknowledgement will be sent to the message broker, and the message will never be consumed again (by the same logical consumer / consumer group).
    Declaration
    public ValueTask CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to message be committed.

    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation.

    CommitCoreAsync(IReadOnlyCollection<TIdentifier>)

    Commits the specified messages sending the acknowledgement to the message broker.

    Declaration
    protected abstract ValueTask CommitCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers

    The identifiers of to message be committed.

    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Dispose()

    Consumes from one or more endpoints and pushes the received messages via the message bus.

    Declaration
    public void Dispose()

    Dispose(bool)

    Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

    Declaration
    protected virtual void Dispose(bool disposing)
    Parameters
    Type Name Description
    bool disposing

    A value indicating whether the method has been called by the Dispose method and not from the finalizer.

    HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, ConsumerEndpoint, IBrokerMessageIdentifier, ISequenceStore)

    Handles the consumed message invoking each IConsumerBehavior in the pipeline.

    Declaration
    protected virtual ValueTask HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, ConsumerEndpoint endpoint, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
    Parameters
    Type Name Description
    byte[] message

    The body of the consumed message.

    IReadOnlyCollection<MessageHeader> headers

    The headers of the consumed message.

    ConsumerEndpoint endpoint

    The endpoint from which the message was consumed.

    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the consumed message.

    ISequenceStore sequenceStore

    The ISequenceStore to be used.

    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    IncrementFailedAttempts(IRawInboundEnvelope)

    Increments the stored failed attempts count for the specified envelope.

    Declaration
    public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
    Parameters
    Type Name Description
    IRawInboundEnvelope envelope

    The envelope.

    Returns
    Type Description
    int

    The current failed attempts count after the increment.

    IsStartedAndNotStopping()

    Gets a value indicating whether the consumer is started (or will start) and is not being stopped.

    Declaration
    protected bool IsStartedAndNotStopping()
    Returns
    Type Description
    bool

    A value indicating whether the consumer is started and not being stopped.

    RevertConnectedStatus()

    Called when the connection is lost to transitions the consumer back to Started.

    Declaration
    protected void RevertConnectedStatus()

    RollbackAsync(IBrokerMessageIdentifier)

    Notifies that an error occurred while processing the specified message. If necessary, the information will be sent to the message broker to ensure that the message will be consumed again.
    Declaration
    public ValueTask RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    Parameters
    Type Name Description
    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the message to be rolled back.

    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation.

    RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    Notifies that an error occured while processing the specified messages. If necessary the information will be sent to the message broker to ensure that the message will be re-processed.
    Declaration
    public ValueTask RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to messages be rolled back.

    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation.

    RollbackCoreAsync(IReadOnlyCollection<TIdentifier>)

    If necessary, notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.

    Declaration
    protected abstract ValueTask RollbackCoreAsync(IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<TIdentifier> brokerMessageIdentifiers

    The identifiers of to message be rolled back.

    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    SetConnectedStatus()

    Called when fully connected to transitions the consumer to Connected.

    Declaration
    protected void SetConnectedStatus()

    StartAsync()

    Starts consuming. Used after StopAsync(bool) has been called to resume consuming.

    Declaration
    public ValueTask StartAsync()
    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation.

    StartCoreAsync()

    Starts consuming. Called to resume consuming after StopAsync(bool) has been called.

    Declaration
    protected abstract ValueTask StartCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    StopAsync(bool)

    Stops the consumer without disconnecting. Can be used to pause and resume consuming.

    Declaration
    public ValueTask StopAsync(bool waitUntilStopped = true)
    Parameters
    Type Name Description
    bool waitUntilStopped

    A value indicating whether the method should wait until the consumer has been effectively stopped.

    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent.

    StopCoreAsync()

    Stops consuming while staying connected to the message broker.

    Declaration
    protected abstract ValueTask StopCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    TriggerReconnectAsync()

    Stops the consumer and starts an asynchronous Task to disconnect and reconnect it.

    Declaration
    public ValueTask TriggerReconnectAsync()
    Returns
    Type Description
    ValueTask

    A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent, while the process will be completed in another asynchronous Task.

    Remarks

    This is used to recover when the consumer is stuck in state where it's not able to rollback or commit anymore.

    WaitUntilConsumingStoppedCoreAsync()

    Waits until the consuming is stopped.

    Declaration
    protected abstract ValueTask WaitUntilConsumingStoppedCoreAsync()
    Returns
    Type Description
    ValueTask

    A ValueTask representing the asynchronous operation.

    Implements

    IConsumer
    IDisposable
    GitHub E-Mail
    ↑ Back to top © 2026 Sergio Aquilini