当前位置: 首页 > article >正文

半小时速通flume-flume正文学习

文章目录

    • 1.Flume安装
    • 2.官方案例
    • 3.案例2,监控hdfs实时更新日志
    • 4.Flume监听一个目录,自动采集当前目录下的文件数据
    • 5.实时监控目录下多个追加文件
    • 6.Flume事务
    • 7.Flume拓扑结构
    • 8.复制和多路复用
    • 9.Flume聚合操作
    • 10.自定义拦截器Interceptor
      • 1.idea创建工程导入依赖
      • 2.创建类实现拦截器注意是flume包
      • 3.实现拦截器的代码撰写
      • 4.官方文档案例,自定义拦截器需要使用工厂设计模式
      • 5.打包
      • 6.配置flume信息

1.Flume安装

  • 直接下载解压即可

在这里插入图片描述

在这里插入图片描述

  • 删除lib下guava-11.0.2以兼容hadoop3.1.3

在这里插入图片描述

  • agent是一个jvm进程,因此,环境变量必须有Java

  • 此外FLUME不需要任何额外配置

2.官方案例

  • 案例需求:监听一个端口,收集该端口数据,并打印到控制台

  • 实际操作

    • 1.使用nc工具开启一个未被占用的端口,开启一个服务端
    nc -lk 9999
    
    • 2.监听该端口,开启一个客户端
    nc localhost 9999
    

在这里插入图片描述

  • 3.创建一个flume的配置信息

在这里插入图片描述

配置信息见名知意,就知道是net的source到logger的sink

# Name the components on this agent声明组件名称,a1表示agent命令,要是同一个节点启多个agent,那么组件的名字要不同。内部变量r1等与其他agent可以重复,此处source、sink、channel可以配置多个,因为加了s
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.那么组件的名字要不同。内部变量r1等与其他agent可以重复,此处source、sink、channel可以配置多个,因为加了s

2.第二处配置需要配置source 的类型,绑定机器,绑定端口

3.sink配置类型

4.channel配置类型memory,channel的容量1000个事件,transactionCapity事务容量,事务是解决安全性,防止数据丢失来实现的。事务的要比总容量要小。

5.将source,sink,channel绑定起来,原因是flume配置可自定义,可以配置多个channel,多个source等。此处需要注意,一个source可以绑定多个channel,但是一个channel只能绑定一个sink

4.启动flume进行监听操作

在这里插入图片描述

正确启动命令如下所示:

bin/flume-ng agent -c conf/ -n a1 -f
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

3.案例2,监控hdfs实时更新日志

在这里插入图片描述

此处的exec source 表示的是linux的一条命令 主要配置tial -f

配置文件信息如下所示

# Name the components on this agent	
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H		#此处的端口号必须是namenode8020

######通用配置
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
######通用配置

#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0


# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

1.source类型采用exec方式,表示使用传统Linux命令的方式进行

2.tail -f 和 -F有不同 二者都可以持续监听,但是一旦改名或移动位置等操作会终端tail -f的终端

  • hadoop端口号namenode

在这里插入图片描述

  • tail -f 不支持断点续传,一旦挂掉了,会丢数据。

4.Flume监听一个目录,自动采集当前目录下的文件数据

在这里插入图片描述

  • 请注意,之前监听hive.log的使用exec source

  • 但是此处使用Spooldir Source

  • 官方文档如下

在这里插入图片描述

黑色表示必须配置

在这里插入图片描述

上述配置fileSuffix表示前缀

IncludePattern

IgnorePattern

你可以理解为白名单黑名单,使用正则表达式的方式来忽略或包含某种类型文件;

请注意回滚规则,若30秒内传输多个文件,那么回滚会放到一个hdfs路径下文件中

在这里插入图片描述

若再次上传1.txt则无法完成上传操作,原因是此文件若上传成功,但是无法完成更名操作

5.实时监控目录下多个追加文件

  • exce source不支持断电续传

  • spoolDir source不能够监控动态变化的文件

  • 1.7版本出了Taildir source,它能够实现断电续传,并且能够动态监听多个实时追加的文件。

  • 追加信息到文件

    echo hello >> file.txt
    

    在这里插入图片描述

必要的配置信息:

在这里插入图片描述

在这里插入图片描述

我们配置多个目录组f1,f2

然后分别配置目录组的路径

f1,f2使用正则表示分别监听包含file的文件和包含log的文件

6.Flume事务

  • 事务:是一组操作的组合,要么操作全部执行,要么全部不执行,若事务执行中发生错误或故障,将进行回滚

  • source负责推数据,sink是拉数据

  • source到channel涉及一个put事务,两个流程doPut和putList

    在这里插入图片描述

  • sink拉取channel的数据之间涉及一个take事务

在这里插入图片描述

  • 小结:事务表示一组安全的操作,为了保证程序的安全性,put事务是source推送数据到channel,若channel空间不够,则回滚数据;take事务表示将数据从channel拉取到缓冲区,若发生异常则发生回滚操作。

7.Flume拓扑结构

  • 所谓拓扑结构就是将多个agent串联起来

在这里插入图片描述

  • 若多个flume进行串联,那么他们的sink必须使用AVRO,本质是将数据发送到某个端口,然后另一端的source从该端口获取数据

在这里插入图片描述

  • AVRO是一个轻量化的RPC通信框架。

在这里插入图片描述

在这里插入图片描述

  • agent1的负载量较大,直接往hdfs写较低,因此使用负载均衡策略来接多个agent同步往hdfs写

