半小时速通flume-flume正文学习
文章目录
- 1.Flume安装
- 2.官方案例
- 3.案例2,监控hdfs实时更新日志
- 4.Flume监听一个目录,自动采集当前目录下的文件数据
- 5.实时监控目录下多个追加文件
- 6.Flume事务
- 7.Flume拓扑结构
- 8.复制和多路复用
- 9.Flume聚合操作
- 10.自定义拦截器Interceptor
- 1.idea创建工程导入依赖
- 2.创建类实现拦截器注意是flume包
- 3.实现拦截器的代码撰写
- 4.官方文档案例,自定义拦截器需要使用工厂设计模式
- 5.打包
- 6.配置flume信息
1.Flume安装
- 直接下载解压即可
- 删除lib下guava-11.0.2以兼容hadoop3.1.3
-
agent是一个jvm进程,因此,环境变量必须有Java
-
此外FLUME不需要任何额外配置
2.官方案例
-
案例需求:监听一个端口,收集该端口数据,并打印到控制台
-
实际操作
- 1.使用nc工具开启一个未被占用的端口,开启一个服务端
nc -lk 9999
- 2.监听该端口,开启一个客户端
nc localhost 9999
- 3.创建一个flume的配置信息
配置信息见名知意,就知道是net的source到logger的sink
# Name the components on this agent声明组件名称,a1表示agent命令,要是同一个节点启多个agent,那么组件的名字要不同。内部变量r1等与其他agent可以重复,此处source、sink、channel可以配置多个,因为加了s
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.那么组件的名字要不同。内部变量r1等与其他agent可以重复,此处source、sink、channel可以配置多个,因为加了s
2.第二处配置需要配置source 的类型,绑定机器,绑定端口
3.sink配置类型
4.channel配置类型memory,channel的容量1000个事件,transactionCapity事务容量,事务是解决安全性,防止数据丢失来实现的。事务的要比总容量要小。
5.将source,sink,channel绑定起来,原因是flume配置可自定义,可以配置多个channel,多个source等。此处需要注意,一个source可以绑定多个channel,但是一个channel只能绑定一个sink
4.启动flume进行监听操作
正确启动命令如下所示:
bin/flume-ng agent -c conf/ -n a1 -f
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
3.案例2,监控hdfs实时更新日志
此处的exec source 表示的是linux的一条命令 主要配置tial -f
配置文件信息如下所示
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H #此处的端口号必须是namenode8020
######通用配置
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
######通用配置
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
1.source类型采用exec方式,表示使用传统Linux命令的方式进行
2.tail -f 和 -F有不同 二者都可以持续监听,但是一旦改名或移动位置等操作会终端tail -f的终端
- hadoop端口号namenode
- tail -f 不支持断点续传,一旦挂掉了,会丢数据。
4.Flume监听一个目录,自动采集当前目录下的文件数据
-
请注意,之前监听hive.log的使用exec source
-
但是此处使用Spooldir Source
-
官方文档如下
黑色表示必须配置
上述配置fileSuffix表示前缀
IncludePattern
IgnorePattern
你可以理解为白名单黑名单,使用正则表达式的方式来忽略或包含某种类型文件;
请注意回滚规则,若30秒内传输多个文件,那么回滚会放到一个hdfs路径下文件中
若再次上传1.txt则无法完成上传操作,原因是此文件若上传成功,但是无法完成更名操作
5.实时监控目录下多个追加文件
-
exce source不支持断电续传
-
spoolDir source不能够监控动态变化的文件
-
1.7版本出了Taildir source,它能够实现断电续传,并且能够动态监听多个实时追加的文件。
-
追加信息到文件
echo hello >> file.txt
必要的配置信息:
我们配置多个目录组f1,f2
然后分别配置目录组的路径
f1,f2使用正则表示分别监听包含file的文件和包含log的文件
6.Flume事务
-
事务:是一组操作的组合,要么操作全部执行,要么全部不执行,若事务执行中发生错误或故障,将进行回滚
-
source负责推数据,sink是拉数据
-
source到channel涉及一个put事务,两个流程doPut和putList
-
sink拉取channel的数据之间涉及一个take事务
- 小结:事务表示一组安全的操作,为了保证程序的安全性,put事务是source推送数据到channel,若channel空间不够,则回滚数据;take事务表示将数据从channel拉取到缓冲区,若发生异常则发生回滚操作。
7.Flume拓扑结构
- 所谓拓扑结构就是将多个agent串联起来
- 若多个flume进行串联,那么他们的sink必须使用AVRO,本质是将数据发送到某个端口,然后另一端的source从该端口获取数据
- AVRO是一个轻量化的RPC通信框架。
- agent1的负载量较大,直接往hdfs写较低,因此使用负载均衡策略来接多个agent同步往hdfs写
-
FLUME聚合操作非常典型,很多台服务器节点的数据,每一个服务器都部署一个agent。将所有agent汇总的数据都收集到一个agent上然后同步到hdfs中
-
聚合操作常用
8.复制和多路复用
- 需求分析:
- 架构设计
- 官方AVRO配置信息如下所示:
- avro source配置信息如下所示
- file roll sink 往本地文件系统去写的
9.Flume聚合操作
-
需求分析(集群方式Flume数据传输)
-
细节:主机名配置104,104是avro source 因此是服务端,服务端也是最先开启的一方
- 不同机器的agent可以一样;
- 同一台机器不同时启,agent也可以一致
10.自定义拦截器Interceptor
- 需求分析:
- 自定义拦截器的选择器类型不能使用传统的replicating而是multipllexing
- 上述头信息配置state,若头信息匹配到CZ则使用c1,若头信息使用US则使用c2,c3,没有头信息,那么配置c4即可
1.idea创建工程导入依赖
2.创建类实现拦截器注意是flume包
3.实现拦截器的代码撰写
- 代码撰写逻辑
- 1.明确拦截器目标:修改事件内容?过滤事件?添加头信息?
- 2.实现Interceptor接口
- 写清楚单个事件的处理逻辑
- 实现批量处理逻辑,通常使用单个事件处理
- 3.实现Builder类
- 如果拦截器有参数,使用configure(Context)来读取配置
- 使用build()方法构造拦截器实例。
4.官方文档案例,自定义拦截器需要使用工厂设计模式
- 具体来说就是在拦截器类内部构造一个静态类实现Builer接口,重写build方法,构造类即可。
package com.xidian.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.HostInterceptor;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author: Student Gu
* @create: 2025-01-25 08:33
**/
public class TypeInterceptor implements Interceptor {
// 声明一个集合收集拦截器处理后的event
private List<Event> addHeaderEvents;
@Override
public void initialize() {
addHeaderEvents = new ArrayList<>();
}
// 单个事件处理方法
/**
*/
@Override
public Event intercept(Event event) {
// 1.获取header 和 body
String body = new String(event.getBody());
Map<String, String> headers = event.getHeaders();
// 2.根据body中是否包含atguigu添加不同的头信息
if (body.contains("atguigu")){
headers.put("type","atguigu");//官方文档添加的V是CZ或者是UZ等
}else {
headers.put("type","other");
}
// 3.正常返回数据,若返回null表示你这条数据被过滤掉了
return event;
}
//批量事件处理方法
@Override
public List<Event> intercept(List<Event> list) {
// 1.清空一下集合
this.addHeaderEvents.clear();
// 2.遍历events
for (Event event : list) {
addHeaderEvents.add(intercept(event));
}
// 3.返回数据
return addHeaderEvents;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
5.打包
-
将打包扔到服务器flume/lib下
-
channel和sink对应关系;一个channel可以对应多个sink,但是一个sink仅能对应一个channel
6.配置flume信息
- flume1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =
com.xidian.interceptor.TypeInterceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- flume2
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
- flume3
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1