当前位置: 首页 > article >正文

2、zookeeper和kafka

zookeeper

zookeeper基本知识

zookeeper:开源的分布式框架协调服务

zookeeper的工作机制

基于观察者模式设计的分布式结构,复制存储和管理架构当中的元信息,架构当中的应用接受观察者的监控,一旦数据有变化,通知对于的zookeeper,保存变化的信息。

zookeeper特点

1、最少要有3台服务器,一个领导者(leader),多个跟随者(follower)。

2、zookeeper要有半数以上的节点存货,整个架构就可以正常工作,所有都是奇数台部署

3、全局数据一致

4、数据更新的原子性,要么都成功,要么都失败。

5、实时性

zookeeper的数据结构

1、统一的命名服务,不是以IP来记录,可以用域名也可以是主机名来记录信息。

2、统一配置管理,所有的节点信息的配置要是一致的。

3、统一的集群管理,在整个分布式的环境中,必须实时的掌握每个节点的状态,如果状态发生变化,要及时更新。

zookeeper安装实操

这里我们使用三台服务器完成zookeeper架构,leader和follow是随机选举的。

实验架构如下:

zw6:192.168.254.16

tomcat:192.168.254.21

mysql:192.168.254.33

1、三台服务器都解压zookeeper包并安装java依赖环境

tar -xf apache-zookeeper-3.5.7-bin.tar.gz

apt -y install openjdk-8-jre-headless

2、三台服务器都创建配置文件zoo.cfg

3、三台服务器修改配置文件zoo.cfg

定义zookeeper的数据目录和日志目录

  • server.1:指的是服务器的序号
  • 3188:leader和follower之间交换信息的通信端口
  • 3288:选举的端口

4、三台服务器都创建数据目录和日志目录并赋权

5、三台服务器分别在/data/目录下,创建myid文件,并且里面输入分别是1、2、3,和我们配置文件里面1、2、3对应。

6、三台主机都启动zookeeper并查看状态,这里我们mysql3服务器是leader,两外两台都是follow

kafka

消息队列

kafka消息队列:服务端向客户端发送一个指令,客户端收到指令并且通过这个指令反馈到服务端,完成一个异步方式的通信的流程。

kafka消息队列:处理大数据场景非常合适

rabbitMQ消息队列:处理小数据场景合适

activeMQ消息队列:处理小数据场景合适

消息队列的应用场景

1、异步处理:用于用户的短信验证码,邮件通知。

2、系统解耦:用于微服务架构中的服务之间通信。可以降低各个组件之间的依赖程度(耦合度),可以提高组件的灵活性和可维护性。

3、负载均衡:用于高并发系统的任务处理。消息队列把多个任务分发到多个消费者实列,如电商平台的订单系统。

4、流量控制和限流:根据api请求来进行处理。通过控制消息的生产速度和消费者的处理速度来完成限流。

  • 端口:应用和应用之间通信
  • api接口:应用程序内部各个组件之间的通信的方式

5、任务调度和定时任务:消息队列可以定时的进行任务调度,按照消费者的计划生产对于的数据

6、数据同步和分发:用于日志收集和数据收集。可以远程的实现数据的统一管理。

7、实时数据处理

8、备份和恢复

消息队列的模式

1、点对点:一对一,生产者生产消息,消费者消费消息,这个是一对一的。

2、发布/订阅模式:消息的生产者发布一个主题,其他的消费者订阅这个主题,从而实现一对多。

  • 主题:topic

kafka的组件名称

1、主题 topic:主题是kafka数据流的一个基本的单元,类似于数据的管道,生产者将消息发布到主题,其他消费者订阅主题来消费消息,主题可以被分区,分区有偏移量。

2、生产者:将消息写入主题和分区

3、消费者:从主题和分区中接受发布的消息,一个消费者可以订阅多个主题。

4、分区:一个主题可以被分为多个分区,每个分区都是数据的有序的子集,分区越多,消费者消费的速度越快,可以避免生产者的消费堆积。分区当中有偏移量,按照偏移量进行有序存储,消费者可以独立的读写每个分区的数据。

  • 如何读取分区的数据,一般是不考虑的。只有在消息出现堆积的时候,会考虑扩展分区数。
  • 堆积:消费者没有及时处理掉生产者发布的消息,导致消费堆积

kafka的消费堆积出现如何处理:

1、扩展分区数

2、偏移量:消息在分区当中的唯一标识,根据偏移量指定消费者获取消息的位置。

3、经纪人zookeeper:存储分区的消息,kafka集群的元数据。

kafka数据流向

kafka安装实操

