Class SequenceBase<TEnvelope>
- Namespace
- Silverback.Messaging.Sequences
- Assembly
- Silverback.Integration.dll
A set of logically related messages, like the chunks belonging to the same message or the messages in a dataset.
public abstract class SequenceBase<TEnvelope> : ISequence, IDisposable where TEnvelope : IRawInboundEnvelope
Type Parameters
TEnvelope
- Inheritance
-
SequenceBase<TEnvelope>
- Implements
- Derived
- Inherited Members
Constructors
SequenceBase(string, ConsumerPipelineContext, bool, TimeSpan?, IMessageStreamProvider?, bool)
Initializes a new instance of the SequenceBase<TEnvelope> class.
protected SequenceBase(string sequenceId, ConsumerPipelineContext context, bool enforceTimeout = true, TimeSpan? timeout = null, IMessageStreamProvider? streamProvider = null, bool trackIdentifiers = true)
Parameters
sequenceIdstringThe identifier that is used to match the consumed messages with their belonging sequence.
contextConsumerPipelineContextThe current ConsumerPipelineContext, assuming that it will be the one from which the sequence gets published via the message bus.
enforceTimeoutboolSpecifies whether the timeout has to be enforced.
timeoutTimeSpan?The timeout to be applied. If not specified the value of
Endpoint.Sequence.Timeoutwill be used.streamProviderIMessageStreamProviderThe IMessageStreamProvider to be pushed. A new one will be created if not provided.
trackIdentifiersboolSpecifies whether the message identifiers have to be collected to be used for the commit later on.
Properties
AbortException
Gets the exception that caused the abort, if any.
public Exception? AbortException { get; }
Property Value
Remarks
This property is filled only when AbortReason is Error.
AbortReason
Gets the reason of the abort.
public SequenceAbortReason AbortReason { get; }
Property Value
Context
Gets the ConsumerPipelineContext related to the processing of this sequence (usually the context of the first message that initiated the sequence).
public ConsumerPipelineContext Context { get; }
Property Value
IsAborted
Gets a value indicating whether the sequence processing has been aborted and no further message will be pushed.
public bool IsAborted { get; }
Property Value
IsBeingConsumed
Gets a value indicating whether the sequence is being consumed. This value is set to true as soon as
CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?) is called and the output IMessageStreamEnumerable<TMessage> is created.
public bool IsBeingConsumed { get; }
Property Value
IsComplete
Gets a value indicating whether all messages belonging to the sequence have been pushed and processed.
public bool IsComplete { get; }
Property Value
IsCompleting
Gets a value indicating whether all messages belonging to the sequence have been pushed and the last message is currently being processed.
public bool IsCompleting { get; }
Property Value
IsNew
Gets a value indicating whether the first message in the sequence was consumed and this instance was just created.
public bool IsNew { get; }
Property Value
IsPending
Gets a value indicating whether the sequence is incomplete and awaiting for new messages to be pushed.
public bool IsPending { get; }
Property Value
Length
Gets the length of the sequence so far.
public int Length { get; protected set; }
Property Value
ParentSequence
Gets the ISequence that contain this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).
public ISequence? ParentSequence { get; }
Property Value
ProcessingCompletedTask
Gets a Task that will complete when the processing is completed (including commit/rollback of the transaction).
public Task ProcessingCompletedTask { get; }
Property Value
SequenceId
Gets the identifier that is used to match the consumed messages with their belonging sequence.
public string SequenceId { get; }
Property Value
SequencerBehaviorsTask
Gets a Task that completes when the sequence went through both behaviors. This is necessary to synchronize completion when mixing Chunking with another sequence.
public Task SequencerBehaviorsTask { get; }
Property Value
Sequences
Gets the ISequence that were added to this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence).
public IReadOnlyCollection<ISequence> Sequences { get; }
Property Value
StreamProvider
Gets the IMessageStreamProvider that will be pushed with the messages belonging to the sequence.
public IMessageStreamProvider StreamProvider { get; }
Property Value
TotalLength
Gets the declared total length of the sequence, if known.
public int? TotalLength { get; protected set; }
Property Value
- int?
Methods
AbortAsync(SequenceAbortReason, Exception?)
Aborts the sequence processing. Used, for example, to signal that an exception occurred or the enumeration returned prematurely.
public ValueTask AbortAsync(SequenceAbortReason reason, Exception? exception = null)
Parameters
reasonSequenceAbortReasonThe abort reason.
exceptionExceptionThe exception that caused the abort if an exception was thrown.
Returns
AddAsync(IRawInboundEnvelope, ISequence?, bool)
Adds the message to the sequence.
public Task<AddToSequenceResult> AddAsync(IRawInboundEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)
Parameters
envelopeIRawInboundEnvelopeThe envelope to be added to the sequence.
sequenceISequenceThe sequence to be added to the sequence.
throwIfUnhandledboolA boolean value indicating whether an exception must be thrown if no subscriber is handling the message.
Returns
- Task<AddToSequenceResult>
A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed.
AddCoreAsync(TEnvelope, ISequence?, bool)
Adds the message to the sequence.
protected virtual Task<AddToSequenceResult> AddCoreAsync(TEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)
Parameters
envelopeTEnvelopeThe envelope to be added to the sequence.
sequenceISequenceThe sequence to be added to the sequence.
throwIfUnhandledboolA boolean value indicating whether an exception must be thrown if no subscriber is handling the message.
Returns
- Task<AddToSequenceResult>
A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed.
CompleteAsync(CancellationToken)
Marks the sequence as complete, meaning no more messages will be pushed.
protected virtual ValueTask CompleteAsync(CancellationToken cancellationToken = default)
Parameters
cancellationTokenCancellationTokenA CancellationToken used to cancel the operation.
Returns
CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?)
Creates a IMessageStreamEnumerable<TMessage> that will be pushed with the messages belonging to the sequence.
public IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>? filters = null)
Parameters
filtersIReadOnlyCollection<IMessageFilter>The filters to be applied.
Returns
- IMessageStreamEnumerable<TMessage>
Type Parameters
TMessageThe type of the messages to be streamed.
Dispose()
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
protected virtual void Dispose(bool disposing)
Parameters
disposingboolA value indicating whether the method has been called by the
Disposemethod and not from the finalizer.
GetCommitIdentifiers()
Gets the identifiers to be used to commit after successful processing.
public IReadOnlyCollection<IBrokerMessageIdentifier> GetCommitIdentifiers()
Returns
- IReadOnlyCollection<IBrokerMessageIdentifier>
The identifiers to be used to commit.
GetRollbackIdentifiers()
Gets the identifiers to be used to roll back in case of error.
public IReadOnlyCollection<IBrokerMessageIdentifier> GetRollbackIdentifiers()
Returns
- IReadOnlyCollection<IBrokerMessageIdentifier>
The identifiers to be used to rollback.
IsLastMessage(TEnvelope)
Implements the logic to recognize the last message in the sequence without relying on the TotalCount property.
protected virtual bool IsLastMessage(TEnvelope envelope)
Parameters
envelopeTEnvelopeThe envelope to be added to the sequence.
Returns
- bool
trueif it is the last message, otherwisefalse.
OnTimeoutElapsedAsync()
Called when the timout is elapsed. If not overridden in a derived class, the default implementation aborts the sequence.
protected virtual ValueTask OnTimeoutElapsedAsync()