Class SequenceBase<TEnvelope>

Namespace
Silverback.Messaging.Sequences
Assembly
Silverback.Integration.dll

A set of logically related messages, like the chunks belonging to the same message or the messages in a dataset.

public abstract class SequenceBase<TEnvelope> : ISequence, IDisposable where TEnvelope : IRawInboundEnvelope

Type Parameters

TEnvelope
Inheritance
SequenceBase<TEnvelope>
Implements
ISequence
IDisposable
Derived
Inherited Members

Constructors

SequenceBase(string, ConsumerPipelineContext, bool, TimeSpan?, IMessageStreamProvider?, bool)

Initializes a new instance of the SequenceBase<TEnvelope> class.

protected SequenceBase(string sequenceId, ConsumerPipelineContext context, bool enforceTimeout = true, TimeSpan? timeout = null, IMessageStreamProvider? streamProvider = null, bool trackIdentifiers = true)

Parameters

sequenceId string

The identifier used to match consumed messages to the sequence they belong to.

context ConsumerPipelineContext

The current ConsumerPipelineContext. It is assumed to be the context from which the sequence is published via the message bus.

enforceTimeout bool

Specifies whether the timeout has to be enforced.

timeout TimeSpan?

The timeout to be applied. If not specified, the value of Endpoint.Sequence.Timeout is used.

streamProvider IMessageStreamProvider

The IMessageStreamProvider to be pushed. A new one will be created if not provided.

trackIdentifiers bool

Specifies whether the message identifiers must be collected so that they can be used for the commit later on.
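
The interaction between enforceTimeout and timeout described above can be sketched as follows. This is a minimal illustration, not the library's code: ToyTimeoutResolver and the endpointDefault parameter (standing in for Endpoint.Sequence.Timeout) are hypothetical, and returning null when the timeout is not enforced is an assumption.

```csharp
using System;

// Hypothetical helper, not part of Silverback.
public static class ToyTimeoutResolver
{
    // Returns the timeout to apply, or null when no timeout is enforced (assumption).
    public static TimeSpan? Resolve(bool enforceTimeout, TimeSpan? timeout, TimeSpan endpointDefault)
    {
        if (!enforceTimeout)
            return null;

        // Fall back to the endpoint-level default when no explicit value is given.
        return timeout ?? endpointDefault;
    }
}
```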

Properties

AbortException

Gets the exception that caused the abort, if any.

public Exception? AbortException { get; }

Property Value

Exception

Remarks

This property is set only when AbortReason is Error.

AbortReason

Gets the reason for the abort.

public SequenceAbortReason AbortReason { get; }

Property Value

SequenceAbortReason

Context

Gets the ConsumerPipelineContext related to the processing of this sequence (usually the context of the first message that initiated the sequence).

public ConsumerPipelineContext Context { get; }

Property Value

ConsumerPipelineContext

IsAborted

Gets a value indicating whether the sequence processing has been aborted and no further messages will be pushed.

public bool IsAborted { get; }

Property Value

bool

IsBeingConsumed

Gets a value indicating whether the sequence is being consumed. This value is set to true as soon as CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?) is called and the output IMessageStreamEnumerable<TMessage> is created.

public bool IsBeingConsumed { get; }

Property Value

bool

IsComplete

Gets a value indicating whether all messages belonging to the sequence have been pushed and processed.

public bool IsComplete { get; }

Property Value

bool

IsCompleting

Gets a value indicating whether all messages belonging to the sequence have been pushed and the last message is currently being processed.

public bool IsCompleting { get; }

Property Value

bool

IsNew

Gets a value indicating whether the first message in the sequence was consumed and this instance was just created.

public bool IsNew { get; }

Property Value

bool

IsPending

Gets a value indicating whether the sequence is incomplete and waiting for new messages to be pushed.

public bool IsPending { get; }

Property Value

bool
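
The state properties IsPending, IsCompleting, IsComplete and IsAborted describe a simple lifecycle. The following self-contained sketch shows one way such flags could relate to each other. ToySequence and its methods are hypothetical stand-ins written for illustration, not Silverback types, and the exact relationship between the flags in the real class is an assumption.

```csharp
using System;

// Hypothetical stand-in, NOT the Silverback implementation. It only models the
// lifecycle suggested by the property descriptions.
public sealed class ToySequence
{
    public bool IsCompleting { get; private set; }

    public bool IsComplete { get; private set; }

    public bool IsAborted { get; private set; }

    // Assumption: a sequence is pending while it is neither completing,
    // complete, nor aborted.
    public bool IsPending => !IsCompleting && !IsComplete && !IsAborted;

    // All messages have been pushed; the last one is still being processed.
    public void BeginCompleting()
    {
        if (!IsPending)
            throw new InvalidOperationException("The sequence is not pending.");

        IsCompleting = true;
    }

    // The last message has been processed.
    public void Complete()
    {
        if (!IsCompleting)
            throw new InvalidOperationException("The sequence is not completing.");

        IsCompleting = false;
        IsComplete = true;
    }

    // Assumption: aborting an already completed sequence has no effect.
    public void Abort()
    {
        if (IsComplete)
            return;

        IsCompleting = false;
        IsAborted = true;
    }
}
```

In this model exactly one of the four flags is true at any time, which keeps checks such as "can another message still be added?" down to a single property read.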

Length

Gets the length of the sequence so far.

public int Length { get; protected set; }

Property Value

int

ParentSequence

Gets the ISequence that contains this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).

public ISequence? ParentSequence { get; }

Property Value

ISequence

ProcessingCompletedTask

Gets a Task that will complete when the processing is completed (including commit/rollback of the transaction).

public Task ProcessingCompletedTask { get; }

Property Value

Task

SequenceId

Gets the identifier used to match consumed messages to the sequence they belong to.

public string SequenceId { get; }

Property Value

string

SequencerBehaviorsTask

Gets a Task that completes when the sequence has gone through both sequencer behaviors. This is necessary to synchronize completion when chunking is combined with another sequence type.

public Task SequencerBehaviorsTask { get; }

Property Value

Task

Sequences

Gets the ISequence instances that were added to this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).

public IReadOnlyCollection<ISequence> Sequences { get; }

Property Value

IReadOnlyCollection<ISequence>

StreamProvider

Gets the IMessageStreamProvider that will be pushed with the messages belonging to the sequence.

public IMessageStreamProvider StreamProvider { get; }

Property Value

IMessageStreamProvider

TotalLength

Gets the declared total length of the sequence, if known.

public int? TotalLength { get; protected set; }

Property Value

int?

Methods

AbortAsync(SequenceAbortReason, Exception?)

Aborts the sequence processing. Used, for example, to signal that an exception occurred or the enumeration returned prematurely.

public ValueTask AbortAsync(SequenceAbortReason reason, Exception? exception = null)

Parameters

reason SequenceAbortReason

The abort reason.

exception Exception

The exception that caused the abort if an exception was thrown.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

AddAsync(IRawInboundEnvelope, ISequence?, bool)

Adds the message to the sequence.

public Task<AddToSequenceResult> AddAsync(IRawInboundEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)

Parameters

envelope IRawInboundEnvelope

The envelope to be added to the sequence.

sequence ISequence

The sequence to be added to this sequence, if any.

throwIfUnhandled bool

A boolean value indicating whether an exception must be thrown if no subscriber is handling the message.

Returns

Task<AddToSequenceResult>

A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have actually been pushed.

AddCoreAsync(TEnvelope, ISequence?, bool)

Adds the message to the sequence.

protected virtual Task<AddToSequenceResult> AddCoreAsync(TEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)

Parameters

envelope TEnvelope

The envelope to be added to the sequence.

sequence ISequence

The sequence to be added to this sequence, if any.

throwIfUnhandled bool

A boolean value indicating whether an exception must be thrown if no subscriber is handling the message.

Returns

Task<AddToSequenceResult>

A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have actually been pushed.

CompleteAsync(CancellationToken)

Marks the sequence as complete, meaning no more messages will be pushed.

protected virtual ValueTask CompleteAsync(CancellationToken cancellationToken = default)

Parameters

cancellationToken CancellationToken

A CancellationToken used to cancel the operation.

Returns

ValueTask

A ValueTask representing the asynchronous operation.

CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?)

Creates an IMessageStreamEnumerable<TMessage> that will be pushed with the messages belonging to the sequence.

public IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>? filters = null)

Parameters

filters IReadOnlyCollection<IMessageFilter>

The filters to be applied.

Returns

IMessageStreamEnumerable<TMessage>

The IMessageStreamEnumerable<TMessage>.

Type Parameters

TMessage

The type of the messages to be streamed.

Dispose()

public void Dispose()

Dispose(bool)

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

protected virtual void Dispose(bool disposing)

Parameters

disposing bool

A value indicating whether the method has been called by the Dispose method and not from the finalizer.
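
Dispose() and Dispose(bool) follow the standard .NET dispose pattern. The sketch below shows that general pattern and how a derived class typically overrides Dispose(bool). ResourceHolder and DerivedHolder are hypothetical types used only for illustration; which resources SequenceBase<TEnvelope> actually releases is not stated here.

```csharp
using System;

// Hypothetical types illustrating the standard .NET dispose pattern;
// they are not part of Silverback.
public class ResourceHolder : IDisposable
{
    private bool _disposed;

    public bool IsDisposed => _disposed;

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            // Release managed resources here (timers, streams, semaphores, ...).
        }

        _disposed = true;
    }
}

public class DerivedHolder : ResourceHolder
{
    public int ReleaseCount { get; private set; }

    protected override void Dispose(bool disposing)
    {
        // Release the derived class's own resources once, then defer to the base class.
        if (disposing && !IsDisposed)
            ReleaseCount++;

        base.Dispose(disposing);
    }
}
```

Calling Dispose() more than once is safe in this sketch: the second call finds the instance already disposed and does nothing.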

GetCommitIdentifiers()

Gets the identifiers to be used to commit after successful processing.

public IReadOnlyCollection<IBrokerMessageIdentifier> GetCommitIdentifiers()

Returns

IReadOnlyCollection<IBrokerMessageIdentifier>

The identifiers to be used to commit.

GetRollbackIdentifiers()

Gets the identifiers to be used to roll back in case of error.

public IReadOnlyCollection<IBrokerMessageIdentifier> GetRollbackIdentifiers()

Returns

IReadOnlyCollection<IBrokerMessageIdentifier>

The identifiers to be used to roll back.

IsLastMessage(TEnvelope)

Implements the logic to recognize the last message in the sequence without relying on the TotalLength property.

protected virtual bool IsLastMessage(TEnvelope envelope)

Parameters

envelope TEnvelope

The envelope to be added to the sequence.

Returns

bool

true if it is the last message, otherwise false.
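
As an illustration of this hook, the sketch below detects the end of a sequence either from a declared total length or from a custom IsLastMessage override. All types are hypothetical stand-ins (ToyEnvelope, ToySequenceBase, FlaggedSequence), the "x-last" header name is invented, and the order in which the two checks are combined is an assumption rather than the library's documented behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; none of these are Silverback types.
public sealed class ToyEnvelope
{
    public Dictionary<string, string> Headers { get; } = new Dictionary<string, string>();
}

public abstract class ToySequenceBase
{
    public int Length { get; protected set; }

    public int? TotalLength { get; protected set; }

    // Adds the envelope and returns true when it completes the sequence.
    public bool Add(ToyEnvelope envelope)
    {
        Length++;

        // Assumption: the declared total length is checked first, then the custom hook.
        return (TotalLength.HasValue && Length == TotalLength.Value) || IsLastMessage(envelope);
    }

    // Default: no custom detection.
    protected virtual bool IsLastMessage(ToyEnvelope envelope) => false;
}

public sealed class FlaggedSequence : ToySequenceBase
{
    // "x-last" is an invented header name used to flag the final message.
    protected override bool IsLastMessage(ToyEnvelope envelope) =>
        envelope.Headers.TryGetValue("x-last", out var value) && value == "true";
}
```

A derived sequence of this kind is useful when the producer cannot know the total count up front and can only mark the final message.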

OnTimeoutElapsedAsync()

Called when the timeout has elapsed. If not overridden in a derived class, the default implementation aborts the sequence.

protected virtual ValueTask OnTimeoutElapsedAsync()

Returns

ValueTask

A ValueTask representing the asynchronous operation.
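
The override point described for OnTimeoutElapsedAsync() can be sketched with stand-in types. ToyTimedSequence and CompleteOnTimeoutSequence are hypothetical, and SimulateTimeoutAsync replaces the timer that would trigger the callback in a real implementation. Treating a timeout as normal completion is just one possible override, not something the source prescribes.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins, not Silverback types.
public class ToyTimedSequence
{
    public bool IsAborted { get; private set; }

    public bool IsComplete { get; protected set; }

    // A real implementation would call the hook from a timer; here it is invoked directly.
    public ValueTask SimulateTimeoutAsync() => OnTimeoutElapsedAsync();

    // Default behavior mirrors the documented one: abort the sequence.
    protected virtual ValueTask OnTimeoutElapsedAsync()
    {
        IsAborted = true;
        return default;
    }
}

// Variant that treats an elapsed timeout as the normal end of the sequence.
public sealed class CompleteOnTimeoutSequence : ToyTimedSequence
{
    protected override ValueTask OnTimeoutElapsedAsync()
    {
        IsComplete = true;
        return default;
    }
}
```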