etcd二次封装
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 一、etcd封装
- 二、封装的思想
- 1.封装服务注册客户端类
- 2.封装服务发现客户端类
- 三.简单使用
- 1.服务注册客户端
- 2.服务发现客户端
一、etcd封装
1.服务注册客户端:向服务器新增服务信息数据,并进行保活
2.服务发现客户端:从服务器查找服务信息数据,并进行改变事件监控
封装的时候,我们尽量减少模块之间的耦合度,本质上etcd是一个键值存储系统,并不是专门用于作为注册中心进行服务注册和发现的。
二、封装的思想
1.封装服务注册客户端类
提供一个接口:向服务器新增数据并进行保活
参数:注册中心地址(etcd服务器地址),
新增的服务信息(服务名-主机地址键值对)
成员变量有三个,一个etcd::CLient客户端,一个keep_Alive保活对象,一个lease_id租约Id
构造函数需要提供etcd注册中心的地址,这样才能连接服务器。在构造函数初始化列表中初始化三个成员变量,这里的保活事件设置的3秒。
需要提供一个接口,用于服务注册,该接口有两个参数,一个std::string&key,一个是std::string& val。
接口内部调用put方法向服务器添加kv数据,并添加上续约Id。
class Registry
{
public:
using ptr = std::shared_ptr<Registry>;
Registry(const std::string& host)
:_client(std::make_shared<etcd::Client>(host)),
_keep_alive(_client->leasekeepalive(3).get()),
_lease_id(_keep_alive->Lease())
{
}
~Registry(){
//取消保活
_keep_alive->Cancel();
}
//服务注册接口
bool registry(const std::string& key,const std::string& val)
{
auto resp = _client->put(key,val,_lease_id).get();
if(resp.is_ok() == false){
LOG_ERROR("注册数据失效: {}",resp.error_message());
return false;
}
return true;
}
private:
std::shared_ptr<etcd::Client> _client; //客户端
std::shared_ptr<etcd::KeepAlive> _keep_alive; //保活对象
uint64_t _lease_id; //租约Id
};
2.封装服务发现客户端类
提供两个设置回调函数的接口:服务上线事件接口(数据新增),服务下线事件接口(数据删除)
提供一个设置根目录的接口:用于获取指定目录下的数据以及监控目录下数据的改变
该类成员变量有四个,两个用使用者设置的回调函数,一个client用户连接服务器,一个watcher用于监控数据的改变.
该类不需要提供对外接口,在构造函数中,需要传递四个参数,分别是两个回调函数,一个etcd服务器的地址用于连接服务器,还有一个是x需要监控的服务名称。
构造函数中会先更具参数中的basedir,获取当前服务器该basedir已有的数据.然后通过watcher监控basedir的改变,分别处理新增和删除的事件,事件发生时,调用用户设置的对应的回调函数。
需要注意的是,在构造时会先进行服务获取,调用了ls()这个接口,而不是get()。这个接口的意思是获取服务路径下所有服务。
打个比方,服务注册客户端注册了三个服务,分别是"/service/friend",“/service/user”,“/service/message”。我们使用ls进行服务发现需要提供一个key,我们提供“/service”,就会把这三个服务数据都获取到。
在watcher构造时,传入了四个参数,第一个参数时client对象的实例,第二个参数时监控的服务路径,第三个参数是一个回调函数,当监控的服务数据触发事件,就会调用这个回调函数,第四个参数是是否要递归监控服务路劲下所有服务。我们这是是true。
在这个回调函数中会遍历触发的事件,如果是新增事件则调用用户设置的put_cb,如果是删除事件,则调用del_cb;
/服务发现客户端类
class Discovery
{
public:
using ptr = std::shared_ptr<Discovery>;
//服务发现使用者需要提供两个回调函数,当监控的数据发送改变时,内部会自动调用对应的回调函数
using NotifyCallback = std::function<void(std::string,std::string)>;
Discovery(const std::string& host,const std::string& basedir,const NotifyCallback& put_cb,const NotifyCallback& del_cb)
:_client(std::make_shared<etcd::Client>(host)),
_put_cb(put_cb),_del_cb(del_cb)
{
//1.先进行服务发现,获取到当前已有的数据
auto resp = _client->ls(basedir).get();
if (resp.is_ok() == false) {
LOG_ERROR("获取服务信息数据失败:{}", resp.error_message());
}
//可能会有多个结果
int sz = resp.keys().size();
for (int i = 0; i < sz; ++i) {
if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
}
//2.然后进行事件监控,监控数据发生的改变,并调用回调进行处理
_watcher = std::make_shared<etcd::Watcher>(*_client.get(),basedir,std::bind(&Discovery::callback,this,std::placeholders::_1),true);
// _watcher->Wait();
}
~Discovery() {
//取消监控
_watcher->Cancel();
}
private:
void callback(const etcd::Response &resp) {
if (resp.is_ok() == false) {
LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message());
return;
}
for (auto const& ev : resp.events()) {
if (ev.event_type() == etcd::Event::EventType::PUT) {
if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string());
}else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());
}
}
}
private:
NotifyCallback _put_cb;
NotifyCallback _del_cb;
std::shared_ptr<etcd::Client> _client; //客户端
std::shared_ptr<etcd::Watcher> _watcher; //服务监控对象
};
三.简单使用
1.服务注册客户端
注册两个服务,“service.friend"和”.service.user"。线程休眠10秒后退出。
#include "../common/etcd.hpp"
#include <thread>
int main()
{
init_logger(false,"",0);
//创建一个服务注册客户端
Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");
rClient->registry("/service/friend","192.168.1.1:8080");
rClient->registry("/service/user","192.168.1.2:8080");
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
2.服务发现客户端
服务发现客户端不需要调用接口,因为在创建服务发现客户端时构造函数中就已经使用watcher监控了basedir服务。
这个basedir就是这里传入的参数“/service”。
#include "../common/etcd.hpp"
void putHandle(std::string service_name,std::string host_name)
{
LOG_DEBUG("新增服务:{}-{}", service_name, host_name);
}
void delHandle(std::string service_name,std::string host_name)
{
LOG_DEBUG("下线服务:{}-{}", service_name, host_name);
}
int main()
{
init_logger(false,"",0);
//创建一个服务发现客户端
Discovery::ptr dClient = std::make_shared<Discovery>("127.0.0.1:2379","/service",putHandle,delHandle);
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}