MQTT - Basic
This sample implements the simplest possible producer and consumer.
See also: Connecting to a Message Broker, Producing Messages, Consuming Messages
Common
The message being exchanged is defined in a common project.
namespace Silverback.Samples.Mqtt.Basic.Common;
/// <summary>
/// The sample message exchanged between the producer and the consumer.
/// </summary>
public class SampleMessage
{
    /// <summary>
    /// Gets or sets the number carried by the message.
    /// </summary>
    public int Number { get; set; }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Common
Producer
The producer uses a hosted service to publish some messages in the background.
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Mqtt.Basic.Producer;
/// <summary>
/// Registers the services needed by the producer application.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback with the MQTT integration, the broker clients configurator and the
    /// hosted service that produces the sample messages.
    /// </summary>
    /// <param name="services">The service collection receiving the registrations.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverback = services.AddSilverback();

        // Use MQTT as message broker
        var silverbackWithBroker = silverback.WithConnectionToMessageBroker(options => options.AddMqtt());

        // Delegate the broker clients configuration to a separate class
        silverbackWithBroker.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Add the hosted service that produces the sample messages
        services.AddHostedService<ProducerBackgroundService>();
    }

    /// <summary>
    /// Intentionally empty: this sample performs no additional configuration here.
    /// </summary>
    public void Configure()
    {
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Producer;
/// <summary>
/// Configures the MQTT client used by the producer application.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    private const string BrokerHost = "localhost";

    private const string ClientId = "samples.basic.producer";

    private const string TopicName = "samples/basic";

    /// <summary>
    /// Adds a single MQTT client that connects via TCP and produces the
    /// <see cref="SampleMessage" /> to the <c>samples/basic</c> topic.
    /// </summary>
    /// <param name="builder">The builder used to declare the broker clients.</param>
    public void Configure(BrokerClientsConfigurationBuilder builder)
    {
        builder.AddMqttClients(
            mqtt => mqtt
                // Connection shared by the clients declared below
                .ConnectViaTcp(BrokerHost)
                // The one and only client of this application
                .AddClient(
                    producer => producer
                        .WithClientId(ClientId)
                        // Route every SampleMessage to the samples/basic topic
                        .Produce<SampleMessage>(
                            target => target
                                .ProduceTo(TopicName)
                                .WithAtLeastOnceQoS()
                                .Retain()
                                .LogNoMatchingSubscribersWarning())));
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Silverback.Messaging.Publishing;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Producer;
/// <summary>
/// Background service that keeps publishing <see cref="SampleMessage" /> instances carrying an
/// increasing number (starting from 1) until the stopping token is signaled.
/// </summary>
public class ProducerBackgroundService : BackgroundService
{
    // Pause observed after each publish attempt.
    private static readonly TimeSpan DelayBetweenMessages = TimeSpan.FromMilliseconds(100);

    private readonly IPublisher _publisher;

    private readonly ILogger<ProducerBackgroundService> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="ProducerBackgroundService" /> class.
    /// </summary>
    /// <param name="publisher">The publisher used to publish the messages.</param>
    /// <param name="logger">The logger used to report each produced (or failed) message.</param>
    public ProducerBackgroundService(
        IPublisher publisher,
        ILogger<ProducerBackgroundService> logger) =>
        (_publisher, _logger) = (publisher, logger);

    /// <summary>
    /// Publishes one message per iteration, waiting <see cref="DelayBetweenMessages" /> after each one.
    /// </summary>
    /// <param name="stoppingToken">The token that ends the loop and cancels the pending delay.</param>
    /// <returns>A task representing the produce loop.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        for (int sequence = 1; !stoppingToken.IsCancellationRequested; sequence++)
        {
            await ProduceMessageAsync(sequence);
            await Task.Delay(DelayBetweenMessages, stoppingToken);
        }
    }

    /// <summary>
    /// Publishes a single message and logs the outcome. Failures are logged and not rethrown,
    /// so the calling loop carries on with the next number.
    /// </summary>
    /// <param name="number">The number to be carried by the message.</param>
    private async Task ProduceMessageAsync(int number)
    {
        try
        {
            await _publisher.PublishAsync(new SampleMessage { Number = number });

            _logger.LogInformation("Produced {Number}", number);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to produce {Number}", number);
        }
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Producer
Consumer
The consumer processes the messages and logs their value (to the standard output, when console logging is enabled).
using Microsoft.Extensions.DependencyInjection;
using Silverback.Configuration;
using Silverback.Messaging.Configuration;
namespace Silverback.Samples.Mqtt.Basic.Consumer;
/// <summary>
/// Registers the services needed by the consumer application.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers Silverback with the MQTT integration, the broker clients configurator and the
    /// subscriber handling the consumed messages.
    /// </summary>
    /// <param name="services">The service collection receiving the registrations.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // Enable Silverback
        var silverback = services.AddSilverback();

        // Use MQTT as message broker
        var silverbackWithBroker = silverback.WithConnectionToMessageBroker(options => options.AddMqtt());

        // Delegate the broker clients configuration to a separate class
        var silverbackWithClients =
            silverbackWithBroker.AddBrokerClientsConfigurator<BrokerClientsConfigurator>();

        // Register the subscribers
        silverbackWithClients.AddSingletonSubscriber<SampleMessageSubscriber>();
    }

    /// <summary>
    /// Intentionally empty: this sample performs no additional configuration here.
    /// </summary>
    public void Configure()
    {
    }
}
using Silverback.Messaging.Configuration;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Consumer;
/// <summary>
/// Configures the MQTT client used by the consumer application.
/// </summary>
public class BrokerClientsConfigurator : IBrokerClientsConfigurator
{
    private const string BrokerHost = "localhost";

    private const string ClientId = "samples.basic.consumer";

    private const string SourceTopic = "samples/basic";

    private const string LastWillTopic = "samples/testaments";

    /// <summary>
    /// Adds a single MQTT client that connects via TCP, declares a last will message targeting the
    /// <c>samples/testaments</c> topic and consumes the <c>samples/basic</c> topic.
    /// </summary>
    /// <param name="builder">The builder used to declare the broker clients.</param>
    public void Configure(BrokerClientsConfigurationBuilder builder)
    {
        builder.AddMqttClients(
            mqtt => mqtt
                // Connection shared by the clients declared below
                .ConnectViaTcp(BrokerHost)
                // The one and only client of this application
                .AddClient(
                    consumer => consumer
                        .WithClientId(ClientId)
                        // Last will message, sent if disconnecting ungracefully
                        .SendLastWillMessage<TestamentMessage>(
                            testament => testament
                                .SendMessage(new TestamentMessage())
                                .ProduceTo(LastWillTopic))
                        // Subscribe to the samples/basic topic
                        .Consume(
                            source => source
                                .ConsumeFrom(SourceTopic)
                                .WithAtLeastOnceQoS()
                                // Silently skip the messages in case of exception
                                .OnError(errorPolicy => errorPolicy.Skip()))));
    }
}
using Microsoft.Extensions.Logging;
using Silverback.Samples.Mqtt.Basic.Common;
namespace Silverback.Samples.Mqtt.Basic.Consumer;
/// <summary>
/// Handles the received <see cref="SampleMessage" /> instances by logging their number.
/// </summary>
public class SampleMessageSubscriber
{
    private readonly ILogger<SampleMessageSubscriber> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="SampleMessageSubscriber" /> class.
    /// </summary>
    /// <param name="logger">The logger the received numbers are written to.</param>
    public SampleMessageSubscriber(ILogger<SampleMessageSubscriber> logger) => _logger = logger;

    /// <summary>
    /// Logs the number carried by the received message.
    /// </summary>
    /// <param name="message">The received message.</param>
    public void OnMessageReceived(SampleMessage message)
    {
        _logger.LogInformation("Received {MessageNumber}", message.Number);
    }
}
Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Consumer