ETCD的封装和测试
etcd是存储键值数据的服务器
客户端通过长连接watch实时更新数据
场景:
当主机A给服务器存储 name: 小王
主机B从服务器中查name ,得到name-小王
当主机A更改name 小李
服务器实时通知主机B name 已经被更改成小李了。
应用:服务注册与发现
更改配置
若需要修改,则可以配置:/etc/default/etcd
sudo vi /etc/profile
在最后加上 export ETCDCTL_API=3 来确定etcd的版本 (每个终端都要设置,报错了可以尝试查看该终端上是否更改了etcd的版本)
使用
1.启动 sudo systemctl start etcd
source /etc/profile
2.添加 etcdctl put
3.查找 etcdctl get
4.删除 etcdctl del
一些接口的使用
使用样例
put:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>
int main(int argc,char * argv[])
{
//创建Client
const string Host_url="http://127.0.0.1:2379";
etcd::Client client(Host_url); //用url初始化客户端
//获取lease_id
//keep_alive是一个保活对象,leasekeepalive(5),指的是生命周期为5s,
//leasekeepalive()函数返回一个异步操作的对象,get()函数是指等待该异步对象操作完成,操作失败会抛出异常
auto keep_alive=client.leasekeepalive(5).get(); //keep_alive是一个
//获取租约的id,用于put
auto leaseid=keep_alive->Lease();
cout<<leaseid<<endl;
//插入键值,传入key-value 以及leaseid,put操作返回异步对象
auto ret=client.put("/service/user","127.0.0.1:8080",leaseid).get();
if(ret.is_ok()==false)
{
cout<<"插入键值失败了"<<endl;
exit(1);
}
auto ret2=client.put("/service/friend","127.0.0.1:8081",leaseid).get();
if(ret.is_ok()==false)
{
cout<<"插入键值失败了"<<endl;
exit(1);
}
//暂停该进程
std::this_thread::sleep_for(std::chrono::seconds(10));
}
源码:
初始化
获取保活对象,异步对象
get:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>
//自己设置的回调函数。
//
void cback(const etcd::Response& re)
{
//照搬txt目录下的 watch检查是否出错
if (re.error_code()) {
std::cout << "Watcher " << re.watch_id() << " fails with "
<< re.error_code() << ": " << re.error_message() << std::endl;}
//源码 class Event {
// public:
// enum class EventType {
// PUT,
// DELETE_,
// INVALID,
// };
//匹配事件
for(const auto& e:re.events())
{
if(e.event_type()==etcd::Event::EventType::PUT)
{
cout<<"你的key-value已经发生了改变"<<endl;
cout<<"之前的key::"<<e.prev_kv().key()<<"-value"<<e.prev_kv().as_string()<<endl;
cout<<"之前的key::"<<e.kv().key()<<"-value"<<e.kv().as_string()<<endl;
}
else if(e.event_type()==etcd::Event::EventType::DELETE_)
{
cout<<"你的value已经被删除了"<<endl;
cout<<"之前的key::"<<e.prev_kv().key()<<"-value:"<<e.prev_kv().as_string()<<endl;
}
}
}
int main()
{
const string Host_url="http://127.0.0.1:2379";
//根据库中指定url初始化客户端
etcd::Client client(Host_url);
//ls是指获取该/service key值下的所有value,在路径查找中通常只需要设置一个目录即可找到该目录下全部value值。返回异步对象
auto resp=client.ls("/service").get();
if(resp.is_ok()==false)
{
cout<<"获取信息无效"<<endl;
exit(1);
}
//获取keys,指的是符合/service下的文件名个数,以便于遍历
auto sz=resp.keys().size();
cout<<sz<<endl;
for(int i=0;i<sz;i++)
{
cout<<resp.value(i).as_string()<<"可以提供"<<resp.key(i)<<"服务"<<endl;
}
//监控装置,监视/service下的所有key-value,通常只监控它是否修改和是否删除
//需要自己设置cback回调函数
auto watcher=etcd::Watcher(client, "/service",
cback, true);
watcher.Wait();//等待。相当于启动监听装置
return 0;
}
源码:
tips:txt中有各种test的使用样例
封装客户端
二次封装:封装etcd-client-api,
实现两种类型的客户端
1.服务注册客户端:向服务器新增服务信息数据,并进行保活
2.服务发现客户端:从服务器查找服务信息数据,并进行改变事件监控封装的时候,我们尽量减少模块之间的耦合度,本质上etcd是一个键值存储系统,并不是专门用于作为注册中心进行服务注册和发现的。
封装思想:
1.封装服务注册客户端类提供一个接口:向服务器新增数据并进行保活参数:注册中心地址(etcd服务器地址),新增的服务信息(服务名-主机地址键值对)封装服务发现客户端类
服务下线事件接口(数据删除)
2.封装服务发现客户端类
提供两个设置回调函数的接口:服务上线事件接口(数据新增),服务下线事件接口(数据删除)
代码
#pragma once
#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include <functional>
namespace common{
class Rigistry
{
public:
using ptr=std::shared_ptr<Rigistry>;
Rigistry(const string & Host)
: _client(std::make_shared<etcd::Client>(Host)), _keep_alive(_client->leasekeepalive(5).get()), _leaseid(_keep_alive->Lease())
{
}
~Rigistry() { _keep_alive->Cancel(); }
bool registry(const std::string& service, const std::string& host)
{
auto ret = _client->put(service, host, _leaseid).get();
if (ret.is_ok() == false)
{
LOG_ERROR("客户端服务注册失败,{}", ret.error_message());
return false;
}
return true;
}
private:
std::shared_ptr<etcd::Client> _client;
std::shared_ptr<etcd::KeepAlive> _keep_alive;
int64_t _leaseid;
};
class Discovery
{
public:
using ptr=std::shared_ptr<Discovery>;
using NotifyCallback = std::function<void(const std::string&, const std::string&)>;
Discovery(const std::string &Host,
const std::string &basedir,
const NotifyCallback& put_cb,
const NotifyCallback& del_cb
)
//在 Discovery 对象构造的过程中,online 和 offonline 会发生隐式转换 转换成NotifyCallback类型
//因此得+const & 或者值传递的方式
: _put_cb(put_cb), _del_cb(del_cb),
_client(std::make_shared<etcd::Client>(Host))
{
auto resp = _client->ls(basedir).get();
if (resp.is_ok() == false)
{
LOG_ERROR("客户端服务获取信息失败,{}", resp.error_message());
}
auto sz = resp.keys().size();
for (int i = 0; i < sz; i++)
{
if(put_cb)
{
put_cb(resp.key(i),resp.value(i).as_string());
LOG_DEBUG("新增服务:{}-{}",resp.key(i),resp.value(i).as_string());
}
}
_watcher=(std::make_shared<etcd::Watcher>(*_client.get(), basedir, std::bind(&Discovery::cback, this, std::placeholders::_1), true));
// Watcher(Client const& client, 。。。要求传入client,
// 我们的_client被用shared_ptr封装了起来,得解引用 + get();
}
private:
void cback(const etcd::Response &re)
{
if (re.is_ok()==false)
{
std::cout <<"收到一个错误的事间通知"<<
re.error_message() << std::endl;
LOG_ERROR("客户端服务回调函数信息失败,{}", re.error_message());
}
for (const auto &e : re.events())
{
if (e.event_type() == etcd::Event::EventType::PUT)
{
if(_put_cb)
{
_put_cb(e.kv().key(),e.kv().as_string());
}
LOG_DEBUG("新增服务:{}-{}",e.kv().key(),e.kv().as_string());
}
else if (e.event_type() == etcd::Event::EventType::DELETE_)
{
if(_del_cb)
{
_del_cb(e.prev_kv().key(),e.prev_kv().as_string());
}
LOG_DEBUG("下线服务:{}-{}",e.prev_kv().key(),e.prev_kv().as_string());
}
}
}
private:
NotifyCallback _put_cb;
NotifyCallback _del_cb;
std::shared_ptr<etcd::Client> _client;
std::shared_ptr<etcd::Watcher> _watcher;
};
}