当前位置: 首页 > article >正文

【大数据学习 | flume】flume之常见的channel组件

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件,Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel。

MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。

FileChannel保证数据的完整性与一致性。

​ Spillable Memory Channel基于内存和磁盘,内存不够时将数据存储在磁盘中,数据出错恢复时,只恢复磁盘中的数据,还在测试阶段不建议在生产环境用。

1. file channel

# file channel

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=file
a1.channels.c1.dataDirs = /root/filedata

#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

创建数据输出目录

mkdir -p /root/filedata

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./fileroll.agent -Dflume.root.logger=INFO,console

2. Kafka Channel

将数据存储到kafka中,kafka数据也是存储在磁盘中,并且kafka提供了高可用的功能,数据不会丢失。

重新启动镜像并需要添加kafka的组件。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.c1.kafka.topic = hainiu
a1.channels.c1.kafka.consumer.group.id = flume-consumer

#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./kafkachannel.agent -Dflume.root.logger=INFO,console

测试kafka中是否存储flume收集过来的数据:

启动kafka消费者消费指定分区的数据

#创建主题
./kafka-topics.sh --zookeeper11.99.16.105:2181 --create --topic hainiu --replication-factor 1 --partitions 1
#生产者生产数据
./kafka-console-producer.sh --broker-list 11.99.16.105:9092--topic hainiu
#消费者消费数据
kafka-console-consumer.sh --bootstrap-server 11.99.16.105:9092 --topic hainiu

通过telnet向flume监听的端口发数据

flume logger sink将数据打印在控制台


http://www.kler.cn/a/398655.html

相关文章:

  • 01.02、判定是否互为字符重排
  • 结构化需求分析与设计
  • docker-hub 无法访问,使用windows魔法拉取docker images再上传到linux docker环境中
  • LLMs 训练经验篇
  • 《生成式 AI》课程 第3講 CODE TASK 任务3:自定义任务的机器人
  • 【0x001C】HCI_Write_Page_Scan_Activity详解
  • 在ubuntu上安装ubuntu22.04并ros2 humble版本的docker容器记录
  • 【C++动态规划 最长公共子序列】1035. 不相交的线|1805
  • c++基础36时间复杂度
  • Excel模板下载\数据导出
  • MySQL面试之底层架构与库表设计
  • 【iOS】知乎日报第四周总结
  • 智慧社区管理系统平台全面提升物业管理效率与用户体验
  • 拉取docker镜像应急方法
  • 论文《基于现实迷宫地形的电脑鼠设计》深度分析(四)——现实迷宫算法
  • css 布局学习之底部弹窗切换示
  • GPU分布式通信技术-PCle、NVLink、NVSwitch深度解析
  • Stable Diffusion Web UI - Checkpoint、Lora、Hypernetworks
  • 【案例】---Hutool提取excel文档
  • Excel365和WPS中提取字符串的五种方法
  • git如何添加已被忽略的目录里的子目录
  • 海外媒体发稿:中东地区阿拉伯邮报Arab Post新闻媒体宣发
  • hadoop_capacity-scheduler.xml
  • 【go从零单排】Directories、Temporary Files and Directories目录和临时目录、临时文件
  • 应用于各种小家电的快充协议芯片
  • python 多进程,程序运行越来越慢踩坑