2、zookeeper和kafka
zookeeper
zookeeper基本知识
zookeeper:开源的分布式框架协调服务
zookeeper的工作机制
基于观察者模式设计的分布式结构,复制存储和管理架构当中的元信息,架构当中的应用接受观察者的监控,一旦数据有变化,通知对于的zookeeper,保存变化的信息。
zookeeper特点
1、最少要有3台服务器,一个领导者(leader),多个跟随者(follower)。
2、zookeeper要有半数以上的节点存货,整个架构就可以正常工作,所有都是奇数台部署
3、全局数据一致
4、数据更新的原子性,要么都成功,要么都失败。
5、实时性
zookeeper的数据结构
1、统一的命名服务,不是以IP来记录,可以用域名也可以是主机名来记录信息。
2、统一配置管理,所有的节点信息的配置要是一致的。
3、统一的集群管理,在整个分布式的环境中,必须实时的掌握每个节点的状态,如果状态发生变化,要及时更新。
zookeeper安装实操
这里我们使用三台服务器完成zookeeper架构,leader和follow是随机选举的。
实验架构如下:
zw6:192.168.254.16
tomcat:192.168.254.21
mysql:192.168.254.33
1、三台服务器都解压zookeeper包并安装java依赖环境
tar -xf apache-zookeeper-3.5.7-bin.tar.gz
apt -y install openjdk-8-jre-headless
2、三台服务器都创建配置文件zoo.cfg
3、三台服务器修改配置文件zoo.cfg
定义zookeeper的数据目录和日志目录
- server.1:指的是服务器的序号
- 3188:leader和follower之间交换信息的通信端口
- 3288:选举的端口
4、三台服务器都创建数据目录和日志目录并赋权
5、三台服务器分别在/data/目录下,创建myid文件,并且里面输入分别是1、2、3,和我们配置文件里面1、2、3对应。
6、三台主机都启动zookeeper并查看状态,这里我们mysql3服务器是leader,两外两台都是follow
kafka
消息队列
kafka消息队列:服务端向客户端发送一个指令,客户端收到指令并且通过这个指令反馈到服务端,完成一个异步方式的通信的流程。
kafka消息队列:处理大数据场景非常合适
rabbitMQ消息队列:处理小数据场景合适
activeMQ消息队列:处理小数据场景合适
消息队列的应用场景
1、异步处理:用于用户的短信验证码,邮件通知。
2、系统解耦:用于微服务架构中的服务之间通信。可以降低各个组件之间的依赖程度(耦合度),可以提高组件的灵活性和可维护性。
3、负载均衡:用于高并发系统的任务处理。消息队列把多个任务分发到多个消费者实列,如电商平台的订单系统。
4、流量控制和限流:根据api请求来进行处理。通过控制消息的生产速度和消费者的处理速度来完成限流。
- 端口:应用和应用之间通信
- api接口:应用程序内部各个组件之间的通信的方式
5、任务调度和定时任务:消息队列可以定时的进行任务调度,按照消费者的计划生产对于的数据
6、数据同步和分发:用于日志收集和数据收集。可以远程的实现数据的统一管理。
7、实时数据处理
8、备份和恢复
消息队列的模式
1、点对点:一对一,生产者生产消息,消费者消费消息,这个是一对一的。
2、发布/订阅模式:消息的生产者发布一个主题,其他的消费者订阅这个主题,从而实现一对多。
- 主题:topic
kafka的组件名称
1、主题 topic:主题是kafka数据流的一个基本的单元,类似于数据的管道,生产者将消息发布到主题,其他消费者订阅主题来消费消息,主题可以被分区,分区有偏移量。
2、生产者:将消息写入主题和分区
3、消费者:从主题和分区中接受发布的消息,一个消费者可以订阅多个主题。
4、分区:一个主题可以被分为多个分区,每个分区都是数据的有序的子集,分区越多,消费者消费的速度越快,可以避免生产者的消费堆积。分区当中有偏移量,按照偏移量进行有序存储,消费者可以独立的读写每个分区的数据。
- 如何读取分区的数据,一般是不考虑的。只有在消息出现堆积的时候,会考虑扩展分区数。
- 堆积:消费者没有及时处理掉生产者发布的消息,导致消费堆积
kafka的消费堆积出现如何处理:
1、扩展分区数
2、偏移量:消息在分区当中的唯一标识,根据偏移量指定消费者获取消息的位置。
3、经纪人zookeeper:存储分区的消息,kafka集群的元数据。
kafka数据流向
kafka安装实操
这里我们依然使用上面zookeeper实验的三台服务器完成kafka实操
kafka非常占内存,可以在安装使用前清理下内存,echo 3 > /proc/sys/vm/drop_caches
1、三台服务器解压软件包后移动至/usr/local/kafka
2、三台服务端修改kafka配置文件server.properties
首先配置三台服务器不一样的地方
- #全局唯一编号,每一台服务器都不能相同,这里我们定0、1、2即可
- #地址分别对应自己主机的IP地址
接下里都是一样的配置
3、三台服务器添加kafka全局变量
4、三台服务器一起启动kafka
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
看端口可以看出我们kafka服务已启,并且推选了mysql3这个主机为主
kafka补充应用
创建主题
kafka-topics.sh --create --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --replication-factor 2 --partitions 3 --topic test1
- bootstrap-server:这里的地址一般是一个集群当中的地址即可,约定是全写。
- replication-factor 2:定义主题的副本数,2个副本,一般2、4个,副本是偶数。
- partitions 3:定义分区数,一般3、6个
查询当前集群中的主题数
./kafka-topics.sh --list --bootstrap-server 192.168.233.61:9092
发布消息
kafka-console-producer.sh --broker-list 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1
消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1 --from-beginning
修改主题分区数
查看分区
Leader:每一个分区都有一个Leader,领导者负责处理分区的读写操作
Replicas:副本数,0 1 2 分别对象每一个Leader
Isr:标识和哪个Leader进行同步
Partition:分区的编号
修改分区数
kafka-topics.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --alter --topic test1 --partitions 6
删除主题
kafka-topics.sh --delete --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1
elk+filebeat+kafka实操
数据流向如下
这里我们结合之前ELK+filebeat实验和上面kafka实验,具体架构如下
es1:zw4,192.168.254.14
es2:zw5,192.168.254.15
logstash、kibana:mysql1,192.168.254.31
filebeat:mysql2,192.168.254.32
kafka:zw6,192.168.254.16
kafka:tomcat,192.168.254.21
kafka:mysql,192.168.254.33
1、修改filebeat配置文件 /usr/local/filebeat/filebeat.yml,更换下标签,区分下之前做的实验,具体修改如下
2、创建logstash的日志收集文件/etc/logstash/conf.d/kafka.sh
- codec => "json":指定数据的格式是json
- auto_offser_reset => "latest":latest,从尾部开始;earliest的话就是从头开始拉取
- decorate_events => true:传递给es的数据额外的附加kafka的属性数据
3、启动和验证
启动logstash:logstash -f kafka.conf --path.data /opt/testa &
打开kafka集群的消费集群
kafka-topics.sh --bootstrap-server 192.168.254.16:9092,192.168.254.21:9092,192.168.254.33:9092 --topic nginx_mysql --from-beginning
启动filebeat:./filebeat -e -c filebeat.yml
显示filebeat与kafka集群连接成功
最后我们登录kibana,显示日志收集成功