.net core集成MQTT服务端
程序既作为 MQTT 服务端,也提供 WebApi 接口。Web 页面中的 MQTT.js 使用 WebSocket 协议,WinForms 客户端使用 MQTT(TCP)协议,因此程序需要同时监听两个端口。下面直接上代码。
创建服务
引用包:MQTTnet、MQTTnet.AspNetCore。这两个包的最新版本是 5.*,我引用的是 4.3.7.1207;5.* 版本在接收消息的处理方式上略有不同。
MqttConfigServer
/// <summary>
/// Settings for the embedded MQTT server: the per-client pending-message limit,
/// the port that speaks MQTT directly, and the port/path used for MQTT over WebSocket.
/// </summary>
public class MqttConfigServer
{
    /// <summary>
    /// Maximum number of pending messages kept per client. This value is forwarded to
    /// <c>WithMaxPendingMessagesPerClient</c>; it is not a connection limit.
    /// </summary>
    public int MaxPendingPerClient { get; set; }

    /// <summary>
    /// Port on which the server accepts MQTT connections.
    /// </summary>
    public int MqttPort { get; set; }

    /// <summary>
    /// Port used for MQTT over WebSocket.
    /// </summary>
    public int MqttWsPort { get; set; }

    /// <summary>
    /// Request path of the MQTT WebSocket endpoint. Defaults to <c>/mqtt</c> so the
    /// endpoint mapping never receives a null route pattern when the setting is absent.
    /// </summary>
    public string MqttWsPath { get; set; } = "/mqtt";
}
下面是用于添加 MQTT 服务的扩展方法。
其中 AppInfo.GetOptions() 用于从配置文件中读取配置。
/// <summary>
/// Registers the hosted MQTT server and configures Kestrel to listen on the
/// MQTT port and on the web/WebSocket port taken from <c>MqttConfigServer</c>.
/// </summary>
/// <param name="services">Service collection that receives the MQTT registrations.</param>
/// <param name="webHostBuilder">Web host builder whose Kestrel listeners are configured.</param>
public static void AddMqttService(this IServiceCollection services, IWebHostBuilder webHostBuilder)
{
    var settings = AppInfo.GetOptions<MqttConfigServer>();

    webHostBuilder.ConfigureKestrel((hostContext, kestrel) =>
    {
        // Listener that speaks MQTT directly.
        kestrel.ListenAnyIP(settings.MqttPort, endpoint => endpoint.UseMqtt());
        // Plain listener for web traffic.
        kestrel.ListenAnyIP(settings.MqttWsPort);
    });

    // Hosted MQTT server without its default endpoint.
    services.AddHostedMqttServerWithServices(mqtt => mqtt
        .WithoutDefaultEndpoint()
        .WithMaxPendingMessagesPerClient(settings.MaxPendingPerClient)
        .WithKeepAlive());

    services.AddMqttConnectionHandler();
    services.AddConnections();

    // Singleton that carries the MQTT event callbacks.
    services.AddSingleton<MyMqttService>();
}
MyMqttService
/// <summary>
/// Collection of callbacks for the MQTT server events. Each handler writes a line
/// to the console (or does nothing) and completes synchronously.
/// </summary>
public class MyMqttService
{
    public MyMqttService()
    { }

