Class SequenceBase<TEnvelope>
Represents a set of logically related messages, like the chunks belonging to the same message or the messages in a dataset.
Inherited Members
Namespace: Silverback.Messaging.Sequences
Assembly: Silverback.Integration.dll
Syntax
public abstract class SequenceBase<TEnvelope> : ISequence, IDisposable where TEnvelope : IRawInboundEnvelope
Type Parameters
Name | Description |
---|---|
TEnvelope |
Constructors
| Improve this doc View sourceSequenceBase(string, ConsumerPipelineContext, bool, TimeSpan?, IMessageStreamProvider?, bool)
Initializes a new instance of the SequenceBase<TEnvelope> class.
Declaration
protected SequenceBase(string sequenceId, ConsumerPipelineContext context, bool enforceTimeout = true, TimeSpan? timeout = null, IMessageStreamProvider? streamProvider = null, bool trackIdentifiers = true)
Parameters
Type | Name | Description |
---|---|---|
string | sequenceId | The identifier that is used to match the consumed messages with their belonging sequence. |
ConsumerPipelineContext | context | The current ConsumerPipelineContext, assuming that it will be the one from which the sequence gets published to the internal bus. |
bool | enforceTimeout | Specifies whether the timeout has to be enforced. |
TimeSpan? | timeout | The timeout to be applied. If not specified the value of |
IMessageStreamProvider | streamProvider | The IMessageStreamProvider to be pushed. A new one will be created if not provided. |
bool | trackIdentifiers | Specifies whether the message identifiers have to be collected, in order to be used for the commit later on. |
Properties
| Improve this doc View sourceAbortException
Gets the exception that caused the abort, if any.
Declaration
public Exception? AbortException { get; }
Property Value
Type | Description |
---|---|
Exception |
Remarks
This property is filled only when AbortReason is Error.
AbortReason
Gets the reason of the abort.
Declaration
public SequenceAbortReason AbortReason { get; }
Property Value
Type | Description |
---|---|
SequenceAbortReason |
Activity
Gets the Activity created for this sequence.
Declaration
public Activity? Activity { get; }
Property Value
Type | Description |
---|---|
Activity |
Context
Gets the ConsumerPipelineContext related to the processing of this sequence (usually the context of the first message that initiated the sequence).
Declaration
public ConsumerPipelineContext Context { get; }
Property Value
Type | Description |
---|---|
ConsumerPipelineContext |
IsAborted
Gets a value indicating whether the sequence processing has been aborted and no further message will be pushed.
Declaration
public bool IsAborted { get; }
Property Value
Type | Description |
---|---|
bool |
IsBeingConsumed
Gets a value indicating whether the sequence is being consumed. This value is set to true
as
soon as CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?) is called and the output
IMessageStreamEnumerable<TMessage> is created.
Declaration
public bool IsBeingConsumed { get; }
Property Value
Type | Description |
---|---|
bool |
IsComplete
Gets a value indicating whether all messages belonging to the sequence have been pushed and processed.
Declaration
public bool IsComplete { get; }
Property Value
Type | Description |
---|---|
bool |
IsCompleting
Gets a value indicating whether all messages belonging to the sequence have been pushed and the last message is currently being processed.
Declaration
public bool IsCompleting { get; }
Property Value
Type | Description |
---|---|
bool |
IsNew
Gets a value indicating whether the first message in the sequence was consumed and this instance was just created.
Declaration
public bool IsNew { get; }
Property Value
Type | Description |
---|---|
bool |
IsPending
Gets a value indicating whether the sequence is incomplete and awaiting for new messages to be pushed.
Declaration
public bool IsPending { get; }
Property Value
Type | Description |
---|---|
bool |
Length
Gets the length of the sequence so far.
Declaration
public int Length { get; protected set; }
Property Value
Type | Description |
---|---|
int |
ParentSequence
Gets the ISequence that contain this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence.
Declaration
public ISequence? ParentSequence { get; }
Property Value
Type | Description |
---|---|
ISequence |
ProcessingCompletedTask
Gets a Task that will complete when the processing is completed (including commit/rollback of the transaction).
Declaration
public Task ProcessingCompletedTask { get; }
Property Value
Type | Description |
---|---|
Task |
SequenceId
Gets the identifier that is used to match the consumed messages with their belonging sequence.
Declaration
public string SequenceId { get; }
Property Value
Type | Description |
---|---|
string |
SequencerBehaviorsTask
Gets a Task that completes when the sequence went through both behaviors. This is necessary to synchronize completion when mixing Chunking with another sequence.
Declaration
public Task SequencerBehaviorsTask { get; }
Property Value
Type | Description |
---|---|
Task |
Sequences
Gets the ISequence that were added to this sequence (e.g. the ChunkSequence whose aggregated message is added to a BatchSequence.
Declaration
public IReadOnlyCollection<ISequence> Sequences { get; }
Property Value
Type | Description |
---|---|
IReadOnlyCollection<ISequence> |
ShouldCreateNewActivity
Gets a value indicating whether this sequence should create a new activity. This should be true if the sequence contains independent messages.
Declaration
public bool ShouldCreateNewActivity { get; }
Property Value
Type | Description |
---|---|
bool |
StreamProvider
Gets the IMessageStreamProvider that will be pushed with the messages belonging to the sequence.
Declaration
public IMessageStreamProvider StreamProvider { get; }
Property Value
Type | Description |
---|---|
IMessageStreamProvider |
TotalLength
Gets the declared total length of the sequence, if known.
Declaration
public int? TotalLength { get; protected set; }
Property Value
Type | Description |
---|---|
int? |
Methods
| Improve this doc View sourceAbortAsync(SequenceAbortReason, Exception?)
Aborts the sequence processing. Used for example to signal that an exception occurred or the enumeration returned prematurely.
Declaration
public Task AbortAsync(SequenceAbortReason reason, Exception? exception = null)
Parameters
Type | Name | Description |
---|---|---|
SequenceAbortReason | reason | The abort reason. |
Exception | exception | The exception that caused the abort, if an exception was thrown. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
AddAsync(IRawInboundEnvelope, ISequence?, bool)
Adds the message to the sequence.
Declaration
public Task<AddToSequenceResult> AddAsync(IRawInboundEnvelope envelope, ISequence? sequence, bool throwIfUnhandled = true)
Parameters
Type | Name | Description |
---|---|---|
IRawInboundEnvelope | envelope | The envelope to be added to the sequence. |
ISequence | sequence | The sequence to be added to the sequence. |
bool | throwIfUnhandled | A boolean value indicating whether an exception must be thrown if no subscriber is handling the message. |
Returns
Type | Description |
---|---|
Task<AddToSequenceResult> | A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed. |
AddCoreAsync(TEnvelope, ISequence?, bool)
Adds the message to the sequence.
Declaration
protected virtual Task<AddToSequenceResult> AddCoreAsync(TEnvelope envelope, ISequence? sequence, bool throwIfUnhandled)
Parameters
Type | Name | Description |
---|---|---|
TEnvelope | envelope | The envelope to be added to the sequence. |
ISequence | sequence | The sequence to be added to the sequence. |
bool | throwIfUnhandled | A boolean value indicating whether an exception must be thrown if no subscriber is handling the message. |
Returns
Type | Description |
---|---|
Task<AddToSequenceResult> | A Task<TResult> representing the asynchronous operation. The task result contains a flag indicating whether the operation was successful and the number of streams that have been actually pushed. |
CompleteAsync(CancellationToken)
Marks the sequence as complete, meaning no more messages will be pushed.
Declaration
protected virtual Task CompleteAsync(CancellationToken cancellationToken = default)
Parameters
Type | Name | Description |
---|---|---|
CancellationToken | cancellationToken | A CancellationToken used to cancel the operation. |
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |
CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>?)
Creates a IMessageStreamEnumerable<TMessage> that will be pushed with the messages belonging to the sequence.
Declaration
public IMessageStreamEnumerable<TMessage> CreateStream<TMessage>(IReadOnlyCollection<IMessageFilter>? filters = null)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<IMessageFilter> | filters | The filters to be applied. |
Returns
Type | Description |
---|---|
IMessageStreamEnumerable<TMessage> |
Type Parameters
Name | Description |
---|---|
TMessage | The type of the messages to be streamed. |
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
public void Dispose()
Dispose(bool)
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
Type | Name | Description |
---|---|---|
bool | disposing | A value indicating whether the method has been called by the |
GetBrokerMessageIdentifiers()
Gets the identifiers of the messages belonging to the sequence.
Declaration
public IReadOnlyList<IBrokerMessageIdentifier> GetBrokerMessageIdentifiers()
Returns
Type | Description |
---|---|
IReadOnlyList<IBrokerMessageIdentifier> | The list of identifiers. |
IsLastMessage(TEnvelope)
Implements the logic to recognize the last message in the sequence without relying on the TotalCount property.
Declaration
protected virtual bool IsLastMessage(TEnvelope envelope)
Parameters
Type | Name | Description |
---|---|---|
TEnvelope | envelope | The envelope to be added to the sequence. |
Returns
Type | Description |
---|---|
bool |
|
OnTimeoutElapsedAsync()
Called when the timout is elapsed. If not overridden in a derived class, the default implementation aborts the sequence.
Declaration
protected virtual Task OnTimeoutElapsedAsync()
Returns
Type | Description |
---|---|
Task | A Task representing the asynchronous operation. |