当前位置: 首页 > article >正文

日志收集Day007

1.配置ES集群TLS认证:

(1)elk101节点生成证书文件

cd /usr/share/elasticsearch
./bin/elasticsearch-certutil cert -out config/elastic-certificates.p12 -pass ""  --days 3650

(2)elk101节点为证书文件修改属主和属组
chown elasticsearch:elasticsearch config/elastic-certificates.p12

(3)elk101节点同步证书文件到其他节点

需要确保文件的属主和属组和步骤2一致
data_rsync.sh /usr/share/elasticsearch/config/elastic-certificates.p12
最好再检查一下是否同步成功,及属主信息是否都为elasticsearch

(4)elk101节点修改ES集群的配置文件

vim /etc/elasticsearch/elasticsearch.yml

#添加如下配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12

同步配置文件
data_rsync.sh /etc/elasticsearch/elasticsearch.yml

(4.5)由于我的elasticsearch是rpm安装非二进制安装,所有此处需要将证书文件移动到/etc/elasticsearch目录下。如果es集群配置文件跟证书文件同级则跳过此步骤。

cp -a /usr/share/elasticsearch/config/elastic-certificates.p12 /etc/elasticsearch/

(5)所有节点重启elasticsearch

systemctl restart elasticsearch.service

(6)生成随机密码(千万记得保存)

cd /usr/share/elasticsearch
./bin/elasticsearch-setup-passwords auto

(7)postman访问

(8)修改kibana配置的账户密码(第6步的账号密码)

vim /etc/kibana/kibana.yml

重启:systemctl restart kibana.service
使用elastic登录并修改密码

2.自定义角色使用logstash组件写入数据到ES集群

1.自定义角色(无需配置kibana权限)

2.创建用户

3.编写logstash文配置文件

在配置了TLS认证之后,logstash向es集群发送数据都需要携带用户和密码,这里配置的索引需要和步骤1索引匹配。

vim 10-file-to-es.conf

input { 
   file {
     path => "/tmp/lxc-file"
     # 指定读取文件的起始位置,但前提是该文件之前未读取过或者未在".sincedb"文件中记录。
     start_position => "beginning"
  }
}

output { 
   elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-file-lxc"
    user => "lxc2025"
    password => "123456"
  }
}

4.运行logstash

logstash -rf 10-file-to-es.conf

5.测试

3.zookeeper单点部署

1.下载zookeeper软件
https://zookeeper.apache.org/releases.html
2.解压软件包
tar xf apache-zookeeper-3.8.0-bin.tar.gz -C /app/softwares/
3.创建符号链接
cd /app/softwares/ && ln -sv apache-zookeeper-3.8.0-bin zk
4.声明zk的环境变量
cat > /etc/profile.d/kafka.sh <<'EOF'
export ZK_HOME=/app/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile.d/kafka.sh
5.创建zk的配置文件
cp /app/softwares/zk/conf/{zoo_sample.cfg,zoo.cfg}
6.启动zk节点
zkServer.sh start
补充:zkServer.sh stop       zkServer.sh status           zkServer.sh restart
7.连接节点
zkCli.sh

4.zookeeper集群部署

1.创建zk的数据目录
install -d /lxc/data/zk
2.修改单点zk的配置文件
修改配置文件,可以将配置文件内容替换为如下内容(下面数据目录需要和步骤一创建的一致)
vim /app/softwares/zk/conf/zoo.cfg

# 定义最小单元的时间范围tick。
tickTime=2000
# 启动时最长等待tick数量。
initLimit=5
# 数据同步时最长等待的tick时间进行响应ACK
syncLimit=2
# 指定数据目录
dataDir=/lxc/data/zk
# 监听端口
clientPort=2181
# 开启四字命令允许所有的节点访问。
4lw.commands.whitelist=*
# server.ID=A:B:C[:D]
# ID:zk的唯一编号。
# A:zk的主机地址。
# B:leader的选举端口,是谁leader角色,就会监听该端口。
# C:数据通信端口。
# D:可选配置,指定角色。
server.101=10.0.0.101:2888:3888
server.102=10.0.0.102:2888:3888
server.103=10.0.0.103:2888:3888

3.同步数据(这里使用脚本同步,没有脚本可以使用scp命令)
data_rsync.sh /app/softwares/zk/
data_rsync.sh /app/softwares/apache-zookeeper-3.8.0-bin/
data_rsync.sh /lxc/data/zk/
data_rsync.sh /etc/profile.d/kafka.sh

