大数据-186 Elasticsearch - ELK 家族 Logstash Input插件 JDBC syslog
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(正在更新…)
章节内容
上节我们完成了如下的内容:
- Logstash 安装配置
- Logstash Input 插件
基本介绍
Logstash 的 JDBC 和 Syslog 是两种不同的 Input 插件,它们分别用于从数据库和 syslog 日志中收集数据。在详细介绍之前,首先明确一下它们各自的用途和特点:
-
JDBC 插件:用于从数据库(如 MySQL、PostgreSQL、SQL Server 等)中提取数据。它利用 JDBC 驱动连接数据库并运行 SQL 查询,收集查询结果并将其输入到 Logstash 进行处理和发送。
-
Syslog 插件:用于从 Syslog(系统日志)服务中收集日志数据。Syslog 是一种标准化的日志传输协议,广泛用于网络设备、操作系统和应用的日志记录。
JDBC Input 插件
JDBC 插件用于从关系型数据库中提取数据,特别适用于将结构化的业务数据导入到 ELK(Elasticsearch, Logstash, Kibana)堆栈中。它支持定时调度和增量提取,适合数据同步和 ETL 场景。
主要功能:
- 数据库连接:使用 JDBC 驱动来连接各种数据库(例如 MySQL、PostgreSQL、Oracle、SQL Server 等)。
- SQL 查询:允许用户通过 SQL 查询语句选择数据,支持复杂的查询条件。
- 增量提取:可以配置“追踪列”(tracking column),基于某一列(如自增 ID 或时间戳)实现增量数据拉取,避免重复导入。
- 定时调度:可以设置定时任务,定期查询和同步数据库的数据。
- 错误重试机制:可以处理连接错误并自动重试,确保数据采集的稳定性。
Syslog Input 插件
Syslog 插件用于接收基于 Syslog 协议传输的日志数据。Syslog 是一种广泛使用的日志协议,特别是在网络设备和 Linux/Unix 操作系统中。Logstash 的 Syslog 插件能够监听特定的端口,接收和解析这些日志。
主要功能:
- 协议支持:支持 Syslog 协议,包括 RFC 3164 和 RFC 5424 格式的日志。
- 多种传输方式:支持通过 TCP 和 UDP 协议接收日志,适应不同的传输需求。
- 日志解析:自动解析 Syslog 消息的头部字段,例如时间戳、主机名、程序名等。
- 网络监听:可以通过监听指定的 IP 和端口,持续接收并处理来自网络的 Syslog 日志。
JDBC插件
JDBC插件可以采集某张数据库表当中的数据到Logstash当中来
准备数据
CREATE TABLE `user` (
`id` bigint(20) NOT NULL,
`user_name` varchar(25) DEFAULT NULL,
`gender` varchar(20) DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`)
) ENGINE=InnoDB DEFAULT CHRASET=utf8;
INSERT INTO `user` VALUES(1, 'zhangsan', 'male', '2024-08-16 00:00:00');
INSERT INTO `user` VALUES(2, 'lisi', 'female', '2024-08-16 00:00:00');
目前我们的MySQL服务器是在 h122 节点上的(之前给Hive和HBase等业务使用的),现在我们需要到 h122 数据库中,执行上述的SQL指令。
我这里使用Navicat执行, 执行结果如下图所示:
编写配置
cd /opt/servers/logstash-7.3.0/config
vim jdbc.conf
写入如下的内容(这里注意些自己的环境,我的环境和你的不一样):
input {
jdbc {
jdbc_driver_library => "/opt/servers/mysql-connector-java-8.0.19.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://h122.wzk.icu:3306/es-test"
jdbc_user => "hive"
jdbc_password => "hive@wzk.icu"
use_column_value => "true"
clean_run => "false"
record_last_run =>"true"
tracking_column => "id"
schedule => "* * * * *"
last_run_metadata_path => "/opt/servers/es/.Logstash_user_jdbc_last_run"
statement => "SELECT * from user where id > :sql_last_value;"
}
}
output{
stdout{
codec=>rubydebug
}
}
检查配置
cd /opt/servers/logstash-7.3.0
bin/logstash -f /opt/servers/logstash-7.3.0/config/jdbc.conf -t
执行结果如下图所示:
启动服务
cd /opt/servers/logstash-7.3.0
bin/logstash -f /opt/servers/logstash-7.3.0/config/jdbc.conf
启动结果如下图所示:
可以看到获取到了对应的数据:
发送数据
现在向数据库中写入数据,就可以发现Logstash监听到了:
INSERT INTO `user` VALUES(3, 'wangwu', 'female', '2024-08-16 00:00:00');
对应的Logstash的变化:
syslog插件
syslog机制负责记录内核和应用程序产生的日志信息,管理员可以通过查看日志记录,来掌握系统状况,默认系统已经安装了rsyslog,直接启动即可。
编写配置
创建新脚本,syslog.conf
cd /opt/servers/logstash-7.3.0/config
vim syslog.conf
写入如下的内容:
input {
tcp {
port => 6789
type => "syslog"
}
udp {
port => 6789
type => "syslog"
}
}
filter {
if [type] == "syslog" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
}
add_field => {
"received_at" => "%{@timestamp}"
"received_from" => "%{host}"
}
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output {
stdout {
codec => rubydebug
}
}
写入的内容如下图所示:
检查配置
cd /opt/servers/logstash-7.3.0
bin/logstash -f /opt/servers/logstash-7.3.0/config/syslog.conf -t
执行结果如下图所示:
启动服务
cd /opt/servers/logstash-7.3.0
bin/logstash -f /opt/servers/logstash-7.3.0/config/syslog.conf
执行结果如下图所示:
发送数据
修改系统日志配置文件
vim /etc/rsyslog.conf
添加一行配置:
*.* @@h121.wzk.icu:6789
写入的效果如下图所示:
重启系统日志服务
systemctl restart rsyslog
查看数据,可以看到如下的效果: