【微服务即时通讯系统】——etcd一致性键值存储系统,etcd的介绍,etcd的安装,etcd使用和功能测试
文章目录
- etcd
- 1. etcd的介绍
- 1.1 etcd的概念
- 2. etcd的安装
- 2.1 安装etcd
- 2.2 安装etcd客户端C/C++开发库
- 3. etcd使用
- 3.1 etcd接口介绍
- 4. etcd使用测试
- 4.1 原生接口使用测试
- 4.2 封装etcd使用测试
etcd
1. etcd的介绍
1.1 etcd的概念
Etcd 是一个基于GO实现的 分布式、高可用、一致 的 一致性键值存储系统, 用于配置共享和服务发现等。它使用 Raft 一致性算法来保持集群数据的一致性,且客户端通过长连接watch 功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化。
为什么需要 etcd?
管理共享配置信息:etcd 可以将配置信息集中存储,服务端将配置信息存储于 etcd 中,客户端可以通过 etcd 方便地获取这些配置信息。
服务发现:etcd 可以作为服务注册中心,将服务信息注册到 etcd 中,其他服务可以通过查询 etcd 来获取这些信息。且当服务发生变化时,etcd 可以及时通知,实现服务的自动发现和动态调整。
防止单点故障:为了防止单点故障,可以启动多个 etcd 组成集群。etcd 集群使用 raft 一致性算法来处理日志复制,保证多节点数据的强一致性。
etcd 使用 raft 算法来实现数据的一致性:Raft 算法用于分布式系统,包含主节点选举和数据更新两部分。主节点选举中,从节点在未收到主节点心跳包时可成为候选主节点发起投票,获超半数响应则成为新主节点。数据更新分两阶段,先主节点记录并复制日志,超半数响应后通知客户端,再主节点提交修改并通知从节点提交。
2. etcd的安装
2.1 安装etcd
这是在 Linux 系统(ubuntu) 上安装 Etcd 的基本步骤:
安装 Etcd:
sudo apt-get install etcd
启动 Etcd 服务:
sudo systemctl start etcd
设置 Etcd 开机自启:
sudo systemctl enable etcd
运行验证:
etcdctl put mykey "this is awesome"
如果出现报错:No help topic for 'put'
则 sudo vi /etc/profile
在末尾声明环境变量 ETCDCTL_API=3
以确定 etcd 版本:
export ETCDCTL_API=3
完毕后,加载配置文件,并重新执行测试指令:
dev@Crocodile:~/workspace$ source /etc/profile
dev@Crocodile:~/workspace$ etcdctl put mykey "this is awesome"
OK
dev@Crocodile:~/workspace$ etcdctl get mykey
mykey
this is awesome
dev@Crocodile:~/workspace$ etcdctl del mykey
2.2 安装etcd客户端C/C++开发库
etcd 由 golang 编写,v3 版本使用 grpc API(HTTP2+protobuf)通信,官方仅维护了 go 语言的 client 库。若要使用 C/C++ 语言,需寻找非官方的 etcd client 开发库。
etcd-cpp-apiv3 是一个 etcd 的 C++版本客户端 API。它依赖于 mipsasm, boost, protobuf, gRPC, cpprestsdk 等库。
依赖安装:
sudo apt-get install libboost-all-dev libssl-dev
sudo apt-get install libprotobuf-dev protobuf-compiler-grpc
sudo apt-get install libgrpc-dev libgrpc++-dev
sudo apt-get install libcpprest-dev
api 框架安装:
git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr
make -j$(nproc) && sudo make install
3. etcd使用
etcd工作原理:
etcd 是分布式高可用的 一致性键值存储系统 用于配置共享和服务发现(Raft 一致性算法来保持集群数据的一致性,且客户端通过长连接 watch 功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化)
先创建一个etcd服务器存储键值对数据, 主机1可以向服务器进行服务注册,该注册是以一个键值对存储,保存了服务和主机的地址端口,主机2可以获取又主机1向服务器中添加的服务数据, 同时主机1对这个服务有一个长连接保活(服务器每个3s进行保活确认),如果服务下线了,etcd服务器就会通知主机2。
etcd 服务器有设置租约的功能 在上面介绍了 client和etcd服务器直接有一个长连接保活(KeepAlive) 这个长连接可以Lease()获取一个租约, 同时这个租约和client提供服务设置的租约id一致, etcd 服务器和client直接通过KeepALive长连接保活 ,如果长连接断开,那么client的键值对数据就无法获取到租约, etcd就会将没有租约的数据进行删除。
租约机制类似房租,没有租约就是房子属于自己,有租约就要定时通过房东续租,否则无法租房。
3.1 etcd接口介绍
//pplx::task 并行库异步结果对象
//阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
//非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态
namespace etcd {
class Value {
bool is_dir();//判断是否是一个目录
std::string const& key() //键值对的 key 值
std::string const& as_string()//键值对的 val 值
int64_t lease() //用于创建租约的响应中,返回租约 ID
}
//etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
//在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {
enum class EventType {
PUT, //键值对新增或数据发生改变
DELETE_,//键值对被删除
INVALID,
};
enum EventType event_type()
const Value& kv()
const Value& prev_kv()
}
class Response {
bool is_ok()
std::string const& error_message()
Value const& value()//当前的数值 或者 一个请求的处理结果
Value const& prev_value()//之前的数值
Value const& value(int index)//
std::vector<Event> const& events();//触发的事件
}
class KeepAlive {
KeepAlive(Client const& client, int ttl, int64_t lease_id =
0);
//返回租约 ID
int64_t Lease();
//停止保活动作
void Cancel();
}
class Client {
// etcd_url: "http://127.0.0.1:2379"
Client(std::string const& etcd_url,
std::string const& load_balancer = "round_robin");
//Put a new key-value pair 新增一个键值对
pplx::task<Response> put(std::string const& key,
std::string const& value);
//新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)
pplx::task<Response> put(std::string const& key,
std::string const& value,
const int64_t leaseId);
//获取一个指定 key 目录下的数据列表
pplx::task<Response> ls(std::string const& key);
//创建并获取一个存活 ttl 时间的租约
pplx::task<Response> leasegrant(int ttl);
//获取一个租约保活对象,其参数 ttl 表示租约有效时间
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int
ttl);
//撤销一个指定的租约
pplx::task<Response> leaserevoke(int64_t lease_id);
//数据锁
pplx::task<Response> lock(std::string const& key);
}
class Watcher {
Watcher(Client const& client,
std::string const& key, //要监控的键值对 key
std::function<void(Response)> callback, //发生改变后的回调
bool recursive = false); //是否递归监控目录下的所有数据改变
Watcher(std::string const& address,
std::string const& key,
std::function<void(Response)> callback,
bool recursive = false);
//阻塞等待,直到监控任务被停止
bool Wait();
bool Cancel();
}
4. etcd使用测试
4.1 原生接口使用测试
put.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <thread>
int main(int argc, char *argv[])
{
std::string etcd_host="http://127.0.0.1:2379";
// 实例化客户端对象
etcd::Client client(etcd_host);
// 获取租约保活对象,伴随着创建一个指定有效时长的租约
auto keep_alive=client.leasekeepalive(3).get();
// 获取租约id
auto lease_id=keep_alive->Lease();
// 向ectd新增数据
auto resp1=client.put("/service/user","127.0.0.1:8080",lease_id).get();
if(resp1.is_ok()==false)
{
std::cout<<"新增数据失败: "<<resp1.error_message()<<std::endl;
return -1;
}
auto resp2=client.put("/service/friend","127.0.0.1:9090").get();
if(resp2.is_ok()==false)
{
std::cout<<"新增数据失败: "<<resp2.error_message()<<std::endl;
return -1;
}
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
get.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <thread>
// 监控事件发生的回调函数
void callback(const etcd::Response &resp)
{
if(resp.is_ok()==false)
{
std::cout<<"收到一个错误的事件通知:"<<resp.error_message()<<std::endl;
return;
}
for(auto const& ev:resp.events())
{
if(ev.event_type()==etcd::Event::EventType::PUT)
{
std::cout<<"数据发生了改变:\n";
std::cout<<"当前的值:"<<ev.kv().key()<<"-"<<ev.kv().as_string()<<std::endl;
std::cout<<"原来的值:"<<ev.prev_kv().key()<<"-"<<ev.prev_kv().as_string()<<std::endl;
}
else if(ev.event_type()==etcd::Event::EventType::DELETE_)
{
std::cout<<"服务信息下线被删除:\n";
std::cout<<"当前的值:"<<ev.kv().key()<<"-"<<ev.kv().as_string()<<std::endl;
std::cout<<"原来的值:"<<ev.prev_kv().key()<<"-"<<ev.prev_kv().as_string()<<std::endl;
}
}
}
int main(int argc, char* argv[])
{
std::string etch_host="http://127.0.0.1:2379";
// 实例化客户端对象
etcd::Client client(etch_host);
// 获取指定的键值对信息
// 初次先用 ls 获取所有能够提供指定服务/service的实例信息
auto resp=client.ls("/service").get();
if(resp.is_ok()==false)
{
std::cout<<"获取键值对数据失败:"<<resp.error_message()<<std::endl;
return -1;
}
int sz=resp.keys().size();
for(int i=0;i<sz;++i) // 遍历所有可以提供的服务
{
std::cout<<resp.value(i).as_string()<<"可以提供"<<resp.key(i)<<"服务\n";
}
// 实例化一个键值对事件监控对象
auto watcher=etcd::Watcher(client,"/service",callback,true); // true递归监控路径下所有函数
watcher.Wait(); // 开始事件监控
return 0;
}
makefile
all:put get
put:put.cc
g++ -std=c++17 $^ -o $@ -letcd-cpp-api -lcpprest
get:get.cc
g++ -std=c++17 $^ -o $@ -letcd-cpp-api -lcpprest
4.2 封装etcd使用测试
etcd.hpp
#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <functional>
#include "logger.hpp"
// 封装etcd-client-api 实现两种类型的客户端
// 1. 服务注册客户端:向服务器新增服务信息数据,并进行保活
// 2. 服务发现客户端:从服务器查找服务信息数据,并进行改变世界监控
// etcd本质是一个键值存储系统,并不是专门用于作为注册中心进行服务注册和发现的
// 实现:
// 1. 封装服务注册客户端类,提供接口向服务器新增数据进行保活
// 参数:注册中心地址(etcd服务器地址) 新增的服务信息(服务名-主机地址键值对)
// 2. 封装服务发现客户端类,提供服务上线事件接口,服务下线事件接口,一个设置根目录接口
namespace wh_im{
//服务注册客户端类
class Registry
{
public:
using ptr = std::shared_ptr<Registry>;
// 服务注册客户端类初始化
Registry(const std::string &host):
_client(std::make_shared<etcd::Client>(host)) ,
_keep_alive(_client->leasekeepalive(3).get()),
_lease_id(_keep_alive->Lease()){}
~Registry()
{
_keep_alive->Cancel();
}
// 注册服务键值对,val路径可以提供的服务key
bool registry(const std::string &key, const std::string &val)
{
// 进行服务键值对注册
auto resp = _client->put(key, val, _lease_id).get();
if (resp.is_ok() == false)
{
LOG_ERROR("注册数据失败:{}", resp.error_message());
return false;
}
return true;
}
private:
std::shared_ptr<etcd::Client> _client; // 客户端
std::shared_ptr<etcd::KeepAlive> _keep_alive; // 长连接
uint64_t _lease_id; // 租约id
};
//服务发现客户端类
class Discovery
{
public:
using ptr = std::shared_ptr<Discovery>;
using NotifyCallback = std::function<void(std::string, std::string)>;
// 服务发现客户端类初始化
Discovery(const std::string &host, // 注册中心地址
const std::string &basedir, // 基础根目录
const NotifyCallback &put_cb, // 上线通知处理回调函数
const NotifyCallback &del_cb): // 下线通知处理回调函数
_client(std::make_shared<etcd::Client>(host)) , // 构造客户端对象
_put_cb(put_cb), _del_cb(del_cb){
//先进行服务发现,先获取到当前已有的数据
auto resp = _client->ls(basedir).get();
if (resp.is_ok() == false)
{
LOG_ERROR("获取服务信息数据失败:{}", resp.error_message());
}
int sz = resp.keys().size();
for (int i = 0; i < sz; ++i)
{
if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
}
// 然后进行事件监控,监控数据发生的改变并调用回调进行处理
_watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
std::bind(&Discovery::callback, this, std::placeholders::_1), true);
}
~Discovery()
{
_watcher->Cancel();
}
private:
// 设置回调函数
void callback(const etcd::Response &resp)
{
if (resp.is_ok() == false)
{
LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message());
return;
}
for (auto const& ev : resp.events())
{
if (ev.event_type() == etcd::Event::EventType::PUT)
{
if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string());
}
else if (ev.event_type() == etcd::Event::EventType::DELETE_)
{
if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());
}
}
}
private:
NotifyCallback _put_cb; // 上线通知回调函数
NotifyCallback _del_cb; // 下线通知回调函数
std::shared_ptr<etcd::Client> _client; // 客户端
std::shared_ptr<etcd::Watcher> _watcher; // Watcher对象
};
} // end namespace
registry.cc
#include "../../common/logger.hpp"
#include "../../common/etcd.hpp"
#include <gflags/gflags.h>
#include <thread>
DEFINE_bool(run_mode, false, "程序的运行模式, false-调试; true-发布;");
DEFINE_string(log_file,"","发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level,0,"发布模式下,用于指定日志输出等级");
DEFINE_string(etch_host,"http://127.0.0.1:2379","服务注册中心地址");
DEFINE_string(base_service,"/service","服务监控根目录");
DEFINE_string(instance_name,"/user/instance1","当前实例名称");
DEFINE_string(access_host,"127.0.0.1:8880","当前实例的外部访问地址");
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
wh_im::init_logger(FLAGS_run_mode,FLAGS_log_file,FLAGS_log_level);
wh_im::Registry::ptr rclient=std::make_shared<wh_im::Registry>(FLAGS_etch_host);
LOG_DEBUG("服务名称:{}",FLAGS_base_service+FLAGS_instance_name);
rclient->registry(FLAGS_base_service+FLAGS_instance_name,FLAGS_access_host);
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
discovery.cc
#include "../../common/logger.hpp"
#include "../../common/etcd.hpp"
#include <gflags/gflags.h>
#include <thread>
DEFINE_bool(run_mode, false, "程序的运行模式, false-调试; true-发布;");
DEFINE_string(log_file,"","发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level,0,"发布模式下,用于指定日志输出等级");
DEFINE_string(etch_host,"http://127.0.0.1:2379","服务注册中心地址");
DEFINE_string(base_service,"/service","服务监控根目录");
void online(const std::string &service_name, const std::string &service_host)
{
LOG_DEBUG("上线服务:{}-{}",service_name,service_host);
}
void offline(const std::string &service_name, const std::string &service_host)
{
LOG_DEBUG("下线服务:{}-{}",service_name,service_host);
}
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
wh_im::init_logger(FLAGS_run_mode,FLAGS_log_file,FLAGS_log_level);
wh_im::Discovery::ptr rclient=std::make_shared<wh_im::Discovery>(FLAGS_etch_host,
FLAGS_base_service,online,offline);
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
makefile
all:discovery registry
discovery:discovery.cc
g++ -std=c++17 $^ -o $@ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest
registry:registry.cc
g++ -std=c++17 $^ -o $@ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest