Streaming
The IMessageStreamEnumerable<TMessage> can be used to consume an endpoint in a streaming fashion and it is the only way to consume sequences (see for example batch processing). This stream will be forwarded to the subscribed method as soon as the first message is consumed and it is then asynchronously pushed with the next messages.
IMessageStreamEnumerable<TMessage> implements both IEnumerable
Since the asynchronous and I/O bound nature of this stream it is recommended to take advantage of the IAsyncEnumerable
public class StreamSubscriber
{
public async Task OnOrderStreamReceived(
IAsyncEnumerable<OrderEvent> eventsStream)
{
await foreach(var orderEvent in eventsStream)
{
// ...process the event...
}
}
}
A single instance of IMessageStreamEnumerable<TMessage> is created and published per each queue/topic/partition and the messages are acknowledged (committed) after a single iteration completes, unless sequencing (e.g. batch processing) is configured or a sequence is automatically recognized by Silverback (e.g. a dataset). In that case an instance is published per each sequence and the entire sequence is atomically committed.
Rx (Observable)
The Silverback.Core.Rx package adds the IMessageStreamObservable<TMessage> that works like the IMessageStreamEnumerable<TMessage> but implements IObservable
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSilverback().AsObservable();
}
}
Notes, suggestions and insights
- The stream will be pushed with messages as they are read from the message broker. Since the I/O bound nature of the operation you should obviously prefer to subscribe to an IAsyncEnumerable
instead of an IEnumerable and in any case loop asynchronously ( await foreach
or similar approach). - If the sequence is interrupted because the application is disconnecting or an error occurred in another subscriber, the IEnumerator will throw an OperationCancelledException. Handle it if you need to gracefully abort or cleanup.
- Throwing an exception while enumerating a sequence (e.g. a BatchSequence) will cause it to be aborted and handled according to the defined error policies. If you just break the iteration and the subscriber return, the operation will be considered successful instead and the sequence will be committed.