4.创建myid文件
for ((host_id=101;host_id<=103;host_id++)) do ssh 10.0.0.${host_id} "echo ${host_id} > /lxc/data/zk/myid";done

5.所有节点启动zk服务
source /etc/profile.d/kafka.sh
zkServer.sh start

6.查看zk的角色状态
zkServer.sh status

5.编写zk的集群管理脚本
vim /usr/local/sbin/zkManager.sh

#!/bin/bash

#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop|restart|status}"
    exit
fi

#获取用户输入的命令
cmd=$1

#定义函数功能
function zookeeperManger(){
    case $cmd in
    start)
        echo "启动服务"        
        remoteExecution start
        ;;
    stop)
        echo "停止服务"
        remoteExecution stop
        ;;
    restart)
        echo "重启服务"
        remoteExecution restart
        ;;
    status)
        echo "查看状态"
        remoteExecution status
        ;;
    *)
        echo "无效参数,用法为: $0  {start|stop|restart|status}"
        ;;
    esac
}


#定义执行的命令
function remoteExecution(){
    for (( i=101 ; i<=103 ; i++ )) ; do
            tput setaf 2
            echo ========== 10.0.0.${i} zkServer.sh  $1 ================
            tput setaf 9
            ssh 10.0.0.${i}  "source /etc/profile.d/kafka.sh; zkServer.sh $1 2>/dev/null"
    done
}
#调用函数
zookeeperManger

添加权限:chmod +x /usr/local/sbin/zkManager.sh
管理zk集群:
zkManager.sh start
zkManager.sh stop
zkManager.sh restart
zkManager.sh status

6.zookeeper的命令行基本管理:
查看

ls /
    查看根(/)下有多少子zookeeper node,简称"znode"。    
get /aa
    查看"/aa"的数据。

创建
create /aa
    在根路径下创建一个名为"aa"的"znode"。
create -s -e /aa
    在根路径下创建一个名为"aa"有序编号(-s)的临时"znode"(-e),会话结束,自动删除。
create /aa  123 
    在根路径下创建一个名为"aa"的"znode",并指定其数据为123。

修改
set /aa  456
    将名为"/aa"的"znode",的数据修改为456。

删除
delete /aa
删除名为"/aa"的"znode",aa下面不能有子znode,必须为空
deleteall /aa
递归删除名为"/aa"下的所有"znode"
 

7.搭建kafka单点环境

(1)下载kafka软件包
(2)解压软件包:  tar xf kafka_2.13-3.2.1.tgz -C /app/softwares/
(3)创建符号连接:  cd /app/softwares/ && ln -svf kafka_2.13-3.2.1 kafka
(4)配置环境变量:vim /etc/profile.d/kafka.sh
加入如下内容:

export KAFKA_HOME=/app/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin

(5)source /etc/profile.d/kafka.sh
  (6)修改配置文件: vim /app/softwares/kafka/config/server.properties

broker.id=101
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/kafka321

(7)连接zk节点,验证

zkCli.sh
ls /kafka321


http://www.kler.cn/a/522011.html

相关文章:

  • 剑指 Offer II 007. 数组中和为 0 的三个数
  • RocketMQ 中如何实现消息的可靠传递?
  • 编译dpdk19.08.2中example时一系列报错解决
  • 《深度剖析Q-learning中的Q值:解锁智能决策的密码》
  • Qt Ribbon使用实例
  • 分布式版本控制系统:Git
  • RK3588平台开发系列讲解(ARM篇)ARM64底层中断处理
  • 一文讲解Java中的equals和hashCode方法
  • VSCode 设置为中文(Configure Display Language)
  • HarmonyOS:ForEach:循环渲染
  • HPO3:提升模型性能的高效超参数优化工具
  • 24小R的随机播放顺序
  • 使用TensorFlow实现逻辑回归:从训练到模型保存与加载
  • 信息学奥赛一本通 2110:【例5.1】素数环
  • 2025数学建模美赛|A题成品论文
  • 神经网络|(六)概率论基础知识-全概率公式
  • 爱快 IK-X9 吸顶AP 简单开箱评测和拆解,三频WiFi7,BE5000,2.5G网口
  • Continuous Batching 连续批处理
  • 基于ESP8266的多功能环境监测与反馈系统开发指南
  • 嵌入式C语言:结构体
  • KF-GINS 和 OB-GINS 的 Earth类 和 Rotation 类
  • 安卓日常问题杂谈(一)
  • Java-数据结构-二叉树习题(3)
  • 落地 基于特征的对象检测
  • leetcode 面试经典 150 题:简化路径
  • 鲁滨逊漂流记读后感