    /// <summary>
    /// Called for an application message that was not consumed; logs sender, topic and payload.
    /// </summary>
    /// <param name="arg">Event data carrying the sender id and the message.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
    {
        // Decode explicitly as UTF-8 instead of relying on the platform default encoding.
        var payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
        Console.WriteLine($"客户端:【{arg.SenderId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{payload}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called while a client connection is being validated. No credential check is
    /// performed here; the handler only logs the attempt and leaves <paramref name="arg"/> untouched.
    /// </summary>
    /// <param name="arg">Event data describing the connecting client.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        // Distinct wording from the "connected" handler so a single connect is not logged as online twice.
        Console.WriteLine($"{arg.ClientId} 请求连接,{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called for every published message; logs client id, topic and payload.
    /// </summary>
    /// <param name="arg">Event data carrying the publishing client id and the message.</param>
    /// <returns>A completed task.</returns>
    public Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        var payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
        Console.WriteLine($"侦听所有消息:【{arg.ClientId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{payload}");
        // Nothing is awaited, so return a completed task rather than using an async state machine.
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a client disconnects; logs the client id and the current local time.
    /// </summary>
    /// <param name="arg">Event data describing the disconnected client.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        Console.WriteLine($"【{arg.ClientId}】 下线了,{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a client has connected; logs the client id and the current local time.
    /// </summary>
    /// <param name="arg">Event data describing the connected client.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a client unsubscribes from a topic. Currently a no-op.
    /// </summary>
    /// <param name="arg">Event data describing the unsubscription.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a client subscribes to a topic. Currently a no-op.
    /// </summary>
    /// <param name="arg">Event data describing the subscription.</param>
    /// <returns>A completed task.</returns>
    public Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a retained message changes; logs the id of the client involved.
    /// </summary>
    /// <param name="arg">Event data carrying the client id.</param>
    /// <returns>A completed task.</returns>
    public Task Server_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg)
    {
        Console.WriteLine($"收到消息:{arg.ClientId}");
        return Task.CompletedTask;
    }
}
在Program 添加mqtt 服务
// Register the MQTT services and Kestrel listeners.
services.AddMqttService(builder.WebHost);
var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();

// Map the MQTT-over-WebSocket endpoint; the first sub-protocol offered by the
// client is selected, or an empty string when none is offered.
app.MapConnectionHandler<MqttConnectionHandler>(mqttServerConfig.MqttWsPath, dispatcher =>
{
    dispatcher.WebSockets.SubProtocolSelector = offered => offered.FirstOrDefault() ?? string.Empty;
});

var mqttService = app.Services.GetRequiredService<MyMqttService>();

// Attach the MyMqttService callbacks to the server events.
app.UseMqttServer(broker =>
{
    // Connection validation
    broker.ValidatingConnectionAsync += mqttService.Server_ValidatingConnectionAsync;
    // Messages that were not consumed
    broker.ApplicationMessageNotConsumedAsync += mqttService.Server_ApplicationMessageNotConsumedAsync;
    // Every published message
    broker.InterceptingPublishAsync += mqttService.Server_InterceptingPublishAsync;
    // Client subscribed to a topic
    broker.ClientSubscribedTopicAsync += mqttService.Server_ClientSubscribedTopicAsync;
    // Client unsubscribed from a topic
    broker.ClientUnsubscribedTopicAsync += mqttService.Server_ClientUnsubscribedTopicAsync;
    // Client connected
    broker.ClientConnectedAsync += mqttService.Server_ClientConnectedAsync;
    // Client disconnected
    broker.ClientDisconnectedAsync += mqttService.Server_ClientDisconnectedAsync;
    // Retained message changed
    broker.RetainedMessageChangedAsync += mqttService.Server_RetainedMessageChangedAsync;
});
mqttconfig
{
"maxPendingPerClient": 1000, //每个客户端最大待处理(排队)消息数
"mqttPort": 1883, //mqtt端口
"mqttWsPort": 8083, //mqtt websocket端口
"mqttWsPath": "/mqtt" //mqtt websocket路径
}
这样服务就完成了。这里的 MQTT 服务并不影响 WebApi 的接口,只是程序启动了两个端口。
服务端主动推送消息
// Service provider kept for resolving services on demand; assigned once in the constructor.
private readonly IServiceProvider _serviceProvider;

/// <summary>
/// Creates the controller and stores the service provider.
/// </summary>
/// <param name="serviceProvider">Provider used later to resolve services.</param>
/// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
public ClientControlController(IServiceProvider serviceProvider)
{
    _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
/// <summary>
/// Pushes <paramref name="control"/>, serialized as JSON, to <paramref name="topic"/>
/// from the server side. The message is injected only when a client with id
/// "HardwareService" is present in the server's client list, and that client is
/// used as the sender; otherwise nothing is sent.
/// </summary>
/// <param name="topic">Topic the message is published to.</param>
/// <param name="control">Object serialized to JSON and used as the payload.</param>
/// <returns>A task that completes once the message was injected or skipped.</returns>
/// <exception cref="InvalidOperationException">The hosted MQTT server is not registered.</exception>
private async Task PushMsg(string topic, ClientControl control)
{
    const string senderClientId = "HardwareService";

    // Fail fast with a descriptive exception when the server is not registered,
    // instead of awaiting a null task and surfacing a NullReferenceException.
    var server = _serviceProvider.GetRequiredService<MqttHostedServer>();

    // Look up the client that acts as the sender.
    var clients = await server.GetClientsAsync();
    var client = clients.FirstOrDefault(m => m.Id == senderClientId);
    if (client == null)
    {
        return;
    }

    // Build the message only when it is actually going to be sent.
    var payload = JsonConvert.SerializeObject(control);
    var message = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload(payload)
        .Build();

    await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message)
    {
        SenderClientId = client.Id,
    });
}