当前位置: 首页 > article >正文

kafka和Flume的整合

目录

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

 3、测试

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 2、测试


 

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)

# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf

在kafka-memory-logger.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sources.r1.kafka.topics = bigdata

a1.sources.r1.kafka.consumer.group.id = text7

a1.sources.r1.batchSize = 100

a1.sources.r1.batchDurationMillis = 2000

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sinks.k1.maxBytesToLog = 128

 3、测试

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

  • 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata

  • 启动flume,接收消息 
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

 

 

 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 在flume-kafka-sink.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata01

a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = bigdata

a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 2、测试

启动:

flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console

 使用telnet命令,向端口发送消息:

yum -y install telnet

telnet bigdata01 44444

 

 在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092

 


http://www.kler.cn/a/392968.html

相关文章:

  • 【插件】多断言 插件pytest-assume
  • Redis高可用-主从复制
  • 应用程序部署(IIS的相关使用,sql server的相关使用)
  • IPguard与Ping32全面对比——选择最适合企业的数据安全解决方案
  • 《EasyQuotation 与MongoDB在股市信息的奇妙融合》
  • 操作系统离散存储练习题
  • Linux 系统管理和监控命令---- auditctl命令
  • 【机器学习】29. 关联规则挖掘(Association Rule Mining)
  • Linux下的vim和gdb
  • day55 图论章节刷题Part07([53.寻宝]prim算法、kruskal算法)
  • Window.history API学习笔记
  • 基于flask+jwt+vue前后端分离架构
  • 如何提高业务系统的稳定性
  • 浅谈C#之内存管理
  • 【无人机设计与控制】无人机集群路径规划:5种最新优化算法(ECO、AOA、SFOA、MGO、PLO)求解无人机集群路径规划
  • 鸿蒙学习生态应用开发能力全景图-三方库(3)
  • 专题十八_动态规划_斐波那契数列模型_路径问题_算法专题详细总结
  • C语言中操作符详解(下)
  • MFC工控项目实例二十九主对话框调用子对话框设定参数值
  • 当微软windows的记事本被AI加持
  • 定时清理潜在客户列表中的无效邮箱可提高EDM电子邮件自动化营销邮件送达率
  • Android插件化和组件化面试题及参考答案
  • Mac的极速文件搜索工具,高效管理文件
  • 时序数据库TimescaleDB安装部署以及常见使用
  • 手机直连卫星NTN通信初步研究
  • WPF+MVVM案例实战与特效(二十八)- 自定义WPF ComboBox样式:打造个性化下拉菜单