Class SequenceBase<TEnvelope>
A set of logically related messages, like the chunks belonging to the same message or the messages in a dataset.
Inherited Members
Namespace: Silverback.Messaging.Sequences
Assembly: Silverback.Integration.dll
Syntax
public abstract class SequenceBase<TEnvelope> : ISequence, IDisposable where TEnvelope : IRawInboundEnvelope
Type Parameters
| Name | Description |
|---|---|
| TEnvelope |
Constructors
SequenceBase(string, ConsumerPipelineContext, bool, TimeSpan?, IMessageStreamProvider?, bool)
Initializes a new instance of the SequenceBase<TEnvelope> class.
Declaration
protected SequenceBase(string sequenceId, ConsumerPipelineContext context, bool enforceTimeout = true, TimeSpan? timeout = null, IMessageStreamProvider? streamProvider = null, bool trackIdentifiers = true)
Parameters
| Type | Name | Description |
|---|---|---|
| string | sequenceId | The identifier that is used to match the consumed messages with their belonging sequence. |
| ConsumerPipelineContext | context | The current ConsumerPipelineContext, assuming that it will be the one from which the sequence gets published via the message bus. |
| bool | enforceTimeout | Specifies whether the timeout has to be enforced. |
| TimeSpan? | timeout | The timeout to be applied. If not specified the value of |
| IMessageStreamProvider | streamProvider | The IMessageStreamProvider to be pushed. A new one will be created if not provided. |
| bool | trackIdentifiers | Specifies whether the message identifiers have to be collected to be used for the commit later on. |
Properties
AbortException
Gets the exception that caused the abort, if any.
Declaration
public Exception? AbortException { get; }
Property Value
| Type | Description |
|---|---|
| Exception |
Remarks
This property is filled only when AbortReason is Error.
AbortReason
Gets the reason of the abort.
Declaration
public SequenceAbortReason AbortReason { get; }
Property Value
| Type | Description |
|---|---|
| SequenceAbortReason |
Context
Gets the ConsumerPipelineContext related to the processing of this sequence (usually the context of the first message that initiated the sequence).
Declaration
public ConsumerPipelineContext Context { get; }
Property Value
| Type | Description |
|---|---|
| ConsumerPipelineContext |
IsAborted
Gets a value indicating whether the sequence processing has been aborted and no further message will be pushed.
Declaration
public bool IsAborted { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsBeingConsumed
Gets a value indicating whether the sequence is being consumed. This value is set to true as soon as
CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?) is called and the output IMessageStreamEnumerable<TMessage> is created.
Declaration
public bool IsBeingConsumed { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsComplete
Gets a value indicating whether all messages belonging to the sequence have been pushed and processed.
Declaration
public bool IsComplete { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsCompleting
Gets a value indicating whether all messages belonging to the sequence have been pushed and the last message is currently being processed.
Declaration
public bool IsCompleting { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsNew
Gets a value indicating whether the first message in the sequence was consumed and this instance was just created.
Declaration
public bool IsNew { get; }
Property Value
| Type | Description |
|---|---|
| bool |
IsPending
Gets a value indicating whether the sequence is incomplete and awaiting for new messages to be pushed.
Declaration
public bool IsPending { get; }
Property Value
| Type | Description |
|---|---|
| bool |
Length
Gets the length of the sequence so far.
Declaration
public int Length { get; protected set; }
Property Value
| Type | Description |
|---|---|
| int |
ParentSequence
Gets the ISequence that contain this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).
Declaration
public ISequence? ParentSequence { get; }
Property Value
| Type | Description |
|---|---|
| ISequence |
ProcessingCompletedTask
Gets a Task that will complete when the processing is completed (including commit/rollback of the transaction).
Declaration
public Task ProcessingCompletedTask { get; }
Property Value
| Type | Description |
|---|---|
| Task |
SequenceId
Gets the identifier that is used to match the consumed messages with their belonging sequence.
Declaration
public string SequenceId { get; }
Property Value
| Type | Description |
|---|---|
| string |
SequencerBehaviorsTask
Gets a Task that completes when the sequence went through both behaviors. This is necessary to synchronize completion when mixing Chunking with another sequence.
Declaration
public Task SequencerBehaviorsTask { get; }
Property Value
| Type | Description |
|---|---|
| Task |
Sequences
Gets the ISequence that were added to this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).
Declaration
public IReadOnlyCollection<ISequence> Sequences { get; }
Property Value
| Type | Description |
|---|---|
| IReadOnlyCollection<ISequence> |
StreamProvider
Gets the IMessageStreamProvider that will be pushed with the messages belonging to the sequence.
Declaration
public IMessageStreamProvider StreamProvider { get; }
Property Value
| Type | Description |
|---|---|
| IMessageStreamProvider |
TotalLength
Gets the declared total length of the sequence, if known.
Declaration
public int? TotalLength { get; protected set; }
Property Value
| Type | Description |
|---|---|
| int? |
Methods
AbortAsync(SequenceAbortReason, Exception?)
Aborts the sequence processing. Used, for example, to signal that an exception occurred or the enumeration returned prematurely.
Declaration
public ValueTask AbortAsync(SequenceAbortReason reason, Exception? exception = null)
Parameters
| Type | Name | Description |
|---|---|---|
| SequenceAbortReason | reason | The abort reason. |
| Exception | exception | The exception that caused the abort if an exception was thrown. |
Returns
| Type | Description |
|---|---|
| ValueTask | A ValueTask representing the asynchronous operation. |
AddAsync(IRawInboundEnvelope, ISequence?, bool)
Adds the message to the sequence.
Declaration
public ValueTask<AddToSequenceResult> AddAsync(IRawInboundEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)
Parameters
| Type | Name | Description |
|---|---|---|
| IRawInboundEnvelope | envelope | The envelope to be added to the sequence. |
| ISequence | sequence | The sequence to be added to the sequence. |
| bool | throwIfUnhandled | A boolean value indicating whether an exception must be thrown if no subscriber is handling the message. |
Returns
| Type | Description |
|---|---|
| ValueTask<AddToSequenceResult> | A ValueTask<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed. |
AddCoreAsync(TEnvelope, ISequence?, bool)
Adds the message to the sequence.
Declaration
protected virtual ValueTask<AddToSequenceResult> AddCoreAsync(TEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)
Parameters
| Type | Name | Description |
|---|---|---|
| TEnvelope | envelope | The envelope to be added to the sequence. |
| ISequence | sequence | The sequence to be added to the sequence. |
| bool | throwIfUnhandled | A boolean value indicating whether an exception must be thrown if no subscriber is handling the message. |
Returns
| Type | Description |
|---|---|
| ValueTask<AddToSequenceResult> | A ValueTask<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed. |
CompleteAsync(CancellationToken)
Marks the sequence as complete, meaning no more messages will be pushed.
Declaration
protected virtual ValueTask CompleteAsync(CancellationToken cancellationToken = default)
Parameters
| Type | Name | Description |
|---|---|---|
| CancellationToken | cancellationToken | A CancellationToken used to cancel the operation. |
Returns
| Type | Description |
|---|---|
| ValueTask | A Task representing the asynchronous operation. |
CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?)
Creates a IMessageStreamEnumerable<TMessage> that will be pushed with the messages belonging to the sequence.
Declaration
public IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>? filters = null)
Parameters
| Type | Name | Description |
|---|---|---|
| IReadOnlyCollection<IMessageFilter> | filters | The filters to be applied. |
Returns
| Type | Description |
|---|---|
| IMessageStreamEnumerable<TMessage> |
Type Parameters
| Name | Description |
|---|---|
| TMessage | The type of the messages to be streamed. |
Dispose()
A set of logically related messages, like the chunks belonging to the same message or the messages in a dataset.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | disposing | A value indicating whether the method has been called by the |
GetCommitIdentifiers()
Gets the identifiers to be used to commit after successful processing.
Declaration
public IReadOnlyCollection<IBrokerMessageIdentifier> GetCommitIdentifiers()
Returns
| Type | Description |
|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | The identifiers to be used to commit. |
GetRollbackIdentifiers()
Gets the identifiers to be used to roll back in case of error.
Declaration
public IReadOnlyCollection<IBrokerMessageIdentifier> GetRollbackIdentifiers()
Returns
| Type | Description |
|---|---|
| IReadOnlyCollection<IBrokerMessageIdentifier> | The identifiers to be used to rollback. |
IsLastMessage(TEnvelope)
Implements the logic to recognize the last message in the sequence without relying on the TotalCount property.
Declaration
protected virtual bool IsLastMessage(TEnvelope envelope)
Parameters
| Type | Name | Description |
|---|---|---|
| TEnvelope | envelope | The envelope to be added to the sequence. |
Returns
| Type | Description |
|---|---|
| bool |
|
OnTimeoutElapsedAsync()
Called when the timout is elapsed. If not overridden in a derived class, the default implementation aborts the sequence.
Declaration
protected virtual ValueTask OnTimeoutElapsedAsync()
Returns
| Type | Description |
|---|---|
| ValueTask | A Task representing the asynchronous operation. |