当前位置: 首页 > article >正文

MQTTnet.Server 同时支持 MQTT 及 WebSocket 协议

.NET 6 之后的写法

.NET 6 之前的写法

Program.cs

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using MQTTnet.AspNetCore;
using System;
using System.IO;

namespace MQTTnet.Server
{
    /// <summary>
    /// Application entry point. Configures Kestrel with two listeners: one that
    /// speaks raw MQTT over TCP and one that serves HTTP and WebSocket traffic.
    /// </summary>
    public class Program
    {
        /// <summary>TCP port on which Kestrel accepts raw MQTT connections.</summary>
        private const int MqttTcpPort = 2883;

        /// <summary>
        /// Port of the HTTP / WebSocket listener. Populated by <see cref="Main"/> from
        /// the <c>AppSettings:WebPort</c> value in <c>appsettings.json</c>.
        /// </summary>
        public static int WebPort { get; set; }

        /// <summary>
        /// Loads <c>appsettings.json</c> from the current directory, resolves
        /// <see cref="WebPort"/>, then builds and runs the host.
        /// </summary>
        /// <param name="args">Command-line arguments forwarded to the host builder.</param>
        /// <exception cref="InvalidOperationException">
        /// <c>AppSettings:WebPort</c> is missing, not an integer, or outside 1-65535.
        /// </exception>
        public static void Main(string[] args)
        {
            var configuration = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json")
                .Build();

            WebPort = ParseWebPort(configuration.GetSection("AppSettings")["WebPort"]);
            CreateHostBuilder(args).Build().Run();
        }

        /// <summary>
        /// Creates the generic host with Kestrel listening on the MQTT TCP port and on
        /// <see cref="WebPort"/>, using <c>Startup</c> for service and pipeline setup.
        /// </summary>
        /// <param name="args">Command-line arguments passed to the default builder.</param>
        /// <returns>The configured, not yet built, host builder.</returns>
        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseKestrel(kestrel =>
                {
                    // Raw MQTT over TCP.
                    kestrel.ListenAnyIP(MqttTcpPort, listen => listen.UseMqtt());

                    // HTTP and WebSocket.
                    kestrel.ListenAnyIP(WebPort);
                });
                webBuilder.UseStartup<Startup>();
            });
        }

        /// <summary>
        /// Converts the configured port text to a usable TCP port number.
        /// </summary>
        /// <param name="rawPort">The raw setting value; may be <c>null</c> when the key is absent.</param>
        /// <returns>A port number in the range 1-65535.</returns>
        /// <exception cref="InvalidOperationException">The value is missing or not a valid port.</exception>
        private static int ParseWebPort(string rawPort)
        {
            // Convert.ToInt32(null) returns 0, so a missing key used to pass silently
            // as port 0. Fail fast with a message that names the setting instead.
            var parsed = int.TryParse(
                rawPort,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var port);

            if (!parsed || port < 1 || port > 65535)
            {
                throw new InvalidOperationException(
                    $"appsettings.json: 'AppSettings:WebPort' must be an integer between 1 and 65535, but was '{rawPort ?? "<missing>"}'.");
            }

            return port;
        }
    }
}

Startup.cs

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.AspNetCore;
using MQTTnet.Server.Util.mqtt;
using System.Linq;

namespace MQTTnet.Server
{
    /// <summary>
    /// Registers the hosted MQTT server and MVC controllers, and wires the HTTP
    /// pipeline so that MQTT is also reachable over WebSocket at <c>/mqtt</c>.
    /// </summary>
    public class Startup
    {
        /// <summary>Creates the startup object with the application configuration.</summary>
        /// <param name="configuration">Configuration supplied by the host.</param>
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        /// <summary>Application configuration supplied by the host.</summary>
        public IConfiguration Configuration { get; }

        /// <summary>
        /// Registers controllers, the hosted MQTT server, the MQTT connection handler
        /// and the <see cref="MQServer"/> singleton.
        /// </summary>
        /// <param name="services">The service collection to populate.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            ApplyMqttCredentials(Configuration.GetSection("AppSettings"));

            services.AddHostedMqttServer(mqttOptions =>
            {
                mqttOptions
                    // Caps the number of pending messages the broker keeps per client
                    // connection; the library default is 100.
                    .WithMaxPendingMessagesPerClient(10000)
                    .WithKeepAlive();
            });

            services.AddMqttConnectionHandler();
            services.AddConnections();
            services.AddSingleton<MQServer>();
        }

        /// <summary>
        /// Called by the runtime to configure the HTTP request pipeline and attach
        /// the MQTT server event handlers.
        /// </summary>
        /// <param name="app">The application pipeline builder.</param>
        /// <param name="env">The hosting environment.</param>
        /// <param name="mqttController">The <see cref="MQServer"/> instance; not used in the method body.</param>
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, MQServer mqttController)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllerRoute(name: "default", pattern: "{controller=Demo}/{action=Info}/{id?}");

                // MQTT over WebSocket: answer with the first sub-protocol the client
                // offers, or an empty string when it offers none.
                endpoints.MapConnectionHandler<MqttConnectionHandler>(
                    "/mqtt",
                    dispatcherOptions => dispatcherOptions.WebSockets.SubProtocolSelector =
                        offeredProtocols => offeredProtocols.FirstOrDefault() ?? string.Empty);
            });

            app.UseMqttServer(server =>
            {
                server.ValidatingConnectionAsync += MQServer.ValidateConnectionAsync;
                server.ClientConnectedAsync += MQServer.ClientConnectedAsync;
                server.ClientDisconnectedAsync += MQServer.ClientDisconnectedAsync;
            });
        }

        /// <summary>
        /// Overrides the broker credentials from configuration when the optional
        /// <c>MqttUserName</c> / <c>MqttPassword</c> keys are present.
        /// </summary>
        /// <param name="appSettings">The <c>AppSettings</c> configuration section.</param>
        private static void ApplyMqttCredentials(IConfiguration appSettings)
        {
            // Credentials should come from configuration rather than source code.
            // When a key is absent, MQServer keeps its own default value, so
            // existing deployments behave as before.
            var configuredUserName = appSettings["MqttUserName"];
            if (!string.IsNullOrWhiteSpace(configuredUserName))
            {
                MQServer.UserName = configuredUserName;
            }

            var configuredPassword = appSettings["MqttPassword"];
            if (!string.IsNullOrEmpty(configuredPassword))
            {
                MQServer.Password = configuredPassword;
            }
        }
    }
}

