Multiple Clients (in same process)
In some cases you may want to subscribe multiple times the same consumed message, to perform independent tasks. Having multiple subscribers handling the very same message is not a good idea since a failure in one of them will cause the message to be consumed again and thus reprocessed by all subscribers.
A much safer approach is to bind multiple consumers to the same topic, using a different client id. This will cause the message to be consumed multiple times (once per client) and being committed independently. The MqttClientIdFilterAttribute can be used to execute a subscribed method according to the client id.
public class MyEndpointsConfigurator : IEndpointsConfigurator
{
public void Configure(IEndpointsConfigurationBuilder builder) =>
builder
.AddMqttEndpoints(endpoints => endpoints
.Configure(
config => config
.ConnectViaTcp("localhost"))
.AddInbound(endpoint => endpoint
.Configure(config => config.WithClientId("client1"))
.ConsumeFrom("document-events"))
.AddInbound(endpoint => endpoint
.Configure(config => config.WithClientId("client2"))
.ConsumeFrom("document-events")));
}
Note
The filters can be added dynamically using the overloads of AddSubscriber
accepting a SubscriptionOptions or TypeSubscriptionOptions and this allows you to use a variable for the client id.
.AddSingletonSubscriber<MySubscriber>(
new TypeSubscriptionOptions
{
Filters = new[]
{
new MqttClientIdFilterAttribute("client1")
}
})
Using the MqttClientIdFilterAttribute is the cleanest and easiest approach but alternatively you can always subscribe to the IInboundEnvelope<TMessage> and perform different tasks according to the ClientId
value.
public class MySubscriber
{
public void OnMessageReceived(IInboundEnvelope<MyEvent> envelope)
{
switch (((MqttConsumerEndpoint)envelope.Endpoint).Configuration.ClientId)
{
case "client1":
PerformTask1(envelope.Message);
break;
case "client2":
PerformTask2(envelope.Message);
break;
}
}
private void PerformTask1(MyEvent @event) => ...
private void PerformTask2(MyEvent @event) => ...
}