这里我们依然使用上面zookeeper实验的三台服务器完成kafka实操

kafka非常占内存,可以在安装使用前清理下内存,echo 3 > /proc/sys/vm/drop_caches

1、三台服务器解压软件包后移动至/usr/local/kafka

2、三台服务端修改kafka配置文件server.properties

首先配置三台服务器不一样的地方

  • #全局唯一编号,每一台服务器都不能相同,这里我们定0、1、2即可

  • #地址分别对应自己主机的IP地址

接下里都是一样的配置

3、三台服务器添加kafka全局变量

4、三台服务器一起启动kafka

./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

看端口可以看出我们kafka服务已启,并且推选了mysql3这个主机为主

kafka补充应用

创建主题

kafka-topics.sh --create --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --replication-factor 2 --partitions 3 --topic test1

  • bootstrap-server:这里的地址一般是一个集群当中的地址即可,约定是全写。
  • replication-factor 2:定义主题的副本数,2个副本,一般2、4个,副本是偶数。
  • partitions 3:定义分区数,一般3、6个

查询当前集群中的主题数

./kafka-topics.sh --list --bootstrap-server 192.168.233.61:9092

发布消息

kafka-console-producer.sh --broker-list 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1

消费消息

kafka-console-consumer.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1 --from-beginning

修改主题分区数

查看分区

Leader:每一个分区都有一个Leader,领导者负责处理分区的读写操作

Replicas:副本数,0  1  2 分别对象每一个Leader

Isr:标识和哪个Leader进行同步

Partition:分区的编号

修改分区数

kafka-topics.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --alter --topic test1 --partitions 6

删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1

elk+filebeat+kafka实操

数据流向如下

这里我们结合之前ELK+filebeat实验和上面kafka实验,具体架构如下

es1:zw4,192.168.254.14

es2:zw5,192.168.254.15

logstash、kibana:mysql1,192.168.254.31

filebeat:mysql2,192.168.254.32

kafka:zw6,192.168.254.16

kafka:tomcat,192.168.254.21

kafka:mysql,192.168.254.33

1、修改filebeat配置文件 /usr/local/filebeat/filebeat.yml,更换下标签,区分下之前做的实验,具体修改如下

2、创建logstash的日志收集文件/etc/logstash/conf.d/kafka.sh

  • codec => "json":指定数据的格式是json
  • auto_offser_reset => "latest":latest,从尾部开始;earliest的话就是从头开始拉取
  • decorate_events => true:传递给es的数据额外的附加kafka的属性数据

3、启动和验证

启动logstash:logstash -f kafka.conf --path.data /opt/testa &

打开kafka集群的消费集群

kafka-topics.sh --bootstrap-server 192.168.254.16:9092,192.168.254.21:9092,192.168.254.33:9092 --topic nginx_mysql --from-beginning

启动filebeat:./filebeat -e -c filebeat.yml

显示filebeat与kafka集群连接成功

最后我们登录kibana,显示日志收集成功


http://www.kler.cn/a/471918.html

相关文章:

  • Linux服务器网络不通问题排查及常用命令使用
  • MATLAB深度学习实战文字识别
  • Springboot SAP Docker 镜像打包问题
  • 【JavaWeb】2. 通用基础代码
  • Spring Boot 项目自定义加解密实现配置文件的加密
  • 谷粒商城-高级篇-Sentinel-分布式系统的流量防卫兵
  • List ---- 模拟实现LIST功能的发现
  • 23.行号没有了怎么办 滚动条没有了怎么办 C#例子
  • IP Anycast 与 CDN
  • c/c++ 里的进程间通信 , 管道 pipe 编程举例
  • 接口项目架构流程图-thinkphp6-rabbitmq
  • QT学习十九天 QT核心机制
  • WebRtc02: WebRtc架构、目录结构、运行机制
  • Lianwei 安全周报|2024.1.7
  • 【Java】JVM内存相关笔记
  • 代理IP授权机制:保障安全与效率的双重考量
  • matlab专栏-常见问题处理
  • 青少年编程与数学 02-006 前端开发框架VUE 12课题、表单绑定
  • C#高级:递归4-根据一颗树递归生成数据列表
  • 通过 ulimit 和 sysctl 调整Linux系统性能
  • ThinkPHP 8高效构建Web应用-获取请求对象
  • VUE条件树查询 自定义条件节点
  • 【JMeter】配置元件Config Element
  • 浅谈棋牌游戏开发流程四:核心业务逻辑(二)——房间匹配与对局流程
  • Java 日期时间格式化标准
  • 【HarmonyOS NEXT】鸿蒙应用实现屏幕录制详解和源码