MQServer.cs

using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Server.Util.mqtt
{
    /// <summary>
    /// Static MQTT server event handlers: validates connecting clients against a
    /// single user name / password pair and tracks the ids of connected clients so
    /// that a second connection with an id already in use is rejected.
    /// </summary>
    public class MQServer
    {
        /// <summary>User name that connecting clients must present.</summary>
        public static string UserName { get; set; } = "admin";

        /// <summary>Password that connecting clients must present.</summary>
        public static string Password { get; set; } = "123456";

        // Ids of the clients currently recorded as connected.
        static readonly HashSet<string> clientIds = new();

        // HashSet<T> is not safe for concurrent reads and writes, and the handlers
        // below are invoked per connection; every access goes through this lock.
        static readonly object clientIdsLock = new();

        /// <summary>
        /// Validates an incoming MQTT connection and sets
        /// <see cref="ValidatingConnectionEventArgs.ReasonCode"/> accordingly.
        /// </summary>
        /// <param name="args">The connection validation arguments.</param>
        /// <returns>
        /// A completed task, or a faulted task carrying the exception if validation
        /// itself threw.
        /// </returns>
        public static Task ValidateConnectionAsync(ValidatingConnectionEventArgs args)
        {
            try
            {
                if (string.IsNullOrWhiteSpace(args.UserName))
                {
                    args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    Logger.Error($"MQServer,身份校验失败(用户名为空),ClientId:{args.ClientId}");
                    return Task.CompletedTask;
                }

                bool alreadyConnected;
                lock (clientIdsLock)
                {
                    alreadyConnected = clientIds.Contains(args.ClientId);
                }

                if (alreadyConnected)
                {
                    args.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                    Logger.Error($"MQServer,身份校验失败(有相同clientid已连接),ClientId:{args.ClientId}");
                    return Task.CompletedTask;
                }

                if (args.UserName != UserName || args.Password != Password)
                {
                    args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    // The submitted password is deliberately left out of the log:
                    // a near-miss of the real password must not end up in log files.
                    Logger.Error($"MQServer,身份校验失败(用户名或密码错误),ClientId:{args.ClientId},UserName:{args.UserName}");
                    return Task.CompletedTask;
                }

                args.ReasonCode = MqttConnectReasonCode.Success;
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                Logger.Error("MQServer,ValidateConnectionAsync", ex);
                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Handles the client connected event: logs it and records the client id.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns>A completed task.</returns>
        public static Task ClientConnectedAsync(ClientConnectedEventArgs args)
        {
            Logger.Info($"MQServer,mqtt客户端上线,id:{args.ClientId},Endpoint:{args.Endpoint},ProtocolVersion:{args.ProtocolVersion}");

            lock (clientIdsLock)
            {
                clientIds.Add(args.ClientId);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles the client disconnected event: logs it and forgets the client id.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns>A completed task.</returns>
        public static Task ClientDisconnectedAsync(ClientDisconnectedEventArgs args)
        {
            Logger.Error($"MQServer,mqtt客户端离线,id:{args.ClientId},Endpoint:{args.Endpoint},DisconnectType:{args.DisconnectType},ReasonString:{args.ReasonCode}");

            lock (clientIdsLock)
            {
                clientIds.Remove(args.ClientId);
            }

            return Task.CompletedTask;
        }
    }
}


http://www.kler.cn/a/349274.html

相关文章:

  • k8s 1.28 集群部署
  • 力扣349.两个数组的交集
  • 2024年诺贝尔物理学奖授予机器学习与神经网络研究者的启示
  • 超酷大数据音乐推荐知识图谱AI智能问答可视化系统的设计与研发
  • vue2中vuex状态管理使用安装教程及多模块化拆分,包含大多项目常用用法
  • 企业数字化转型:打造数字资产开启创新与可持续发展之路
  • 需求8——通过一个小需求来体会AI如何帮助改bug
  • 【HarmonyOS】HMRouter使用详解(三)生命周期
  • 深度学习 CPU
  • CENTOS7安装MYSQL80(2024.10.15)
  • PMP敏捷专题课:敏捷原则与理念
  • Python中的数据可视化艺术:用Matplotlib和Seaborn讲故事
  • python 位运算 笔记
  • Docker 教程九 (Docker Dockerfile)
  • python pip安装requirements.txt依赖与国内镜像
  • MySQL的索引类型
  • Java Stream API flatMap()方法介绍
  • KEIL新建51工程
  • 科普向--什么是CI/CD
  • 使用API有效率地管理Dynadot域名,查看域名服务器(Name Server)列表