Show / Hide Table of Contents

    MQTT - Basic

    This sample implements the simple possible producer and consumer.

    See also: Connecting to a Message Broker

    Common

    The message being exchanged is defined in a common project.

    namespace Silverback.Samples.Mqtt.Basic.Common
    {
        public class SampleMessage
        {
            public int Number { get; set; }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Common

    Producer

    The producer uses a hosted service to publish some messages in the background.

    • Startup
    • EndpointsConfigurator
    • Background Service
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Mqtt.Basic.Producer
    {
        public class Startup
        {
            public void ConfigureServices(IServiceCollection services)
            {
                // Enable Silverback
                services
                    .AddSilverback()
    
                    // Use Apache Mqtt as message broker
                    .WithConnectionToMessageBroker(
                        options => options
                            .AddMqtt())
    
                    // Delegate the inbound/outbound endpoints configuration to a separate
                    // class.
                    .AddEndpointsConfigurator<EndpointsConfigurator>();
    
                // Add the hosted service that produces the random sample messages
                services.AddHostedService<ProducerBackgroundService>();
            }
    
            public void Configure()
            {
            }
        }
    }
    
    using MQTTnet.Protocol;
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Mqtt.Basic.Common;
    
    namespace Silverback.Samples.Mqtt.Basic.Producer
    {
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder
                    .AddMqttEndpoints(
                        endpoints => endpoints
    
                            // Configure the client options
                            .Configure(
                                config => config
                                    .WithClientId("samples.basic.producer")
                                    .ConnectViaTcp("localhost"))
    
                            // Produce the SampleMessage to the samples-basic topic
                            .AddOutbound<SampleMessage>(
                                endpoint => endpoint
                                    .ProduceTo("samples/basic")
                                    .WithQualityOfServiceLevel(
                                        MqttQualityOfServiceLevel.AtLeastOnce)
                                    .Retain()));
            }
        }
    }
    
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Silverback.Messaging.Broker;
    using Silverback.Messaging.Publishing;
    using Silverback.Samples.Mqtt.Basic.Common;
    
    namespace Silverback.Samples.Mqtt.Basic.Producer
    {
        public class ProducerBackgroundService : BackgroundService
        {
            private readonly IServiceScopeFactory _serviceScopeFactory;
    
            private readonly ILogger<ProducerBackgroundService> _logger;
    
            public ProducerBackgroundService(
                IServiceScopeFactory serviceScopeFactory,
                ILogger<ProducerBackgroundService> logger)
            {
                _serviceScopeFactory = serviceScopeFactory;
                _logger = logger;
            }
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                // Create a service scope and resolve the IPublisher
                // (the IPublisher cannot be resolved from the root scope and cannot
                // therefore be directly injected into the BackgroundService)
                using var scope = _serviceScopeFactory.CreateScope();
                var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();
                var broker = scope.ServiceProvider.GetRequiredService<IBroker>();
    
                int number = 0;
    
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Check whether the connection has been established, since the
                    // BackgroundService will start immediately, before the application
                    // is completely bootstrapped
                    if (!broker.IsConnected)
                    {
                        await Task.Delay(100, stoppingToken);
                        continue;
                    }
    
                    await ProduceMessageAsync(publisher, ++number);
    
                    await Task.Delay(100, stoppingToken);
                }
            }
    
            private async Task ProduceMessageAsync(IPublisher publisher, int number)
            {
                try
                {
                    await publisher.PublishAsync(
                        new SampleMessage
                        {
                            Number = number
                        });
    
                    _logger.LogInformation("Produced {Number}", number);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to produce {Number}", number);
                }
            }
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Producer

    Consumer

    The consumer processes the messages and outputs their value to the standard output.

    • Startup
    • EndpointsConfigurator
    • Subscriber
    using Microsoft.Extensions.DependencyInjection;
    
    namespace Silverback.Samples.Mqtt.Basic.Consumer
    {
        public class Startup
        {
            public void ConfigureServices(IServiceCollection services)
            {
                // Enable Silverback
                services
                    .AddSilverback()
    
                    // Use Apache Mqtt as message broker
                    .WithConnectionToMessageBroker(
                        options => options
                            .AddMqtt())
    
                    // Delegate the inbound/outbound endpoints configuration to a separate
                    // class.
                    .AddEndpointsConfigurator<EndpointsConfigurator>()
    
                    // Register the subscribers
                    .AddSingletonSubscriber<SampleMessageSubscriber>();
            }
    
            public void Configure()
            {
            }
        }
    }
    
    using MQTTnet.Protocol;
    using Silverback.Messaging.Configuration;
    using Silverback.Samples.Mqtt.Basic.Common;
    
    namespace Silverback.Samples.Mqtt.Basic.Consumer
    {
        public class EndpointsConfigurator : IEndpointsConfigurator
        {
            public void Configure(IEndpointsConfigurationBuilder builder)
            {
                builder
                    .AddMqttEndpoints(
                        endpoints => endpoints
    
                            // Configure the client options
                            .Configure(
                                config => config
                                    .WithClientId("samples.basic.consumer")
                                    .ConnectViaTcp("localhost")
    
                                    // Send last will message if disconnecting
                                    // ungracefully
                                    .SendLastWillMessage(
                                        lastWill => lastWill
                                            .Message(new TestamentMessage())
                                            .ProduceTo("samples/testaments")))
    
                            // Consume the samples/basic topic
                            .AddInbound(
                                endpoint => endpoint
                                    .ConsumeFrom("samples/basic")
                                    .WithQualityOfServiceLevel(
                                        MqttQualityOfServiceLevel.AtLeastOnce)
    
                                    // Silently skip the messages in case of exception
                                    .OnError(policy => policy.Skip())));
            }
        }
    }
    
    using Microsoft.Extensions.Logging;
    using Silverback.Samples.Mqtt.Basic.Common;
    
    namespace Silverback.Samples.Mqtt.Basic.Consumer
    {
        public class SampleMessageSubscriber
        {
            private readonly ILogger<SampleMessageSubscriber> _logger;
    
            public SampleMessageSubscriber(ILogger<SampleMessageSubscriber> logger)
            {
                _logger = logger;
            }
    
            public void OnMessageReceived(SampleMessage message) =>
                _logger.LogInformation("Received {MessageNumber}", message.Number);
        }
    }
    

    Full source code: https://github.com/BEagle1984/silverback/tree/master/samples/Mqtt/Basic.Consumer

    • Improve this doc
    GitHub E-Mail
    ↑ Back to top © 2020 Sergio Aquilini