Inbound Endpoint
An inbound endpoint is used to configure Silverback to automatically consume a topic/queue and relay the messages to the internal bus. If no exception is thrown by the subscribers, the message is acknowledged and the next one is consumed.
The endpoint object identifies the topic/queue being connected and the client configuration, such as the connection options. The endpoint object is therefore very specific and every broker type defines its own implementation of IConsumerEndpoint.
The options in the endpoint object are also used to tweak the Silverback behavior (e.g. the deserialization) and to enable additional features such as batch processing, decryption, etc.
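Once relayed to the internal bus, the messages can be handled by a plain subscriber. The following is a minimal sketch, where OrderEvent and OrderEventsSubscriber are hypothetical names used for illustration only:

using Microsoft.Extensions.Logging;

// Hypothetical message class, for illustration only
public class OrderEvent
{
    public string? OrderId { get; set; }
}

public class OrderEventsSubscriber
{
    private readonly ILogger<OrderEventsSubscriber> _logger;

    public OrderEventsSubscriber(ILogger<OrderEventsSubscriber> logger)
    {
        _logger = logger;
    }

    // Invoked for each consumed OrderEvent; if it returns without throwing,
    // the message is acknowledged and the next one is consumed
    public void OnOrderEventReceived(OrderEvent orderEvent)
    {
        _logger.LogInformation("Processing order {OrderId}", orderEvent.OrderId);
    }
}

The subscriber is registered as usual, e.g. with .AddSilverback().AddScopedSubscriber&lt;OrderEventsSubscriber&gt;().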
Note
Silverback abstracts the message broker completely and the messages are automatically acknowledged if the subscribers complete without throwing an exception.
Apache Kafka
The KafkaConsumerEndpoint is defined by Silverback.Integration.Kafka and is used to declare an inbound endpoint connected to Apache Kafka.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events", "inventory-events")
                    .Configure(config =>
                        {
                            config.GroupId = "my-consumer";
                            config.AutoOffsetReset = AutoOffsetResetType.Earliest;
                        })
                    .OnError(policy => policy.Retry(5))));
}
Note
You can decide whether to use one consumer per topic or to subscribe to multiple topics with the same consumer (passing multiple topic names, as shown in the example above). Both solutions have advantages and disadvantages, and the best choice depends on your specific requirements, the volume of messages being produced, etc. The main difference is that a consumer subscribing to multiple topics still consumes one message at a time, with the messages from the different topics simply interleaved (which may or may not be an issue), while each additional consumer uses some resources, so creating multiple consumers results in a bigger overhead. The one-consumer-per-topic variant is sketched below.
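For reference, declaring one consumer per topic is just a matter of adding multiple inbound endpoints. A minimal sketch, based on the example above:

.AddInbound(endpoint => endpoint
    .ConsumeFrom("order-events")
    .Configure(config =>
        {
            // Both consumers can share the same group id,
            // since they subscribe different topics
            config.GroupId = "my-consumer";
        }))
.AddInbound(endpoint => endpoint
    .ConsumeFrom("inventory-events")
    .Configure(config =>
        {
            config.GroupId = "my-consumer";
        }))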
Note
For more in-depth documentation about the Kafka client configuration, refer also to the confluent-kafka-dotnet documentation.
MQTT
The MqttConsumerEndpoint is defined by Silverback.Integration.MQTT and is used to declare an inbound endpoint connected to an MQTT broker.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddMqttEndpoints(endpoints => endpoints
                .Configure(
                    config => config
                        .WithClientId("order-service")
                        .ConnectViaTcp("localhost")
                        .SendLastWillMessage(
                            lastWill => lastWill
                                .Message(new TestamentMessage())
                                .ProduceTo("testaments")))
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events", "inventory-events")
                    .WithQualityOfServiceLevel(
                        MqttQualityOfServiceLevel.AtLeastOnce)
                    .OnError(policy => policy.Retry(5))));
}
Note
Regardless of how the inbound and outbound endpoints are declared, a single client will be created as long as all endpoints use the exact same client configuration. (Using a slightly different configuration for the same client will cause an exception to be thrown when the endpoints configuration is validated.)
Note
For more in-depth documentation about the MQTT client configuration, refer also to the MQTTNet documentation.
RabbitMQ
Silverback.Integration.RabbitMQ is a bit more intricate and uses two different classes to specify an endpoint that connects to a queue (RabbitQueueConsumerEndpoint) or directly to an exchange (RabbitExchangeConsumerEndpoint).
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddInbound(
                new RabbitQueueConsumerEndpoint("inventory-commands-queue")
                {
                    Connection = new RabbitConnectionConfig
                    {
                        HostName = "localhost",
                        UserName = "guest",
                        Password = "guest"
                    },
                    Queue = new RabbitQueueConfig
                    {
                        IsDurable = true,
                        IsExclusive = false,
                        IsAutoDeleteEnabled = false
                    }
                })
            .AddInbound(
                new RabbitExchangeConsumerEndpoint("order-events")
                {
                    Connection = new RabbitConnectionConfig
                    {
                        HostName = "localhost",
                        UserName = "guest",
                        Password = "guest"
                    },
                    Exchange = new RabbitExchangeConfig
                    {
                        IsDurable = true,
                        IsAutoDeleteEnabled = false,
                        ExchangeType = ExchangeType.Fanout
                    },
                    QueueName = "my-consumer-group",
                    Queue = new RabbitQueueConfig
                    {
                        IsDurable = true,
                        IsExclusive = false,
                        IsAutoDeleteEnabled = false
                    }
                });
}
Note
For more in-depth documentation about the RabbitMQ configuration, refer to the RabbitMQ tutorials and documentation.
Error handling
If an exception is thrown by the methods consuming the incoming messages (the subscribers), the consumer will stop, unless some error policies are defined.
The built-in policies are:
- StopConsumerErrorPolicy (default)
- SkipMessageErrorPolicy
- RetryErrorPolicy
- MoveMessageErrorPolicy
- ErrorPolicyChain
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events", "inventory-events")
                    .Configure(config =>
                        {
                            config.GroupId = "my-consumer";
                            config.AutoOffsetReset = AutoOffsetResetType.Earliest;
                        })
                    .OnError(policy => policy
                        .Retry(3, TimeSpan.FromSeconds(1))
                        .ThenSkip())));
}
Important
If the processing still fails after the last policy is applied, the exception will be returned to the consumer, causing it to stop.
Important
The number of attempts is tracked according to the message id header. A message id must be provided in order for the MaxFailedAttempts mechanism to work. This is ensured by the Silverback producer but might not be the case when consuming messages coming from other sources. Some message broker implementations might transparently cope with the missing message id header and derive it from other identifiers (e.g. the Kafka message key), but it's not guaranteed that those will always be unique. You should carefully verify that before relying on this feature.
Important
The RetryErrorPolicy will prevent the message broker from being polled for the duration of the configured delay, which could lead to a timeout. With Kafka, for example, you should set the max.poll.interval.ms setting to a higher value.
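A minimal sketch of this workaround (MaxPollIntervalMs maps to max.poll.interval.ms in the underlying Confluent consumer configuration; the values are illustrative only):

.AddInbound(endpoint => endpoint
    .ConsumeFrom("order-events")
    .Configure(config =>
        {
            config.GroupId = "my-consumer";

            // Default is 300000 (5 minutes); raise it so that the total
            // retry delay cannot exceed the allowed interval between polls
            config.MaxPollIntervalMs = 600000;
        })
    .OnError(policy => policy.Retry(5, TimeSpan.FromMinutes(1))))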
Apply rules
Use the ApplyTo and Exclude methods to decide which exceptions must be handled by the error policy, or take advantage of ApplyWhen to specify a custom apply rule.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events", "inventory-events")
                    .Configure(config =>
                        {
                            config.GroupId = "my-consumer";
                        })
                    .OnError(policy => policy
                        .MoveToKafkaTopic(
                            moveEndpoint => moveEndpoint.ProduceTo("some-other-topic"),
                            movePolicy => movePolicy
                                .ApplyTo<MyException>()
                                .ApplyWhen((msg, ex) => msg.Xy == myValue))
                        .ThenSkip())));
}
Publishing events
Messages can be published when a policy is applied, in order to execute custom code.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("order-events", "inventory-events")
                    .Configure(config =>
                        {
                            config.GroupId = "my-consumer";
                        })
                    .OnError(policy => policy
                        .Retry(3, TimeSpan.FromSeconds(1))
                        .ThenSkip(skipPolicy => skipPolicy
                            .Publish(msg => new ProcessingFailedEvent(msg))))));
}
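The published event can then be handled by a regular subscriber. A minimal sketch, assuming ProcessingFailedEvent is a class of your own wrapping the failed envelope:

using Microsoft.Extensions.Logging;
using Silverback.Messaging.Messages;

// Hypothetical event, for illustration only
public class ProcessingFailedEvent
{
    public ProcessingFailedEvent(IInboundEnvelope envelope)
    {
        Envelope = envelope;
    }

    public IInboundEnvelope Envelope { get; }
}

public class ProcessingFailedSubscriber
{
    private readonly ILogger<ProcessingFailedSubscriber> _logger;

    public ProcessingFailedSubscriber(ILogger<ProcessingFailedSubscriber> logger)
    {
        _logger = logger;
    }

    public void OnProcessingFailed(ProcessingFailedEvent failedEvent)
    {
        // Custom compensation logic (alerting, auditing, ...) goes here
        _logger.LogError(
            "Failed to process message from {Endpoint}",
            failedEvent.Envelope.Endpoint.Name);
    }
}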
Batch processing
In some scenarios, when dealing with large volumes of messages, processing each one on its own isn't the most efficient approach. Batch processing allows you to process an arbitrary number of unrelated messages as a single unit of work.
Refer to the BatchSettings documentation for details about the configuration.
The batch can be subscribed either as IEnumerable&lt;TMessage&gt; or IAsyncEnumerable&lt;TMessage&gt;, as sketched after the following example.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddInbound(endpoint => endpoint
                    .ConsumeFrom("inventory-events")
                    .Configure(config =>
                        {
                            config.GroupId = "my-consumer";
                        })
                    .EnableBatchProcessing(100, TimeSpan.FromSeconds(5))));
}
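A minimal batch subscriber sketch, assuming a hypothetical InventoryEvent message class:

// Hypothetical message class, for illustration only
public class InventoryEvent
{
    public string? ProductId { get; set; }
}

public class InventoryEventsBatchSubscriber
{
    // Receives the messages accumulated within the configured limits
    // (100 messages or 5 seconds, according to the endpoint above)
    public async Task OnBatchReceived(IAsyncEnumerable<InventoryEvent> batch)
    {
        var events = new List<InventoryEvent>();

        await foreach (var inventoryEvent in batch)
        {
            events.Add(inventoryEvent);
        }

        // The whole batch is committed only after this method
        // completes without throwing
    }
}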
Parallelism
The consumer processes the messages sequentially; this is by design.
The KafkaConsumer is a bit special and actually processes each assigned partition independently and concurrently.
This behavior can be toggled using the ProcessAllPartitionsTogether and ProcessPartitionsIndependently methods of the IKafkaConsumerEndpointBuilder (or the KafkaConsumerEndpoint.ProcessPartitionsIndependently property), while the LimitParallelism method (or the KafkaConsumerEndpoint.MaxDegreeOfParallelism property) can be used to limit the number of messages actually being processed concurrently. A sketch of these options follows.
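A sketch of these options in the fluent endpoint configuration (the values are illustrative only):

.AddInbound(endpoint => endpoint
    .ConsumeFrom("order-events")
    .Configure(config =>
        {
            config.GroupId = "my-consumer";
        })

    // Process each assigned partition independently (this is the default)
    .ProcessPartitionsIndependently()

    // ...but process at most 2 partitions concurrently
    .LimitParallelism(2))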
Exactly-once processing
Silverback is able to keep track of the messages that have been consumed in order to guarantee that each message is processed exactly once.
Offset storage
The OffsetStoreExactlyOnceStrategy will store the offset of the latest processed message (of each topic/partition) into a database table.
Note
The Silverback.Core.EntityFrameworkCore package is also required and the DbContext must include a DbSet&lt;StoredOffset&gt;. See also the Sample DbContext (EF Core).
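A minimal sketch of such a DbContext, assuming the StoredOffset entity is the one shipped in the Silverback.Database.Model namespace:

using Microsoft.EntityFrameworkCore;
using Silverback.Database.Model;

public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options)
        : base(options)
    {
    }

    // Table used by the OffsetStoreExactlyOnceStrategy to persist the
    // latest processed offset of each topic/partition
    public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;
}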
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .UseDbContext<MyDbContext>()
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddOffsetStoreDatabaseTable())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>();
    }
}
Inbound log
The LogExactlyOnceStrategy will store the identifiers of all processed messages into a database table.
Note
The Silverback.Core.EntityFrameworkCore package is also required and the DbContext must include a DbSet&lt;InboundLogEntry&gt;. See also the Sample DbContext (EF Core).
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .UseDbContext<MyDbContext>()
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddInboundLogDatabaseTable())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>();
    }
}
Custom store
At the moment only a database accessed via Entity Framework is supported as offset or log storage, but a custom storage can be used by implementing IOffsetStore or IInboundLog.
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .UseDbContext<MyDbContext>()
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddOffsetStore<MyCustomOffsetStore>())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>();
    }
}