Outbound Endpoint
An outbound endpoint is used to configure Silverback to automatically relay the integration messages that are published to the internal bus to the message broker. Multiple outbound endpoints can be configured and Silverback will route the messages according to their type or a custom routing logic.
The endpoint object identifies the topic/queue that is being connected and the client configuration, such as the connection options. The endpoint object is therefore very specific and every broker type will define its own implementation of IProducerEndpoint.
The options in the endpoint object are also used to tweak the Silverback behavior (e.g. the serialization) and to enable additional features such as chunking, encryption, etc.
Apache Kafka
The KafkaProducerEndpoint is defined by Silverback.Integration.Kafka and is used to declare an outbound endpoint connected to Apache Kafka.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddKafkaEndpoints(endpoints => endpoints
                .Configure(config =>
                    {
                        config.BootstrapServers = "PLAINTEXT://kafka:9092";
                    })
                .AddOutbound<IIntegrationEvent>(endpoint => endpoint
                    .ProduceTo("order-events")
                    .EnableChunking(500000)
                    .ProduceToOutbox()));
}
Note
For more in-depth documentation about the Kafka client configuration, refer also to the confluent-kafka-dotnet documentation.
MQTT
The MqttProducerEndpoint is defined by Silverback.Integration.MQTT and is used to declare an outbound endpoint connected to an MQTT broker.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddMqttEndpoints(endpoints => endpoints
                .Configure(
                    config => config
                        .WithClientId("order-service")
                        .ConnectViaTcp("localhost")
                        .SendLastWillMessage(
                            lastWill => lastWill
                                .Message(new TestamentMessage())
                                .ProduceTo("testaments")))
                .AddOutbound<IIntegrationEvent>(endpoint => endpoint
                    .ProduceTo("order-events")
                    .WithQualityOfServiceLevel(
                        MqttQualityOfServiceLevel.AtLeastOnce)
                    .Retain()));
}
Note
For more in-depth documentation about the MQTT client configuration, refer also to the MQTTNet documentation.
RabbitMQ
Silverback.Integration.RabbitMQ is a bit more intricate and uses two different classes to specify an endpoint, depending on whether it connects to a queue (RabbitQueueProducerEndpoint) or directly to an exchange (RabbitExchangeProducerEndpoint).
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder) =>
        builder
            .AddOutbound<IIntegrationEvent>(
                new RabbitQueueProducerEndpoint("inventory-commands-queue")
                {
                    Connection = new RabbitConnectionConfig
                    {
                        HostName = "localhost",
                        UserName = "guest",
                        Password = "guest"
                    },
                    Queue = new RabbitQueueConfig
                    {
                        IsDurable = true,
                        IsExclusive = false,
                        IsAutoDeleteEnabled = false
                    }
                })
            .AddOutbound<IIntegrationEvent>(
                new RabbitExchangeProducerEndpoint("order-events")
                {
                    Connection = new RabbitConnectionConfig
                    {
                        HostName = "localhost",
                        UserName = "guest",
                        Password = "guest"
                    },
                    Exchange = new RabbitExchangeConfig
                    {
                        IsDurable = true,
                        IsAutoDeleteEnabled = false,
                        ExchangeType = ExchangeType.Fanout
                    }
                });
}
Note
For more in-depth documentation about the RabbitMQ configuration, refer to the RabbitMQ tutorials and documentation.
Transactional outbox strategy
The purpose of the transactional outbox pattern is to reliably update the database and publish the messages in the same atomic transaction. This is achieved by storing the outbound messages in a temporary outbox table, whose changes are committed together with the changes to the rest of the data.
When using Entity Framework, the outbound messages are stored in a DbSet and are therefore implicitly saved in the same transaction used to save all other changes.
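To make the mechanism concrete, here is a minimal in-memory sketch of the pattern itself. It is not Silverback's implementation: FakeDatabase and OutboxRelay are hypothetical names, and the transaction is only simulated by staging the changes and applying them together on commit.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a database with one unit of work.
public sealed class FakeDatabase
{
    private readonly List<Action> _pending = new();

    public List<string> Orders { get; } = new();

    public Queue<string> Outbox { get; } = new();

    // Stage a business change; nothing is visible until Commit.
    public void AddOrder(string order) =>
        _pending.Add(() => Orders.Add(order));

    // Stage the outbound message in the same unit of work.
    public void AddOutboxMessage(string message) =>
        _pending.Add(() => Outbox.Enqueue(message));

    // Apply all staged changes together (simulates an atomic commit).
    public void Commit()
    {
        foreach (var change in _pending)
            change();

        _pending.Clear();
    }

    // Discard all staged changes (simulates a rollback).
    public void Rollback() => _pending.Clear();
}

public static class OutboxRelay
{
    // Hands every committed outbox message to the producer callback
    // and returns the number of relayed messages.
    public static int Relay(FakeDatabase db, Action<string> produce)
    {
        var count = 0;

        while (db.Outbox.Count > 0)
        {
            // Produce first, then remove: if produce throws,
            // the message stays in the outbox for the next attempt.
            produce(db.Outbox.Peek());
            db.Outbox.Dequeue();
            count++;
        }

        return count;
    }
}
```

In this sketch a rolled-back unit of work leaves neither the order nor the message behind, while a committed one makes both visible at once, so the relay can never publish a message for data that was not saved.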
Note
The Silverback.Core.EntityFrameworkCore package is also required and the DbContext must include a DbSet of OutboxMessage. See also the Sample DbContext (EF Core).
Important
The current OutboxWorker cannot scale horizontally: starting multiple instances will cause the messages to be produced multiple times. In the following example a distributed lock (stored in the database) is used to ensure that only one instance is running at a time and that another one immediately takes over when it stops. The DbContext must include a DbSet of Lock as well; see also the Sample DbContext (EF Core).
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .UseDbContext<MyDbContext>()
            // Setup the lock manager using the database
            // to handle the distributed locks.
            // If this line is omitted the OutboxWorker will still
            // work without locking.
            .AddDbDistributedLockManager()
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddOutboxDatabaseTable()
                .AddOutboxWorker())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>();
    }
}
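The MyDbContext referenced above needs the two sets mentioned in the notes. A minimal sketch, assuming that the OutboxMessage and Lock entities live in the Silverback.Database.Model namespace; the property names (Outbox, Locks) are arbitrary choices made for this illustration:

```csharp
using Microsoft.EntityFrameworkCore;
using Silverback.Database.Model; // assumed namespace of OutboxMessage and Lock

public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options)
        : base(options)
    {
    }

    // Outbox table, saved in the same transaction as the business data.
    public DbSet<OutboxMessage> Outbox { get; set; } = null!;

    // Table backing the distributed lock used by the OutboxWorker.
    public DbSet<Lock> Locks { get; set; } = null!;
}
```

The application's own entity sets would be declared next to these two, so that a single SaveChanges call covers both the business data and the outbox.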
Custom outbox
At the moment only a database table accessed using Entity Framework is supported as outbox, but you can use another kind of storage by creating your own IOutboxWriter and IOutboxReader implementations.
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .UseDbContext<MyDbContext>()
            .AddDbDistributedLockManager()
            .WithConnectionToMessageBroker(options => options
                .AddKafka()
                .AddOutbox<MyCustomOutboxWriter, MyCustomOutboxReader>()
                .AddOutboxWorker())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>();
    }
}
Subscribing locally
The published messages that are routed to an outbound endpoint cannot be subscribed to locally (within the same process), unless this is explicitly enabled by calling PublishOutboundMessagesToInternalBus.
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddSilverback()
            .AddDbDistributedLockManager()
            .WithConnectionToMessageBroker(options => options
                .AddKafka())
            .AddEndpointsConfigurator<MyEndpointsConfigurator>()
            .PublishOutboundMessagesToInternalBus();
    }
}
Note
The statement above is only partially true, as you can subscribe to the wrapped message (IOutboundEnvelope<TMessage>) even without calling PublishOutboundMessagesToInternalBus.
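As a sketch of the case described in this note, a subscriber can accept the envelope instead of the message. OutboundAuditSubscriber is a hypothetical name, and the example assumes that the envelope exposes the Message and Endpoint.Name members and that the class is registered as a subscriber (for example with AddSingletonSubscriber):

```csharp
using System;
using Silverback.Messaging.Messages;

// Hypothetical subscriber that observes the messages being produced.
public class OutboundAuditSubscriber
{
    // Receives the envelope wrapping each IIntegrationEvent routed to
    // an outbound endpoint (assumed to work without
    // PublishOutboundMessagesToInternalBus, as stated in the note).
    public void OnOutbound(IOutboundEnvelope<IIntegrationEvent> envelope)
    {
        Console.WriteLine(
            $"Producing {envelope.Message?.GetType().Name} " +
            $"to {envelope.Endpoint.Name}");
    }
}
```

Subscribing to the envelope is mostly useful for cross-cutting concerns such as logging or auditing, since the business subscribers usually care about the message only.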
Producing the same message to multiple endpoints
An outbound route can point to multiple endpoints, resulting in a broadcast to all of them.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder)
    {
        builder
            .AddOutbound<IIntegrationCommand>(
                new KafkaProducerEndpoint("topic-1")
                {
                    ...
                },
                new KafkaProducerEndpoint("topic-2")
                {
                    ...
                });
    }
}
A message will also be routed to all outbound endpoints mapped to a type compatible with the message type. In the example below an OrderCreatedMessage (that inherits from OrderMessage) would be sent to both endpoints.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
    public void Configure(IEndpointsConfigurationBuilder builder)
    {
        builder
            .AddOutbound<OrderMessage>(
                new KafkaProducerEndpoint("topic-1")
                {
                    ...
                })
            .AddOutbound<OrderCreatedMessage>(
                new KafkaProducerEndpoint("topic-2")
                {
                    ...
                });
    }
}
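The "compatible type" rule can be illustrated with plain .NET type checks. This is only a sketch of the idea, not Silverback's routing code; Route and RoutingSketch are hypothetical names introduced for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class OrderMessage { }

public class OrderCreatedMessage : OrderMessage { }

// Hypothetical route: a message type mapped to a topic name.
public record Route(Type MessageType, string Topic);

public static class RoutingSketch
{
    // Returns every topic whose mapped type the message is an instance of,
    // so a derived message also matches the routes of its base types.
    public static IReadOnlyList<string> Resolve(
        object message,
        IEnumerable<Route> routes) =>
        routes
            .Where(route => route.MessageType.IsInstanceOfType(message))
            .Select(route => route.Topic)
            .ToList();
}
```

With routes mapping OrderMessage to "topic-1" and OrderCreatedMessage to "topic-2", resolving an OrderCreatedMessage yields both topics, while a plain OrderMessage only matches "topic-1".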
Dynamic custom routing
By default Silverback routes the messages according to their type and the static configuration defined at startup. In some cases you may need more flexibility and want to apply your own routing rules. See the dedicated Outbound Messages Routing chapter for more information.