在这里插入图片描述

  • FLUME聚合操作非常典型,很多台服务器节点的数据,每一个服务器都部署一个agent。将所有agent汇总的数据都收集到一个agent上然后同步到hdfs中

  • 聚合操作常用

8.复制和多路复用

  • 需求分析:

在这里插入图片描述

  • 架构设计

在这里插入图片描述

  • 官方AVRO配置信息如下所示:

在这里插入图片描述

  • avro source配置信息如下所示

在这里插入图片描述

  • file roll sink 往本地文件系统去写的

在这里插入图片描述

9.Flume聚合操作

  • 需求分析(集群方式Flume数据传输)

    在这里插入图片描述

  • 细节:主机名配置104,104是avro source 因此是服务端,服务端也是最先开启的一方

在这里插入图片描述

  • 不同机器的agent可以一样;
  • 同一台机器不同时启,agent也可以一致

10.自定义拦截器Interceptor

  • 需求分析:

在这里插入图片描述

在这里插入图片描述

  • 自定义拦截器的选择器类型不能使用传统的replicating而是multipllexing

在这里插入图片描述

  • 上述头信息配置state,若头信息匹配到CZ则使用c1,若头信息使用US则使用c2,c3,没有头信息,那么配置c4即可

1.idea创建工程导入依赖

在这里插入图片描述

2.创建类实现拦截器注意是flume包

在这里插入图片描述

3.实现拦截器的代码撰写

  • 代码撰写逻辑
    • 1.明确拦截器目标:修改事件内容?过滤事件?添加头信息?
    • 2.实现Interceptor接口
      • 写清楚单个事件的处理逻辑
      • 实现批量处理逻辑,通常使用单个事件处理
    • 3.实现Builder类
      • 如果拦截器有参数,使用configure(Context)来读取配置
      • 使用build()方法构造拦截器实例。

4.官方文档案例,自定义拦截器需要使用工厂设计模式

在这里插入图片描述

  • 具体来说就是在拦截器类内部构造一个静态类实现Builer接口,重写build方法,构造类即可。
package com.xidian.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.HostInterceptor;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author: Student Gu
 * @create: 2025-01-25 08:33
 **/

public class TypeInterceptor implements Interceptor {
//    声明一个集合收集拦截器处理后的event
    private List<Event> addHeaderEvents;
    @Override
    public void initialize() {
        addHeaderEvents = new ArrayList<>();

    }

//    单个事件处理方法

    /**


     */
    @Override
    public Event intercept(Event event) {
//        1.获取header 和 body

        String body = new String(event.getBody());

        Map<String, String> headers = event.getHeaders();

//        2.根据body中是否包含atguigu添加不同的头信息
        if (body.contains("atguigu")){
            headers.put("type","atguigu");//官方文档添加的V是CZ或者是UZ等
        }else {
            headers.put("type","other");
        }
//        3.正常返回数据,若返回null表示你这条数据被过滤掉了
        return event;
    }

//批量事件处理方法
    @Override
    public List<Event> intercept(List<Event> list) {
//        1.清空一下集合
        this.addHeaderEvents.clear();

//        2.遍历events
        for (Event event : list) {
            addHeaderEvents.add(intercept(event));
        }

//        3.返回数据
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

5.打包

在这里插入图片描述

在这里插入图片描述

  • 将打包扔到服务器flume/lib下

  • channel和sink对应关系;一个channel可以对应多个sink,但是一个sink仅能对应一个channel

6.配置flume信息

  • flume1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =
com.xidian.interceptor.TypeInterceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  • flume2
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
  • flume3
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1


http://www.kler.cn/a/519247.html

相关文章:

  • Mono里运行C#脚本36—加载C#类定义的成员变量和方法的数量
  • 飞牛NAS安装过程中的docker源问题
  • 【vue3组件】【大文件上传】【断点续传】支持文件分块上传,能够在上传过程中暂停、继续上传的组件
  • @RabbitListener处理重试机制完成后的异常捕获
  • HTB:Support[WriteUP]
  • 面向程序员的Lean 4教程(2) - 数组和列表
  • 【深入理解SpringCloud微服务】Sentinel源码解析——DegradeSlot熔断规则
  • 【漫话机器学习系列】060.前馈神经网络(Feed Forward Neural Networks, FFNN)
  • 能源新动向:智慧能源平台助力推动新型电力负荷管理系统建设
  • 面试技巧——压力面题目与参考答案
  • 软件越跑越慢的原因分析
  • (一)QT的简介与环境配置WIN11
  • Vivado生成X1或X4位宽mcs文件并固化到flash
  • ES设置证书和创建用户,kibana连接es
  • 【前沿聚焦】机器学习的未来版图:从自动化到隐私保护的技术突破
  • 通过亚马逊云科技Bedrock打造自定义AI智能体Agent(上)
  • Python 字符串加密
  • 什么是业务对象
  • C++练习 —— 命名空间、引用、类的定义、构造函数和析构函数、运算符重载、const成员函数、类相关OJ题
  • 2024:人工智能大模型的璀璨年代
  • 在 ASP.NET Core 6.0 Web API 中将 Excel 文件数据上传并保存到数据库中
  • 数据结构初阶之栈的介绍与栈的实现
  • C语言程序设计十大排序—希尔排序
  • 代码随想录-训练营-day14
  • 设计模式Python版 工厂方法模式
  • 【C语言】字符函数与字符串函数