当前位置: 首页 > article >正文

kafka和Flume的整合

目录

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

 3、测试

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 2、测试


 

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)

# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf

在kafka-memory-logger.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sources.r1.kafka.topics = bigdata

a1.sources.r1.kafka.consumer.group.id = text7

a1.sources.r1.batchSize = 100

a1.sources.r1.batchDurationMillis = 2000

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sinks.k1.maxBytesToLog = 128

 3、测试

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

  • 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata

  • 启动flume,接收消息 
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

 

 

 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 在flume-kafka-sink.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata01

a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = bigdata

a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 2、测试

启动:

flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console

 使用telnet命令,向端口发送消息:

yum -y install telnet

telnet bigdata01 44444

 

 在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092

 


http://www.kler.cn/a/392968.html

相关文章:

  • 游戏引擎学习第62天
  • SpringAI人工智能开发框架006---SpringAI多模态接口_编程测试springai多模态接口支持
  • 前端网页开发学习(HTML+CSS+JS)有这一篇就够!
  • CentOS7下的vsftpd服务器和客户端
  • [源码解析] 模型并行分布式训练Megatron (2) --- 整体架构
  • 宠物行业的出路:在爱与陪伴中寻找增长新机遇
  • Linux 系统管理和监控命令---- auditctl命令
  • 【机器学习】29. 关联规则挖掘(Association Rule Mining)
  • Linux下的vim和gdb
  • day55 图论章节刷题Part07([53.寻宝]prim算法、kruskal算法)
  • Window.history API学习笔记
  • 基于flask+jwt+vue前后端分离架构
  • 如何提高业务系统的稳定性
  • 浅谈C#之内存管理
  • 【无人机设计与控制】无人机集群路径规划:5种最新优化算法(ECO、AOA、SFOA、MGO、PLO)求解无人机集群路径规划
  • 鸿蒙学习生态应用开发能力全景图-三方库(3)
  • 专题十八_动态规划_斐波那契数列模型_路径问题_算法专题详细总结
  • C语言中操作符详解(下)
  • MFC工控项目实例二十九主对话框调用子对话框设定参数值
  • 当微软windows的记事本被AI加持
  • 定时清理潜在客户列表中的无效邮箱可提高EDM电子邮件自动化营销邮件送达率
  • Android插件化和组件化面试题及参考答案
  • Mac的极速文件搜索工具,高效管理文件
  • 时序数据库TimescaleDB安装部署以及常见使用
  • 手机直连卫星NTN通信初步研究
  • WPF+MVVM案例实战与特效(二十八)- 自定义WPF ComboBox样式:打造个性化下拉菜单