当前位置: 首页 > article >正文

物联网终端telegraf采集设备信息

背景

低功耗设备上资源有限,但又比较重要。对其的管理难度很大,有些时候又必须时刻了解其运行状况。我们自然想到的是能否有办法监控它呢?当时是有的!而且很成熟的解决方案。TICK技术栈,那TICK是什么呢?
TICK是由InfluxData开源的监控技术栈,由 Telegraf, InfluxDB, Chronograf, Kapacitor 四个工具的首字母组成。

  • Telegraf:go语言开发的数据采集工具;
  • InfluxDB:go语言开发的时序数据库;
  • Chronograf:数据可视化报表展示;
  • Kapacitor:时序数据的监控告警;
    在这里插入图片描述
    以上只是做个了解,不在本文重点。而且每个人遇到的工作场景不同,这样的架构也不一定适合自己。取其所需整合到自己的系统中,可以完美的满足业务需求最好。当然了解这个解决方案也是必须的。自己去花时间吧。

问题

在我的尝试中使用telegraf+influxdb+业务系统去跟踪并管理终端设备,通过交叉编译环境–现场设备–数据采集–数据入库–设备管理等正式环境运行后,欣喜的发现数据上来了。但是没过多久telegraf数据时断时续,最终挂掉而导致数据出现中断。个人模糊的感觉可能是入库时出现了什么我不知道的问题。通过抓包分析发现,其在对influxdb建立网络连接时网络不稳定,主程序进程占用的内存也多,多方面因数导致数据不能及时发出去最终撑爆了telegraf。也有一种通过限制telegraf连接失败时Telegraf将尝试重新连接InfluxDB的最大次数,influx_max_retry_interval参数指定了重试连接的最大时间间隔。但这样不能解决我数据丢失的问题呀。

架构图
项目摘取部分架构

解决

针对上面的问题决定替换直接连接influxdb的想法,利用终端本身与mqtt交互的方式,将数据回传到系统中。于是有了上面的截图。再一番修改后,数据到了server时又出现了问题。telegraf采集时,上报到mqtt过程中设置了数据格式为“json”,想着通过server转手时再做点自动分析处理的工作。然后再将数据直接入库!想法是好的,也是可行的。就是数据格式中出现了integer与float的交替出现导致了influxdb入库失败。查阅资料时发现可以直接配置数据的类型为influx,Telegraf数据格式配置
在这里插入图片描述

于是我修改配置如下:

root@lm70:/tffs0a# cat telegraf.conf
## 配置在采集终端的telegraf.conf,注意配置信息的正确性

# Telegraf Configuration
# Global tags can be specified here in key="value" format.
[global_tags]
  device="3"
# Configuration for telegraf agent
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  quiet = false
  hostname = ""
  omit_hostname = false

# CPU input plugin
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
# Memory input plugin
[[inputs.mem]]
# Disk input plugin
[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs"]
# System input plugin (for boot time)
[[inputs.system]]
# Net input plugin (for network traffic)
[[inputs.net]]

# 配置influxdb信息,出现数据中断的,后改为下面的mqtt方式接收数据
# InfluxDB v2 output plugin
#[[outputs.influxdb_v2]]
#  urls = ["http://remote_ip:port"]
#  token = "token"
#  organization = "org"
#  bucket = "bucket"

# 配置mqtt的数据接收方式,第一次选择了data_format="json"的格式入库,出现了数据类型不匹配的情况,
# 当然可以自行处理,如果不想处理也是可以直接设置data_format=“influx”这种方式去规避
[[outputs.mqtt]]
  servers = ["tcp://remote_ip:port"]
  topic = "topic"
  qos = 1
  username = "admin"
  password = "passwd"
  data_format = "influx"

通过这么配置后,就能在server中直接转存到infludb中即可

# 对着上图的Server中运行的python代码片段

import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient
from concurrent.futures import ThreadPoolExecutor
from influxdb_client.client.write_api import SYNCHRONOUS

# 线程池
executor = ThreadPoolExecutor(max_workers=20)
# Influxdb连接信息,此处版本2.x
client = InfluxDBClient(url=ENV_INFLUX_URL, token=ENV_INFLUX_TOKEN, org=ENV_INFLUX_ORG, timezone='Asia/Shanghai')
write_api = client.write_api(write_options=SYNCHRONOUS)

def save_data(bytes_array):
	try:
        print(bytes_array)
        write_api.write("bucket", 'org', bytes_array)
        
        # 你可以对数据在做点什么也不是不可以....
        # dosomething...
        
    except Exception as e:
        print(f"监控数据入库失败...{e}")

def on_connect(client, userdata, flags, rc):
    """订阅设备监控数据的topic"""
    print("MQTT连接成功")
    client.subscribe(ENV_MQTT_MT)
  
def on_message(client, userdata, msg):
    if "mt_device" in msg.topic:
        executor.submit(save_data, msg.payload)
  	else:
  		pass

if __name__ == '__main__':
	# mqtt配置信息根据自己的配置
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(ENV_MQTT_IP, int(ENV_MQTT_PORT), 60)
    client.loop_forever()

官方链接:
https://github.com/influxdata/telegraf/tree/master/plugins

        到这里我遇到的问题就解决了,而且没有再出现断断续续的情况,一直很稳定。这就不禁让我感到疑惑。同样的网络环境下为什么这种方式可以安全平稳的运行,而直接配置influxdb数据库就会出现问题呢?

讨论

如果你有好的解决办法,请在评论区告诉我。


http://www.kler.cn/a/272272.html

相关文章:

  • 1月21日星期二今日早报简报微语报早读
  • (一)相机标定——四大坐标系的介绍、对应转换、畸变原理以及OpenCV完整代码实战(C++版)
  • 【正则表达式】从0开始学习正则表达式
  • 最大矩阵面积问题
  • MongoDB基本操作
  • 【Maven】resources-plugin
  • 实战!wsl 与主机网络通信,在 wsl 中搭建服务器。学了计算机网络,但只能刷刷面试题?那也太无聊了!这篇文章可以让你检测你的计网知识!
  • 7.Java整合MongoDB—项目创建
  • 学习python笔记:8,随机数
  • 【XML】xml转Freemind思维导图
  • 【Java】十大排序
  • 【Unity入门】详解Unity中的射线与射线检测
  • 流媒体学习之路(WebRTC)——FEC逻辑分析(6)
  • 51单片机与ARM单片机的区别
  • Jest:JavaScript的单元测试利器
  • 【GPT-SOVITS-01】源码梳理
  • 避免内存泄漏及泄漏后的排查方法【C++】
  • Redis 常用数据类型,各自的使用场景是什么?
  • CentOS 7 编译安装 Git
  • AI基础知识(2)--决策树,神经网络
  • 编程语言的生态系统
  • 一种动态联动的实现方法
  • 使用gitee自动备份文件
  • 【C语言】指针基础知识(一)
  • 深度强化学习01
  • ubuntu18.04安装ffmpeg