当前位置: 首页 > article >正文

大数据-227 离线数仓 - Flume 自定义拦截器(续接上节) 采集启动日志和事件日志

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Flume 自定义拦截器
  • 拦截原理 拦截器实现 Java

在这里插入图片描述

自定义拦截器

(续接上节,上节已经到了打包的部分)

上传结果

将刚才的打包上传到这个目录下:
我拷贝的是带依赖的:“flume-test-1.0-SNAPSHOT-jar-with-dependencies.jar”

/opt/servers/apache-flume-1.9.0-bin/lib/

测试效果

我们创建刚才说的conf文件:

vim /opt/wzk/flume-conf/flumetest1.conf

编写的内容如下图所示:
在这里插入图片描述
启动进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flumetest1.conf -name a1 -Dflume.roog.logger=INFO,console

启动结果如下图所示:
z

我们启动 telnet 来传入数据:

telnet h122.wzk.icu 9999

启动之后输入数据:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

此时控制台的数据内容部分为:

4/08/27 15:31:31 INFO source.NetcatSource: Source starting
24/08/27 15:31:31 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.1.130:9999]
24/08/27 15:32:09 INFO sink.LoggerSink: Event: { headers:{logtime=2020-07-30} body: 32 30 32 30 2D 30 37 2D 33 30 20 31 34 3A 31 38 2020-07-30 14:18 }

对应的截图如下图所示:
在这里插入图片描述

采集启动日志(使用自定义拦截器)

配置文件

新建一个配置文件:

vim /opt flume-log2hdfs2.conf

写入的内容如下所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

内容的截图如下所示:
在这里插入图片描述
修改的内容如下:

  • 给source增加自定义拦截器
  • 去掉时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据header中的logtime写文件

测试运行

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console

拷贝日志

修改日志的内容:

vim /opt/wzk/logs/start/test.log

继续写入如下的内容:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

写入内容如下图所示:
在这里插入图片描述

测试效果

可以看到HDFS上,已经有了读出日志中的时间的内容:
在这里插入图片描述

采集启动日志和事件日志

本系统中要采集两种日志:

  • 启动日志
  • 事件日志
    不同的日志放置在不同的目录下,要想一次拿到全部日志需要监控多个目录。
    在这里插入图片描述

总体思路

  • taildir 监控多个目录
  • 修改自定义拦截器,不同来源的数据加上不同标志
  • HDFS、Sink 根据标志写文件

Agent 介绍

Flume 是一个分布式、高可靠、可用来收集、聚合和传输大量日志数据的系统。在 Flume 的体系结构中,Agent 是一个关键的组件。每个 Agent 是一个独立的 JVM 进程,负责从数据源获取数据并将其传递到下游(如文件系统、数据库、或者另一个 Agent)

Agent 的核心组成部分

Flume Agent 的架构是高度模块化的,它由以下三个核心组件构成:

Source (源)

Source 是数据流的起点,负责接收外部数据。它支持多种数据传输协议和格式,能够从日志文件、网络端口、消息队列等数据源中接收事件。

支持的 Source 类型:

  • Avro Source:接收来自其他 Flume Agent 的 Avro 格式数据。
  • Syslog Source:处理 Syslog 消息。
  • Exec Source:从本地命令或脚本的输出中读取数据。
  • HTTP Source:通过 HTTP 接口接收数据。
  • Spooling Directory Source:监控特定目录中的文件并读取内容。

Channel (通道)

Channel 是 Agent 的数据缓冲区域,用于在 Source 和 Sink 之间暂存事件。Channel 的设计保证了在数据流动中断(如网络故障)时,数据不会丢失。

常见的 Channel 类型:

  • Memory Channel:将事件存储在内存中,速度快,但可能会丢失数据(如果 Agent 崩溃)。
  • File Channel:将事件存储在磁盘文件中,提供高可靠性但性能较低。
  • Kafka Channel:使用 Kafka 作为中转通道,适合分布式场景。

Sink (接收器)

Sink 是数据流的终点,负责将事件传递到下游存储或处理系统。

支持的 Sink 类型:

  • HDFS Sink:将事件写入 Hadoop HDFS。
  • Kafka Sink:将事件发送到 Kafka。
  • Elasticsearch Sink:将事件写入 Elasticsearch。
  • Logger Sink:将事件输出到日志。
  • Avro Sink:将事件传递到另一个 Flume Agent 的 Avro Source。

Agent配置

vim /opt/wzk/flume-conf/flume-log2hdfs3.conf

写入的内容如下图所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%
{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • filegroups:指定filegroups,可以有多个,以空格分割(taildir source可以同时监控多个目录中的文件)
  • headers.filegroups.headerKey

给Event增加header Key,不同的filegroup,可配置不同的value。


http://www.kler.cn/a/400327.html

相关文章:

  • 若依权限控制
  • 【Java 学习】数据类型、变量、运算符、条件控制语句
  • hadoop+wsl 10.255.255.254,BlockMissingException: Could not obtain block: 踩坑
  • .NET 9.0 中 System.Text.Json 的全面使用指南
  • 怎样在软件设计中选择使用GOF设计模式
  • JavaScript中如何使用Promise处理异步操作?
  • 华为HCCDA云技术认证--网络服务
  • 视图合并机制解析 | OceanBase查询优化
  • windows C#-异步编程概述(三)
  • 应用系统开发(12) Zync中实现数字相敏检波
  • 公众号登录报错问题处理
  • 【Pythonr入门第二讲】你好,世界
  • C# 属性与结构
  • 【机器学习】从马尔可夫链到CRF:全方位解析序列建模的核心技术
  • https://localhost/index 配置的nginx,一刷新就报404了
  • C++ 常见容器获取头元素的方法全览
  • 数据结构-二叉搜索树(Java语言)
  • 2.3 物理层设备
  • 无人机+无人车+机器狗:城市巷战突破技术详解
  • DataStream编程模型之数据源、数据转换、数据输出
  • 【蓝桥杯备赛】深秋的苹果
  • @quick-start/electron安装过程中的问题解决
  • CertiK安全调研报告:Web3.0桌面钱包的初步安全评估
  • vscode调试已经编译好的程序
  • ROS第九梯:ROS+VSCode+Python+C++自定义消息发布和订阅
  • ⚡️如何在 React 和 Next.js 项目里优雅的使用 Zustand