MQTT - Basic
This sample implements the simplest possible producer and consumer.
See also: Connecting to a Message Broker
Common
The message being exchanged is defined in a common project.
namespace Silverback.Samples.Mqtt.Basic.Common
{
    /// <summary>
    ///     The message exchanged between the producer and the consumer of this sample.
    /// </summary>
    public class SampleMessage
    {
        /// <summary>
        ///     Gets or sets the number carried by the message.
        /// </summary>
        public int Number { get; set; }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Mqtt.Basic.Producer
{
    /// <summary>
    ///     Wires up the services used by the producer sample.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Adds Silverback with the MQTT broker, the endpoints configurator and the hosted
        ///     service that publishes the sample messages.
        /// </summary>
        /// <param name="services">
        ///     The service collection receiving the registrations.
        /// </param>
        public void ConfigureServices(IServiceCollection services)
        {
            // Enable Silverback, use MQTT as message broker and delegate the
            // inbound/outbound endpoints configuration to a separate class
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(broker => broker.AddMqtt())
                .AddEndpointsConfigurator<EndpointsConfigurator>();

            // Add the hosted service that produces the sample messages
            services.AddHostedService<ProducerBackgroundService>();
        }

        /// <summary>
        ///     Empty in this sample: no further setup is performed here.
        /// </summary>
        public void Configure()
        {
        }
    }
}
using MQTTnet.Protocol;
using Silverback.Messaging.Configuration;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Producer
{
    /// <summary>
    ///     Declares the MQTT client settings and the outbound endpoint of the producer sample.
    /// </summary>
    public class EndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        ///     Configures the MQTT client and routes every <see cref="SampleMessage" /> to the
        ///     <c>samples/basic</c> topic.
        /// </summary>
        /// <param name="builder">
        ///     The builder the MQTT endpoints are added to.
        /// </param>
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder.AddMqttEndpoints(
                mqtt => mqtt

                    // Client options: client identifier and TCP connection to localhost
                    .Configure(
                        client => client
                            .WithClientId("samples.basic.producer")
                            .ConnectViaTcp("localhost"))

                    // Produce the SampleMessage to the samples/basic topic, requesting
                    // the at-least-once quality of service level and calling Retain()
                    .AddOutbound<SampleMessage>(
                        outbound => outbound
                            .ProduceTo("samples/basic")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Retain()));
        }
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Producer
{
    /// <summary>
    ///     Background service that publishes a <see cref="SampleMessage" /> carrying an
    ///     incrementing number, pausing between iterations until the host requests a stop.
    /// </summary>
    public class ProducerBackgroundService : BackgroundService
    {
        // Pause applied at the end of every loop iteration, whether or not a message was produced.
        private const int LoopDelayMilliseconds = 100;

        private readonly IServiceScopeFactory _serviceScopeFactory;

        private readonly ILogger<ProducerBackgroundService> _logger;

        /// <summary>
        ///     Initializes a new instance of the <see cref="ProducerBackgroundService" /> class.
        /// </summary>
        /// <param name="serviceScopeFactory">
        ///     The factory used to create the scope the publisher and the broker are resolved from.
        /// </param>
        /// <param name="logger">
        ///     The logger receiving the outcome of each publish attempt.
        /// </param>
        public ProducerBackgroundService(
            IServiceScopeFactory serviceScopeFactory,
            ILogger<ProducerBackgroundService> logger)
        {
            _serviceScopeFactory = serviceScopeFactory;
            _logger = logger;
        }

        /// <summary>
        ///     Publishes one message per iteration while the broker is connected and waits
        ///     100 milliseconds before the next iteration.
        /// </summary>
        /// <param name="stoppingToken">
        ///     The token that ends the loop and cancels the pending delay.
        /// </param>
        /// <returns>
        ///     A <see cref="Task" /> representing the long-running loop.
        /// </returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The IPublisher cannot be resolved from the root scope (and therefore cannot be
            // injected directly into the BackgroundService), hence the dedicated scope
            using var scope = _serviceScopeFactory.CreateScope();
            var scopedServices = scope.ServiceProvider;
            var publisher = scopedServices.GetRequiredService<IPublisher>();
            var broker = scopedServices.GetRequiredService<IBroker>();

            var counter = 0;

            while (!stoppingToken.IsCancellationRequested)
            {
                // The BackgroundService starts immediately, before the application is
                // completely bootstrapped: produce only once the connection is established
                if (broker.IsConnected)
                {
                    await ProduceMessageAsync(publisher, ++counter);
                }

                await Task.Delay(LoopDelayMilliseconds, stoppingToken);
            }
        }

        /// <summary>
        ///     Publishes a single <see cref="SampleMessage" /> and logs the outcome. Exceptions
        ///     are logged and not rethrown, so a failed publish does not stop the loop.
        /// </summary>
        /// <param name="publisher">
        ///     The publisher used to send the message.
        /// </param>
        /// <param name="number">
        ///     The value assigned to <see cref="SampleMessage.Number" />.
        /// </param>
        private async Task ProduceMessageAsync(IPublisher publisher, int number)
        {
            try
            {
                var message = new SampleMessage { Number = number };

                await publisher.PublishAsync(message);

                _logger.LogInformation("Produced {Number}", number);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to produce {Number}", number);
            }
        }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Producer
Consumer
The consumer processes the messages and outputs their value to the standard output.
using Microsoft.Extensions.DependencyInjection;
namespace Silverback.Samples.Mqtt.Basic.Consumer
{
    /// <summary>
    ///     Wires up the services used by the consumer sample.
    /// </summary>
    public class Startup
    {
        /// <summary>
        ///     Adds Silverback with the MQTT broker, the endpoints configurator and the
        ///     subscriber handling the consumed messages.
        /// </summary>
        /// <param name="services">
        ///     The service collection receiving the registrations.
        /// </param>
        public void ConfigureServices(IServiceCollection services)
        {
            // Enable Silverback, use MQTT as message broker, delegate the inbound/outbound
            // endpoints configuration to a separate class and register the subscriber
            services
                .AddSilverback()
                .WithConnectionToMessageBroker(broker => broker.AddMqtt())
                .AddEndpointsConfigurator<EndpointsConfigurator>()
                .AddSingletonSubscriber<SampleMessageSubscriber>();
        }

        /// <summary>
        ///     Empty in this sample: no further setup is performed here.
        /// </summary>
        public void Configure()
        {
        }
    }
}
using MQTTnet.Protocol;
using Silverback.Messaging.Configuration;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Consumer
{
    /// <summary>
    ///     Declares the MQTT client settings and the inbound endpoint of the consumer sample.
    /// </summary>
    public class EndpointsConfigurator : IEndpointsConfigurator
    {
        /// <summary>
        ///     Configures the MQTT client (including its last will message) and subscribes to
        ///     the <c>samples/basic</c> topic.
        /// </summary>
        /// <param name="builder">
        ///     The builder the MQTT endpoints are added to.
        /// </param>
        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder.AddMqttEndpoints(
                mqtt => mqtt

                    // Client options: client identifier, TCP connection to localhost and
                    // the last will message sent if disconnecting ungracefully
                    .Configure(
                        client => client
                            .WithClientId("samples.basic.consumer")
                            .ConnectViaTcp("localhost")
                            .SendLastWillMessage(
                                testament => testament
                                    .Message(new TestamentMessage())
                                    .ProduceTo("samples/testaments")))

                    // Consume the samples/basic topic, requesting the at-least-once quality
                    // of service level and silently skipping the messages in case of exception
                    .AddInbound(
                        inbound => inbound
                            .ConsumeFrom("samples/basic")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .OnError(errorPolicy => errorPolicy.Skip())));
        }
    }
}
using Microsoft.Extensions.Logging;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Consumer
{
    /// <summary>
    ///     Subscriber that logs the number of each received <see cref="SampleMessage" />.
    /// </summary>
    public class SampleMessageSubscriber
    {
        private readonly ILogger<SampleMessageSubscriber> _logger;

        /// <summary>
        ///     Initializes a new instance of the <see cref="SampleMessageSubscriber" /> class.
        /// </summary>
        /// <param name="logger">
        ///     The logger the received numbers are written to.
        /// </param>
        public SampleMessageSubscriber(ILogger<SampleMessageSubscriber> logger)
        {
            _logger = logger;
        }

        /// <summary>
        ///     Writes the <see cref="SampleMessage.Number" /> of the message to the log at
        ///     information level.
        /// </summary>
        /// <param name="message">
        ///     The received message.
        /// </param>
        public void OnMessageReceived(SampleMessage message)
        {
            _logger.LogInformation("Received {MessageNumber}", message.Number);
        }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Consumer