当前位置: 首页 > article >正文

Flume的安装与使用

一、简介

1.1、概念

Apache Flume 是一个分布式、可靠且可用的系统,专为收集、聚合和移动大量日志数据而设计。它的架构基于流式数据流模型,允许开发者通过简单的配置实现从多个来源收集数据,通过简单的配置文件,用户即可完成复杂的数据流定义,大大降低了使用门槛。Flume 最常见的应用场景是从 Web 服务器或其他服务的日志文件中实时收集数据,并将这些数据传输到一个或多个目标位置,如HDFS(Hadoop分布式文件系统)、关系数据库、消息队列等。

1.2、特点

高可用性:Flume 设计有故障恢复机制,能够自动重试失败的操作,确保数据的可靠传输。
可扩展性:用户可以根据数据流量的变化动态调整Agent的数量,轻松应对数据增长的需求。
灵活性:支持多种数据源和目的地,可以通过配置文件轻松定制数据流,满足不同业务场景的需求。
易用性:提供了丰富的插件和配置选项,无需编程即可实现复杂的数据收集和传输任务。

二、安装

2.1、上传解压重命名

上传到  /opt/moudles  下

解压到  /opt/installs  下

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs

重命名

cd /opt/installs

mv apache-flume-1.9.0-bin/ flume

2.2、修改配置文件

来到  /opt/instals/flume/conf  下

cp flume-env.sh.template flume-env.sh

将JAVA_HOME路径修改为自己的jdk路径

修改环境变量

vi /etc/profile

 

export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin

刷新一下:  source /etc/profile

三、使用(以案例形式)

参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了

flume 的使用是编写 conf文件的,运行的时候指定该文件

# 定义组件的名字
<Agent>.sources = <Source>
a1.sources=s1
<Agent>.channels = <Channel1> <Channel2>
a1.channels=c1
<Agent>.sinks = <Sink>
a1.sinks=sink1

# 设置source 和 channel 之间的关系
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
a1.sources.s1.channels=c1

# 设置sink 和 channel 之间的关系
<Agent>.sinks.<Sink>.channel = <Channel1>
a1.sinks.sink1.channel=c1

先定义agent的名字,再定义agent中三大组件的名字
接着定义各个组件之间的关联关系

3.1、Avro+Memory+Logger

可以在flume的conf下创建一个 myconf  文件夹来存放写的文件

avro: 是监听某个端口是否有信息的工具
memory: 内存
logger: 控制台
即将演示一个场景:给服务器上的一个端口发送消息,消息经过内存,打印到控制台上。

#编写s1的类型是什么
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.32.128
a1.sources.s1.port = 4141
a1.sources.s1.channels = c1 

找到channel中的memory类型,再设置一下

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#source 或者 sink 每个事务中存取 Event 的操作数量
a1.channels.c1.transactionCapacity = 10000 

接着查找sink,sink的类型是logger

 a1.sinks.s2.channel = c1
a1.sinks.s2.type = logger

最终合并起来的文件就是:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 4141

a1.channels.c1.type = memory

a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

 在myconf文件夹下创建conf文件  avro-memory-log.conf   将上面的代码粘贴进去,随后运行它

flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console

-c  后面跟上 配置文件的路径
-f  跟上自己编写的conf文件
-n  agent的名字
-Dflume.root.logger=INFO,console   INFO 日志输出级别  Debug,INFO,warn,error 等

接着向端口中发送数据:

flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt

给avro发消息,使用avro-client

 flume是没有运行结束时间的,它一直监听某个Ip的端口,有消息就处理,没消息,就等着,反正不可能运行结束。

如果想停止,可以使用ctrl + c 终止flume。

3.2、Exec + Memory + HDFS

到企业中怎么知道用什么呢?

取决于公司的业务,理论将 sources channel sink 可以任意组合

以下版本演示的是没有时间语义的案例:

文件名:exec-memory-hdfs.conf

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hivedata/arr1.txt  #虚拟机上的目录,一定要存在
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000


a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/event/ #在hdfs上的被抽取消息的存储位置

 flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

不断向 arr1.txt  文件中输入数据即可观察到被抽取的现象

echo "nidejiaobuliuliangdaotianya" >> arr1.txt

3.3、Spool +File + HDFS

Spooling Directory

spool 这个效果是抽取一个文件夹的效果,文件夹中不断的产生新的文件,我将这些新的文件上传至hdfs。

文件名:spool-file-hdfs.conf

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /home/scripts/
a1.sources.src-1.fileHeader = true

a1.channels.ch-1.type = file

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = ch-1
a1.sinks.k1.hdfs.path = /flume/

以上的采集只能采集到文件夹中是否有新的文件产生,不能采集变化的文件。

抽取一个文件夹中的所有文件,子文件夹中的文件是不抽取的,抽取过的文件,数据发生了变化,也不会再抽取一次。

3.4、tailDir + Memory + HDFS [ 非常常用 ]

tailDir 是用来监控多个文件夹下的多个文件的,只要文件内容发生变化,就会再次的进行数据的抽取

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs

运行:

flume-ng agent -c ./ -f taildir-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 


http://www.kler.cn/a/396114.html

相关文章:

  • centos rich 美观打印日志
  • sql分区
  • python怎么设置环境变量
  • git没有识别出大写字母改成小写重命名的文件目录
  • Spring Boot 中的全局异常处理器
  • 山泽光纤HDMI线:铜线的隐藏力量
  • 249: 凸包面积
  • Spark RDD 的 compute 方法
  • Apache Doris:高级数据导入导出与外部系统集成
  • PyTorch和TensorFlow和Keras
  • Rust Struct 属性初始化
  • SpringBoot(5)-SpringSecurity
  • 循环队列KFIFO
  • 【Linux篇】面试——用户和组、文件类型、权限、进程
  • shell脚本(1)
  • 4. Spring Cloud Ribbon 实现“负载均衡”的详细配置说明
  • TMMI(测试成熟度模型集成)认证是什么?
  • uniapp微信登录的流程
  • 同三维T610UDP-4K60 4K60 DP或HDMI或手机信号采集卡
  • paddle表格识别数据制作
  • 【3D Slicer】的小白入门使用指南八
  • Redis五大基本类型——String字符串命令详解(命令用法详解+思维导图详解)
  • 自动化运维(k8s):一键获取指定命名空间镜像包脚本
  • 衡石科技BI如何助力企业实现数字化转型
  • Spring Boot编程训练系统:敏捷开发与持续集成
  • My_SQL day3