Show / Hide Table of Contents

    Class Consumer

    Consumes an endpoint and invokes a callback delegate when a message is received.

    Inheritance
    object
    Consumer
    Consumer<TBroker, TEndpoint, TIdentifier>
    Implements
    IConsumer
    IDisposable
    Inherited Members
    object.Equals(object)
    object.Equals(object, object)
    object.GetHashCode()
    object.GetType()
    object.MemberwiseClone()
    object.ReferenceEquals(object, object)
    object.ToString()
    Namespace: Silverback.Messaging.Broker
    Assembly: Silverback.Integration.dll
    Syntax
    public abstract class Consumer : IConsumer, IDisposable

    Constructors

    | Improve this doc View source

    Consumer(IBroker, IConsumerEndpoint, IBrokerBehaviorsProvider<IConsumerBehavior>, IServiceProvider, ISilverbackLogger<Consumer>)

    Initializes a new instance of the Consumer class.

    Declaration
    protected Consumer(IBroker broker, IConsumerEndpoint endpoint, IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<Consumer> logger)
    Parameters
    Type Name Description
    IBroker broker

    The IBroker that is instantiating the consumer.

    IConsumerEndpoint endpoint

    The endpoint to be consumed.

    IBrokerBehaviorsProvider<IConsumerBehavior> behaviorsProvider

    The IBrokerBehaviorsProvider<TBehavior>.

    IServiceProvider serviceProvider

    The IServiceProvider to be used to resolve the needed services.

    ISilverbackLogger<Consumer> logger

    The ISilverbackLogger.

    Properties

    | Improve this doc View source

    Broker

    Gets the IBroker that owns this consumer.

    Declaration
    public IBroker Broker { get; }
    Property Value
    Type Description
    IBroker
    | Improve this doc View source

    Endpoint

    Gets the IConsumerEndpoint representing the endpoint that is being consumed.

    Declaration
    public IConsumerEndpoint Endpoint { get; }
    Property Value
    Type Description
    IConsumerEndpoint
    | Improve this doc View source

    Id

    Gets the InstanceIdentifier uniquely identifying the consumer instance.

    Declaration
    public InstanceIdentifier Id { get; }
    Property Value
    Type Description
    InstanceIdentifier
    | Improve this doc View source

    IsConnected

    Gets a value indicating whether this consumer has successfully initialized the connection to the message broker.

    Declaration
    public bool IsConnected { get; }
    Property Value
    Type Description
    bool
    Remarks

    This doesn't necessary mean that it is connected and ready to consume. The underlying library might handle the connection process asynchronously in the background or the protocol might require extra steps (e.g. Kafka might require the partitions to be assigned).

    | Improve this doc View source

    IsConnecting

    Gets a value indicating whether this consumer is initializing the connection to the message broker.

    Declaration
    public bool IsConnecting { get; }
    Property Value
    Type Description
    bool
    | Improve this doc View source

    IsConsuming

    Gets a value indicating whether this consumer is connected and consuming (started).

    Declaration
    public bool IsConsuming { get; protected set; }
    Property Value
    Type Description
    bool
    | Improve this doc View source

    IsDisconnecting

    Gets a value indicating whether the consumer is being disconnected.

    Declaration
    public bool IsDisconnecting { get; }
    Property Value
    Type Description
    bool
    | Improve this doc View source

    IsStopping

    Gets a value indicating whether the consumer is being stopped.

    Declaration
    protected bool IsStopping { get; }
    Property Value
    Type Description
    bool
    | Improve this doc View source

    ServiceProvider

    Gets the IServiceProvider to be used to resolve the required services.

    Declaration
    protected IServiceProvider ServiceProvider { get; }
    Property Value
    Type Description
    IServiceProvider
    | Improve this doc View source

    StatusInfo

    Gets the IConsumerStatusInfo containing the status details and basic statistics of this consumer.

    Declaration
    public IConsumerStatusInfo StatusInfo { get; }
    Property Value
    Type Description
    IConsumerStatusInfo

    Methods

    | Improve this doc View source

    CommitAsync(IBrokerMessageIdentifier)

    Confirms that the specified message has been successfully processed. The acknowledgement will be sent to the message broker and the message will never be consumed again (by the same logical consumer / consumer group).
    Declaration
    public Task CommitAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    Parameters
    Type Name Description
    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the message to be committed.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    Confirms that the specified messages have been successfully processed. The acknowledgement will be sent to the message broker and the message will never be consumed again (by the same logical consumer / consumer group).
    Declaration
    public Task CommitAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to message be committed.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    Commits the specified messages sending the acknowledgement to the message broker.

    Declaration
    protected abstract Task CommitCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to message be committed.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    ConnectAsync()

    Connects and starts consuming.

    Declaration
    public Task ConnectAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    ConnectCoreAsync()

    Connects to the message broker.

    Declaration
    protected abstract Task ConnectCoreAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    DisconnectAsync()

    Disconnects and stops consuming.

    Declaration
    public Task DisconnectAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    DisconnectCoreAsync()

    Disconnects from the message broker.

    Declaration
    protected abstract Task DisconnectCoreAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    Dispose()

    Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

    Declaration
    public void Dispose()
    | Improve this doc View source

    Dispose(bool)

    Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

    Declaration
    protected virtual void Dispose(bool disposing)
    Parameters
    Type Name Description
    bool disposing

    A value indicating whether the method has been called by the Dispose method and not from the finalizer.

    | Improve this doc View source

    GetCurrentSequenceStores()

    Gets the ISequenceStore instances used by this consumer. Some brokers will require multiple stores (e.g. the KafkaConsumer will create a store per each assigned partition).

    Declaration
    public abstract IReadOnlyList<ISequenceStore> GetCurrentSequenceStores()
    Returns
    Type Description
    IReadOnlyList<ISequenceStore>

    The list of ISequenceStore.

    Remarks

    Used only for testing and maintained to preserve backward compatibility.

    | Improve this doc View source

    HandleMessageAsync(byte[]?, IReadOnlyCollection<MessageHeader>, string, IBrokerMessageIdentifier, ISequenceStore)

    Handles the consumed message invoking each IConsumerBehavior in the pipeline.

    Declaration
    protected virtual Task HandleMessageAsync(byte[]? message, IReadOnlyCollection<MessageHeader> headers, string sourceEndpointName, IBrokerMessageIdentifier brokerMessageIdentifier, ISequenceStore sequenceStore)
    Parameters
    Type Name Description
    byte[] message

    The body of the consumed message.

    IReadOnlyCollection<MessageHeader> headers

    The headers of the consumed message.

    string sourceEndpointName

    The name of the actual endpoint (topic) where the message has been delivered.

    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the consumed message.

    ISequenceStore sequenceStore

    The ISequenceStore to be used.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    IncrementFailedAttempts(IRawInboundEnvelope)

    Increments the stored failed attempts count for the specified envelope.

    Declaration
    public int IncrementFailedAttempts(IRawInboundEnvelope envelope)
    Parameters
    Type Name Description
    IRawInboundEnvelope envelope

    The envelope.

    Returns
    Type Description
    int

    The current failed attempts count after the increment.

    | Improve this doc View source

    RevertReadyStatus()

    Called when the connection is lost to transitions the consumer back to Connected.

    Declaration
    protected void RevertReadyStatus()
    | Improve this doc View source

    RollbackAsync(IBrokerMessageIdentifier)

    Notifies that an error occured while processing the specified message. If necessary the information will be sent to the message broker to ensure that the message will be consumed again.
    Declaration
    public Task RollbackAsync(IBrokerMessageIdentifier brokerMessageIdentifier)
    Parameters
    Type Name Description
    IBrokerMessageIdentifier brokerMessageIdentifier

    The identifier of the message to be rolled back.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    Notifies that an error occured while processing the specified messages. If necessary the information will be sent to the message broker to ensure that the message will be re-processed.
    Declaration
    public Task RollbackAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to message be rolled back.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier>)

    If necessary notifies the message broker that the specified messages couldn't be processed successfully, to ensure that they will be consumed again.

    Declaration
    protected abstract Task RollbackCoreAsync(IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers)
    Parameters
    Type Name Description
    IReadOnlyCollection<IBrokerMessageIdentifier> brokerMessageIdentifiers

    The identifiers of to message be rolled back.

    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    SetReadyStatus()

    Called when fully connected to transitions the consumer to Ready.

    Declaration
    protected void SetReadyStatus()
    | Improve this doc View source

    StartAsync()

    Starts consuming. Used after StopAsync() has been called to resume consuming.

    Declaration
    public Task StartAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    StartCoreAsync()

    Starts consuming. Called to resume consuming after StopAsync() has been called.

    Declaration
    protected abstract Task StartCoreAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    StopAsync()

    Stops the consumer without disconnecting. Can be used to pause and resume consuming.

    Declaration
    public Task StopAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation. This Task will complete as soon as the stopping signal has been sent.

    | Improve this doc View source

    StopCoreAsync()

    Stops consuming while staying connected to the message broker.

    Declaration
    protected abstract Task StopCoreAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    TriggerReconnectAsync()

    Disconnects and stops consuming.

    Declaration
    public Task TriggerReconnectAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    | Improve this doc View source

    WaitUntilConsumingStoppedCoreAsync()

    Waits until the consuming is stopped.

    Declaration
    protected abstract Task WaitUntilConsumingStoppedCoreAsync()
    Returns
    Type Description
    Task

    A Task representing the asynchronous operation.

    Implements

    IConsumer
    IDisposable
    • Improve this doc
    • View source
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini