当前位置: 首页 > article >正文

.net core集成MQTT服务端

程序作为MQTT的服务端,也是WebApi 接口地址,在Web页面中MQTTJS用的是Websocker协议,在Winfrom中用MQTT协议。导致程序需要启动两个端口。直接上代码

创建服务

引用包:MQTTnet,MQTTnet.AspNetCore,这包最新的5.*,我引用的是4.3.7.1207。5.*的包在接收消息的处理方式略微不同

MqttConfigServer

  public class MqttConfigServer
  {
      /// <summary>
      /// 客户端最大连接数
      /// </summary>
      public int MaxPendingPerClient { get; set; }
      /// <summary>
      /// Mqtt端口
      /// </summary>
      public int MqttPort { get; set; }
      /// <summary>
      /// MqttWs端口
      /// </summary>
      public int MqttWsPort { get; set; }

      /// <summary>
      /// MqttWs路径
      /// </summary>
      public string MqttWsPath { get; set; }
  }

这是拓展的方法,添加mqtt服务
AppInfo.GetOptions() 这是获取配置文件

 /// <summary>
 /// 添加mqtt服务
 /// </summary>
 /// <param name="services"></param>
 /// <param name="webHostBuilder"></param>
 public static void AddMqttService(this IServiceCollection services, IWebHostBuilder webHostBuilder)
 {
     var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();

     webHostBuilder.ConfigureKestrel((context, serverOptions) =>
     {
         //配置Mqtt端口
         serverOptions.ListenAnyIP(mqttServerConfig.MqttPort, options => options.UseMqtt());
         //配置web端口
         serverOptions.ListenAnyIP(mqttServerConfig.MqttWsPort);
     });

     //配置MQTT服务
     services.AddHostedMqttServerWithServices(options =>
     {
         options.WithoutDefaultEndpoint();
         options.WithMaxPendingMessagesPerClient(mqttServerConfig.MaxPendingPerClient);
         options.WithKeepAlive();
     });
     services.AddMqttConnectionHandler();
     services.AddConnections();
	 //注入单例mqtt侦听服务
     services.AddSingleton<MyMqttService>();
 }

MyMqttService

public class MyMqttService
{
    public MyMqttService()
    { }
    public Task Server_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
    {
        Console.WriteLine($"客户端:【{arg.SenderId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端验证账号密码
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 侦听所有消息
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public async Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        Console.WriteLine($"侦听所有消息:【{arg.ClientId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");

        await Task.CompletedTask;
        return;
    }
    /// <summary>
    /// 客户端断开连接
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        Console.WriteLine($"【{arg.ClientId}】 下线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// 客户端上线
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端取消订阅
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端订阅
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }
    /// <summary>
    /// 客户端保留消息
    /// </summary>
    /// <param name="arg"></param>
    /// <returns></returns>
    public Task Server_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
    {
        Console.WriteLine($"收到消息:{arg.ClientId}");
        return Task.CompletedTask;
    }
}

在Program 添加mqtt 服务

  //添加Mqtt服务
    services.AddMqttService(builder.WebHost);
    var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();

  //WsMqtt 路径
  app.MapConnectionHandler<MqttConnectionHandler>(mqttServerConfig.MqttWsPath, http => http.WebSockets.SubProtocolSelector = protocolList => protocolList.FirstOrDefault() ?? string.Empty);
  var mqttService = app.Services.GetRequiredService<MyMqttService>();
  //加载MQTT回调
  app.UseMqttServer(server =>
  {
      //验证账号密码
      server.ValidatingConnectionAsync += mqttService.Server_ValidatingConnectionAsync;
      //侦听为消费消息
      server.ApplicationMessageNotConsumedAsync += mqttService.Server_ApplicationMessageNotConsumedAsync;
      //侦听所有消息
      server.InterceptingPublishAsync += mqttService.Server_InterceptingPublishAsync;
      //客户端订阅消息
      server.ClientSubscribedTopicAsync += mqttService.Server_ClientSubscribedTopicAsync;
      //客户端取消订阅消息
      server.ClientUnsubscribedTopicAsync += mqttService.Server_ClientUnsubscribedTopicAsync;
      //客户端链接事件
      server.ClientConnectedAsync += mqttService.Server_ClientConnectedAsync;
      //客户端关闭事件
      server.ClientDisconnectedAsync += mqttService.Server_ClientDisconnectedAsync;
      //保留消息
      server.RetainedMessageChangedAsync += mqttService.Server_RetainedMessageChangedAsync;
  });

mqttconfig

{
    "maxPendingPerClient": 1000, //客户端最大等待连接数
    "mqttPort": 1883, //mqtt端口
    "mqttWsPort": 8083, //mqtt websocket端口
    "mqttWsPath": "/mqtt" //mqtt websocket路径

  }

这样服务就完成,这里mqtt 服务并不影响WebApi 的接口,只是启动的两个端口

服务端主动推送消息

 private IServiceProvider _serviceProvider;
 public ClientControlController(IServiceProvider serviceProvider)
 {
     _serviceProvider = serviceProvider;
 }

  /// <summary>
  /// 服务端推送消息
  /// </summary>
  /// <returns></returns>
  private async Task PushMsg(string topic, ClientControl control)
  {
      var server = _serviceProvider.GetService<MqttHostedServer>() as MqttServer;
      //获取客户端
      var client = (await server?.GetClientsAsync()).Where(m => m.Id == "HardwareService").FirstOrDefault();
	  //推送消息
      var payload = JsonConvert.SerializeObject(control);

      var message = new MqttApplicationMessageBuilder()
          .WithTopic(topic)//主题
          .WithPayload(payload)
          .Build();

      if (client != null)
      {
          await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message)
          {
              SenderClientId = client.Id,

          });
      }
  }

http://www.kler.cn/a/594932.html

相关文章:

  • 【c++】异常处理
  • 硬件与软件的边界-从单片机到linux的问答详解
  • 糊涂人寄信——递推
  • Unity动画片段丢失(AnimationClip),如何进行重新绑定
  • DApp+公链/主链+钱包+Swap开发西安区块链公司
  • 华为中小型企业项目案例
  • 理解Akamai EdgeGrid认证在REST API中的应用
  • 手机换IP有什么用?最新换IP方法
  • MySql 存储引擎 InnoDB 与 MyISAM 有什么区别
  • 车载以太网网络测试-18【传输层-DOIP协议-1】
  • 根据模板将 Exce 明细数据批量生成 PPT 文档|邮件合并
  • 搭建React简单项目
  • PCDN 在去中心化互联网中的角色
  • 前端知识-CSS(二)
  • python自动化脚本编写-处理文件、数据分析
  • Vue.js 中的 Memoization:提升性能的缓存技术
  • MySQL 创建用户,建库,建表
  • 【Qt】信号signal是单向的
  • 【数学建模】灰色关联分析模型详解与应用
  • Nginx之Basic Auth认证