Linux——五种IO模型
目录
一IO基本理解
二五种IO模型
1五种IO模型示意图
2同步IO和异步IO
二非阻塞IO
1fcntl
2实现非阻塞IO
三多路复用
1select
1.1定位和作用
1.2介绍参数
1.3编写多路复用代码
1.4优缺点
2poll
2.1作用和定位
2.2介绍参数
2.3修改select代码
3epoll
3.1介绍参数
3.2工作原理
理解数据到达主机
3.3编写epoll代码
3.4优点
3.5工作模式
理解LT和ET
四Reactor
完整源代码
一IO基本理解
前面学习了网络通信,我们认识到:网络通信本质上是进程间通信;而进程间通信本质上是IO;关于IO:I:Input;O:Output;这又是什么意思?
站在进程角度上:Input是外面交给进程的数据;Output是进程发出去的数据; 站在系统角度上:Input是数据从硬件交给OS;Output是数据从OS输出出去; 站在内存角度上:Input是内存数据交给磁盘;Output是磁盘数据交给内存
总之:IO本质上是外设与内设之间的交互
那对应我们来说:IO该如何理解呢??
C/C++解除到的IO接口:read/recv write/send 当底层有新数据到来时(前提接收/发送缓冲区未被占满的情况下),这些接口才能给我们返回fd:让我们读/写数据,否则会一直阻塞下去知道新数据的到来!
所以说:IO = 等 + 拷贝
那什么叫做高效的IO呢?<-> 在单位时间内,减少IO等的比率
二五种IO模型
在现实生活中,有很多人喜欢钓鱼,也被我们称之为钓鱼佬;它们钓鱼方式都是按照自己对钓鱼的理解去实现的,所以也就产生出不同的钓鱼方式;这也就总共可分为以下五种:(钓鱼 = 等 + 钓)
张三:一动不动,检测鱼漂,钓;(阻塞IO)
李四:一直在动,随便检测鱼漂,钓;(非阻塞IO)
王五:在鱼漂上挂铃铛(铃铛没响就刷抖音),钓;(信号驱动IO)
赵六:准备很多钓鱼竿,定期检测鱼竿,钓;(多路复用IO)
田七:找司机小王钓,自己去做别的事(田七没有参与调用,只是发起钓鱼);(异步IO)
在以上例子中:
人:系统调用;
鱼竿:sockfd;
湖:系统内部;
鱼:数据;
鱼漂浮动:数据就绪;
钓:接收数据
田七:发起IO 小王:操作系统
通过上面的例子可以基本对应5中IO模型,但又有问题了:
阻塞IO vs 非阻塞IO
IO = 等 + 拷贝:拷贝(钓)的方式是一样的,就是等的方式不同(一个一直等,一个一直动)
谁的钓鱼效率最高呢?(单位时间内等待的时间短)
你可以会说是王五或者田七:但是答案是赵六!因为如果你是鱼的话,湖面上有104个诱饵(光赵六就有100个),你吃到赵六的鱼饵的概率高,也就是赵六更容易有鱼咬钩,自然比别人效率高,而王五田七他们只有一个鱼竿(假设与咬钩概率相等),一天内钓上来鱼的数量是差不多的,只是多做了一些事情(多刷了几个视频,多参加了几个会(对于别人来说))
1五种IO模型示意图
阻塞 IO(常见): 在内核将数据准备好之前, 系统调用会一直等待;所有的套接字,默认都是阻塞方式
非阻塞 IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回错误码
信号驱动 IO: 内核将数据准备好的时候, 使用 SIGIO 信号通知应用程序进行 IO操作
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态
异步 IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)
2同步IO和异步IO
我们也将IO模型分为两大类:同步IO(前四种IO模型)和异步IO
之前也有人为了信号驱动IO是同步还是异步争吵过,但这里把信号驱动IO归结为同步IO:看似王五在做别的事,但是鱼咬钩你是不是要去把鱼给钓上来呢?实际上还是受到IO影响(刷着视频突然鱼咬钩了你要立刻放下手机去钓鱼)!
两者区别:你有没有参与IO(等)的过程
二非阻塞IO
非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询:这
对 CPU 来说是较大的浪费, 一般只有特定场景下才使用
1fcntl
作用:将fd设置成非阻塞IO
函数的第二个参数:
• 复制一个现有的描述符(cmd=F_DUPFD) .
• 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
• 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
• 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
• 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).
2实现非阻塞IO
#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>
// 设置非阻塞轮询方式
void NoBlock(int fd)
{
int f1 = fcntl(fd, F_GETFL);
if (f1 < 0)
{
std::cerr << "fcntl fail" << std::endl;
return;
}
fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
int main()
{
char buffer[128];
NoBlock(0);
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "echo:" << buffer;
}
else if (n == 0)
{
std::cout << "read none" << std::endl;
break;
}
else
{
// 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
if (errno == EWOULDBLOCK)
{
std::cout << "Non blocking polling" << std::endl;
std::cout << "Do other things" << std::endl;
sleep(1);
continue;
}
else
{
std::cout << "read error" << std::endl;
break;
}
}
}
return 0;
}
现象:
为什么我输入的时候信息会与打印的信息混杂在一起?--> OS(默认)回显
如果read被信号中断了,循环就会结束,不往下进行读了:我们也要保证该情况不会影响我们读取
#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>
// 设置非阻塞轮询方式
void NoBlock(int fd)
{
int f1 = fcntl(fd, F_GETFL);
if (f1 < 0)
{
std::cerr << "fcntl fail" << std::endl;
return;
}
fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
int main()
{
char buffer[128];
NoBlock(0);
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "echo:" << buffer;
}
else if (n == 0)//ctrl+d
{
std::cout << "read none" << std::endl;
break;
}
else
{
// 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
if (errno == EWOULDBLOCK)
{
std::cout << "Non blocking polling" << std::endl;
std::cout << "Do other things" << std::endl;
sleep(1);
continue;//未来是要break的!
}
else if(errno==EINTR)//read被信号中断
{
continue;
}
else
{
std::cout << "read error" << std::endl;
break;
}
}
}
return 0;
}
三多路复用
IO多路复用(Input/Output Multiplexing)是一种在单个线程中管理多个输入/输出通道的技术。它允许一个线程同时监听多个输入流(例如sockfd、fd等),并在有数据可读或可写时进行相应的处理,而不需要为每个通道创建一个独立的线程。
1select
系统提供 select 函数来实现多路复用输入/输出模型
1.1定位和作用
定位:它只负责进行等,不负责IO
作用:等待多个fd(等该fd上面的新事件就绪),通知用户新事件已经就绪,可以来IO了
1.2介绍参数
1.3编写多路复用代码
使用select要注意以下细节:
1.select后accept还是阻塞吗?
一定不会阻塞(事件已经就绪)
2.read能直接读吗?如果不能,谁最清楚底层fd的数据就绪了?
绝对不能读,读取的时候,条件不一定满足; 由select来统一监管(select知道)
3.select要正常工作,要借助辅助数组(fd_array),来保存所有合法的fd! (select的fd_set* 是输入输出参数,它会继续参数重置)
4.select统一监管是怎么做到的?
(accept后)将最新的fd添加到辅助数组(fd_array)就行了
5.就绪了循环处理所有事件
//SelectServer.hpp
#pragma once
#include <cstdint>
#include <memory>
#include <sys/select.h>
#include "Socket.hpp"
const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
}
void Init()
{
for (int i = 0; i < gnum; i++)
{
fd_array[i] = gfd;
}
// 自己先设置
fd_array[0] = _st->Sockfd();
}
void Loop()
{
while (true)
{
fd_set ft;
FD_ZERO(&ft); // 使用前先清除
int fd_max = gfd;
// 合法的fd添加到ft中
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
FD_SET(fd_array[i], &ft);
// 更新出最大值fd_max
if (fd_max < fd_array[i])
fd_max = fd_array[i];
}
struct timeval tl = {10, 0};
// 只关心读事件
int n = ::select(fd_max + 1, &ft, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
std::cout << "time done " << tl.tv_sec << ' ' << tl.tv_usec << std::endl;
break;
case -1:
std::cerr << "select fail" << std::endl;
break;
default:
std::cout << "have even: " << n << std::endl;
HandEvent(ft); // 处理事件(不出来会一直通知)
PrintFd_array();
break;
}
}
}
//一定会存在大量的不用类型的sockfd!
void HandEvent(fd_set &ft)
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
// 一定是合法的fd
// 判断fd是否就绪
if (FD_ISSET(fd_array[i], &ft)) // 动态检测
{
// 事件就绪
// sockfd类型?
if (fd_array[i] == _st->Sockfd())
{
Link();
}
else
{
Read_Write(i);
}
}
}
}
void Link()
{
InetAddr ir;
int sockfd = _st->AcceptSocket(&ir); // 一定不会阻塞!
if (sockfd > 0)
{
// 新的sockfd -> fd_array 让select进行管理
bool tmp = false;
for (int i = 1; i < gnum; i++)
{
if (fd_array[i] == gfd)
{
tmp = true;
fd_array[i] = sockfd;
std::cout << "add:" << fd_array[i] << std::endl;
break;
}
}
// fd_array满了
if (!tmp)
{
std::cout << "fd_array full" << std::endl;
::close(sockfd);
}
}
}
void Read_Write(int i)
{
//没协议,可以直接读(这里考虑IO)
char buffer[1024];
ssize_t n = ::read(fd_array[i], buffer, sizeof(buffer) - 1); // 阻塞?No
if (n > 0)
{
buffer[n] = 0;
std::cout << "Server read:" << buffer << std::endl;
std::string content = "<html><body><h1>hello warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
ech_str += content;
//(这里)默认是就绪的
::write(fd_array[i], ech_str.c_str(), ech_str.size());//临时
}
else if (n == 0)
{
std::cout << "user quit.." << std::endl;
::close(fd_array[i]);
//清理fd
fd_array[i] = gfd;
}
else
{
std::cout << "recv fail.." << std::endl;
::close(fd_array[i]);
//清理fd
fd_array[i] = gfd;
}
}
void PrintFd_array()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
std::cout << fd_array[i] << ' ';
}
std::cout << std::endl;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
// fd数组->select管理
int fd_array[gnum];
};
//Main.cc
#include <iostream>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <cstring>
#include <string>
#include"Socket.hpp"
#include"SelectServer.hpp"
int main(int args, char *argv[])
{
if (args != 2)
{
std::cerr << "./Server Localport" << std::endl;
exit(0);
}
uint16_t serverport = std::stoi(argv[1]);
std::unique_ptr<SelectServer> sr=std::make_unique<SelectServer>(serverport);
sr->Init();
sr->Loop();
return 0;
}
//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port=ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
_ip=buffer;
}
public:
InetAddr()
{
}
InetAddr(const struct sockaddr_in& addr)
:_addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr& ad)
{
return (this->_ip==ad._ip&&this->_port==ad._port);
}
std::string User()
{
std::string tmp=_ip+" "+std::to_string(_port)+":";
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
//Socket.hpp
#pragma once
#include "InetAddr.hpp"
class Socket;
enum
{
SOCK_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
};
const int gbacklog = 8;
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port) = 0;
virtual int AcceptSocket(InetAddr *addr) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
std::cerr << "socket fail" << std::endl;
exit(SOCK_ERROR);
}
std::cout << "socket sucess" << std::endl;
}
virtual void InitSocket(uint16_t port) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
std::cerr << "bind fail" << std::endl;
exit(BIND_ERROR);
}
std::cout << "bind sucess" << std::endl;
if (::listen(_sockfd, gbacklog) < 0)
{
std::cerr << "listen fail" << std::endl;
exit(LISTEN_ERROR);
}
std::cout << "listen sucess" << std::endl;
}
virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
std::cerr << "accept fail" << std::endl;
return -1;
}
*addr = client;
std::cout << "get a new link " <<addr->User()<< std::endl;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
private:
int _sockfd; // 两个角色
};
1.4优缺点
优点: • 实现简单 • 兼容性好,可以实现跨平台 • 在一些老内核中就只能使用select来实现多路复用
缺点: • 需要手动设置 fd 集合(使用起来麻烦(fd 集合混杂着不同的事件类型))
• 需要把 fd 集合从用户态拷贝到内核态(OS不相信任何人)(无法避免)
• 需要在内核遍历传递进来的所有 fd
• select 支持的文件描述符数量太小(进程打开的文件描述符也是有上限)
2poll
poll主要来解决手动设置核fd太小的问题(其它缺点poll同样存在)
2.1作用和定位
作用:等待多个fd上面的事件就绪,通知用户事件已经就绪,可以进行IO了
定位:只负责等:等就绪后进行事件派发
2.2介绍参数
events和revents的取值
仅仅用一个结构体就解决了:用户让OS关心的事件(设置)和用户让我关心的事件(返回)
不用因为select设计的缺陷, 对fd和fd关心的事件进行重新设定!
而数量太小问题:poll就从此就给了用户,由用户定义fd数量后交给poll
2.3修改select代码
//PollServer.hpp
#pragma once
#include <cstdint>
#include <memory>
#include <sys/poll.h>
#include "Socket.hpp"
const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;
class PollServer
{
public:
PollServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
}
void Init()
{
for (int i = 0; i < gnum; i++)
{
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
// 自己先设置
fd_array[0].fd = _st->Sockfd();
fd_array[0].events = POLLIN;
}
void Link()
{
InetAddr ir;
int sockfd = _st->AcceptSocket(&ir);
if (sockfd > 0)
{
// 新的sockfd -> fd_array 让select进行管理
bool tmp = false;
for (int i = 1; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
{
tmp = true;
fd_array[i].fd = sockfd;
fd_array[i].events = POLLIN;
std::cout << "add:" << fd_array[i].fd << std::endl;
break;
}
}
// fd_array满了
if (!tmp)
{
std::cout << "fd_array full" << std::endl;
}
}
}
void Read_Write(int i)
{
char buffer[1024];
ssize_t n = ::read(fd_array[i].fd, buffer, sizeof(buffer) - 1); // 阻塞?No
if (n > 0)
{
buffer[n] = 0;
std::cout << "Server read:" << buffer << std::endl;
std::string content = "<html><body><h1>hello warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Type: text/html\r\n";
ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
ech_str += content;
::write(fd_array[i].fd, ech_str.c_str(), ech_str.size());
}
else if (n == 0)
{
std::cout << "user quit.." << std::endl;
::close(fd_array[i].fd);
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
else
{
std::cout << "recv fail.." << std::endl;
::close(fd_array[i].fd);
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
}
void HandEvent()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
continue;
// 哪个读事件就绪
if (fd_array[i].revents & POLLIN)
{
// 处理特定sockfd
if (fd_array[i].fd == _st->Sockfd())
{
Link();
}
// 处理普通fd
else
{
Read_Write(i);
}
}
}
}
void Loop()
{
while (true)
{
// 不用设置啦 -> 优雅
int n = ::poll(fd_array, gnum, 1000);
switch (n)
{
case 0:
std::cout << "time done " << std::endl;
break;
case -1:
std::cerr << "select fail" << std::endl;
break;
default:
std::cout << "have even: " << n << std::endl;
HandEvent();
PrintFd_array();
break;
}
}
}
void PrintFd_array()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
continue;
std::cout << fd_array[i].fd << ' ';
}
std::cout << std::endl;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
// fd数组->select管理
pollfd fd_array[gnum];
};
poll的底层也是要OS遍历所有的fd,来获取fd和对应的事件(效率与select一样不高)
所以就有了下面的epoll~
3epoll
按照 man 手册的说法: 为处理大批量句柄而作了改进的 poll
它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点, 被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法
3.1介绍参数
event可以是几个宏的集合
• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
• EPOLLOUT : 表示对应的文件描述符可以写;
• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外
数据到来);
• EPOLLERR : 表示对应的文件描述符发生错误;
• EPOLLHUP : 表示对应的文件描述符被挂断;
• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发来说的.
• EPOLLONESHOT: 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里
虽然说是epoll的加强版,但两者差别真的挺大的!
3.2工作原理
介绍完参数,此刻的你或许有很多问题:不急先来谈谈epoll的工作原理
理解数据到达主机
主机从网络中收数据时通过网卡来解收的,从而往上交给数据链路层;数据链路层是OS管的,也就是说OS怎么知道网卡有数据了需要进行拷贝了?
有人会说:简单,让OS定期检测一遍不就行了?OS要管理小到进程行大到外设,所有事情都要轮询OS不得忙死,所以OS是通过网卡给我发送信号(硬件中断)来得知网卡有数据啦!
epoll函数与OS的关系
但实际上,OS管理就绪队列时不用创建节点的!
OS也要对epoll模型作管理:先描述,在组织
3.3编写epoll代码
//ServerEpoll.hpp
#pragma once
#include "Socket.hpp"
const static int gnum = 128;
const static int gsize = 128;
class ServerEpoll
{
public:
ServerEpoll(uint16_t port)
: _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
_epfd = ::epoll_create(gsize);
if (_epfd < 0)
{
LOG(FATAL, "epoll_creat fail\n");
exit(1);
}
LOG(INFO, "epoll_creat sucess,_epfd:%d\n", _epfd);
}
~ServerEpoll()
{
if (_epfd > 0)
::close(_epfd);
_st->Close();
}
void Init()
{
epoll_event et;
et.data.fd = _st->Sockfd(); // 为了在事件就绪时知道fd就绪了
et.events = EPOLLIN;
int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _st->Sockfd(), &et);
if (n < 0)
{
LOG(ERROR, "epoll_ctl fail\n");
exit(2);
}
LOG(INFO, "epoll_ctl sucess,add %d\n", _st->Sockfd());
}
void Start()
{
int time = -1;
while (true)
{
int n = ::epoll_wait(_epfd, ets, gnum, time);
switch (n)
{
case 0:
LOG(INFO, "epoll_wait time done\n");
break;
case -1:
LOG(INFO, "epoll_wait fail\n");
break;
default:
LOG(INFO, "epoll sucess total:%d\n", n);
Handler(n);
break;
}
}
}
void Handler(int n)
{
for (int i = 0; i < n; i++)
{
int sockfd = ets[i].data.fd;
uint32_t event = ets[i].events;
LOG(INFO, "%d 事件就绪 event:%s\n", sockfd, EventToString(event).c_str());
if (event & EPOLLIN)
{
if (sockfd == _st->Sockfd())
{
Accept();
}
else
{
HandlerIO(sockfd);
}
}
}
}
void Accept()
{
InetAddr ar;
int sockfd = _st->AcceptSocket(&ar);
if (sockfd < 0)
{
LOG(ERROR, "gain line error sockfd: %d\n", sockfd);
return;
}
LOG(INFO, "gain link:%d client:%s\n", sockfd, ar.User().c_str());
epoll_event et;
et.data.fd = sockfd;
et.events = EPOLLIN;
// OS在底层建立红黑树节点 -- select/poll要循环自己维护~
int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &et);
if (n < 0)
{
LOG(ERROR, "epoll_ctl fail\n");
exit(2);
}
LOG(INFO, "epoll_ctl sucess,add %d\n", sockfd);
}
void HandlerIO(int fd)
{
// 不保证读到的数据是正确的:要通过协议来保证~
char buffer[4096];
ssize_t n = ::recv(fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer;
std::string content = "<html><body><h1>Hello Warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Length:" + std::to_string(content.size()) + "\r\n";
ech_str += "\r\n";
ech_str += content;
::send(fd, ech_str.c_str(), ech_str.size(), 0);
}
else if (n == 0)
{
LOG(INFO, "Clinet quit fd:%d\n", fd);
// epoll移除 fd fd要保证:健康&&合法
::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
// 再关闭
::close(fd);
}
else
{
LOG(INFO, "read fail\n");
// epoll移除 fd fd要保证:健康&&合法
::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
::close(fd);
}
}
std::string EventToString(uint32_t event)
{
std::string et;
if (event & EPOLLIN)
et += "EPOLLIN";
if (event & EPOLLOUT)
et += " | EPOLLOUT";
return et;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
int _epfd;
epoll_event ets[gnum]; // 不用自己维护
};
//Main.cc
#include<iostream>
#include<memory>
#include<sys/epoll.h>
#include"ServerEpoll.hpp"
int main(int args,char* argc[])
{
if(args!=2)
{
std::cout<<"./Main Port"<<std::endl;
exit(1);
}
uint16_t port=std::stoi(argc[1]);
std::unique_ptr<ServerEpoll> usr=std::make_unique<ServerEpoll>(port);
usr->Init();
usr->Start();
return 0;
}
//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port=ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
_ip=buffer;
}
public:
InetAddr()
{
}
InetAddr(const struct sockaddr_in& addr)
:_addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr& ad)
{
return (this->_ip==ad._ip&&this->_port==ad._port);
}
std::string User()
{
std::string tmp=_ip+" "+std::to_string(_port)+":";
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
//Log.hpp
#pragma once
#include<iostream>
#include<string>
#include<cstring>
#include<fstream>
#include <stdarg.h>
#include <unistd.h>
#include <sys/syscall.h>
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Getlevel(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
break;
case INFO:
return "INFO";
break;
case WARNING:
return "WARNING";
break;
case ERROR:
return "ERROR";
break;
case FATAL:
return "FATAL";
break;
default:
return "";
break;
}
}
std::string Gettime()
{
time_t now = time(nullptr);
struct tm *time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
time->tm_year + 1900,
time->tm_mon + 1,
time->tm_mday,
time->tm_hour,
time->tm_min,
time->tm_sec);
return buffer;
}
struct log_message
{
std::string _level;
int _id;
std::string _filename;
int _filenumber;
std::string _cur_time;
std::string _message;
};
#define SCREAM 1
#define FILE 2
#define DEVELOP 3
#define OPERATION 4
const std::string gpath = "./log.txt";
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
class log
{
public:
log(const std::string &path = gpath, const int status = DEVELOP)
: _mode(SCREAM), _path(path), _status(status)
{
}
void SelectMode(int mode)
{
_mode = mode;
}
void SelectStatus(int status)
{
_status = status;
}
void PrintScream(const log_message &le)
{
printf("[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
}
void PrintFile(const log_message &le)
{
std::fstream in(_path, std::ios::app);
if (!in.is_open())
return;
char buffer[1024];
snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
in.write(buffer, strlen(buffer)); // 不用sizeof
in.close();
}
void PrintLog(const log_message &le)
{
// 过滤
if (_status == OPERATION)
return;
// 线程安全
pthread_mutex_lock(&gmutex);
switch (_mode)
{
case SCREAM:
PrintScream(le);
break;
case FILE:
PrintFile(le);
break;
default:
break;
}
pthread_mutex_unlock(&gmutex);
}
void logmessage(int level, const std::string &filename, int filenumber, const char *message, ...)
{
log_message le;
le._level = Getlevel(level);
le._id = syscall(SYS_gettid);
le._filename = filename;
le._filenumber = filenumber;
le._cur_time = Gettime();
va_list vt;
va_start(vt, message);
char buffer[128];
vsnprintf(buffer, sizeof(buffer), message, vt);
va_end(vt);
le._message = buffer;
// 打印日志
PrintLog(le);
}
~log()
{
}
private:
int _mode;
std::string _path;
int _status;
};
// 方便上层调用
log lg;
// ##不传时可忽略参数
#define LOG(level, message, ...) \
do \
{ \
lg.logmessage(level, __FILE__, __LINE__, message, ##__VA_ARGS__); \
} while (0)
#define SleftScream() \
do \
{ \
lg.SelectMode(SCREAM); \
} while (0)
#define SleftFile() \
do \
{ \
lg.SelectMode(FILE); \
} while (0)
#define SleftDevelop() \
do \
{ \
lg.SelectStatus(DEVELOP); \
} while (0)
#define SleftOperation() \
do \
{ \
lg.SelectStatus(OPERATION); \
} while (0)
//#pragma once
#include "InetAddr.hpp"
#include "Log.hpp"
class Socket;
enum
{
SOCK_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
};
const int gbacklog = 8;
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port) = 0;
virtual int AcceptSocket(InetAddr *addr) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(ERROR,"create socket fail\n");
exit(SOCK_ERROR);
}
LOG(INFO,"create socket sucess,sockfd:%d\n",_sockfd);
}
virtual void InitSocket(uint16_t port) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
LOG(ERROR,"bind fail\n");
exit(BIND_ERROR);
}
LOG(INFO,"bind sucess\n");
if (::listen(_sockfd, gbacklog) < 0)
{
LOG(ERROR,"listen fail\n");
exit(LISTEN_ERROR);
}
LOG(INFO,"listen sucess\n");
}
virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(ERROR,"link fail\n");
return -1;
}
*addr = client;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
private:
int _sockfd; // 两个角色
};
3.4优点
• 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
• 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
• 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中(填写就绪队列的节点), epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1),拷贝到用户定义的数组虽然也是O(N),但它严格按顺序进行拷贝
3.5工作模式
在XX快递站中有两个快递员:张三和李四;张三工作相对负责,而李四工作较毛躁;有一天住在5楼的小王在网上买了5个快递,现在正在派发:有三个在张三手上,有两个在李四手上;现在张三先到了小王楼下,带电话给小王叫他下来去快递,但由于小王在玩游戏,也就没下去取;张三一看小王没下来就重复给小王打电话;等到小王游戏打完后就下楼拿快递了,但他只拿了两个就上楼了;现在李四到了问张三:你还有多少个快递没派完?“还有一个小王的”张三说;那正好我手上有2个小王的包裹,你帮我一起派发吧!张三也只好接下了2个小王新增的包裹,继续给小王打电话,知道他吧包裹全取走为止...
后来小王又在网上买了5个快递,派发是恰好3个在李四手上,2个在张三手上;现在轮到李四先到小王楼下;由于李四工作比较毛躁,打电话给小王说:我只打电话给你一次,如果你不下来取快递,派发完别人的我就先走了!小王一听对方语气有点强硬,也就刚上了就不下去取;此时张三到了小王楼下,看到李四问有小王的快递没?一听有,张三就把小王的快递交给了李四说:上次我帮了你,这次你帮我下;李四也是挺讲义气的,也就把包裹拿来了,这次手上新增了2个包裹,再次打给了小王,小王说:你不是说不再给我打电话的吗?“没办法,手上又新增了你的2个快递,你快点下来拿把,不然我就走了!”小王也就得毕竟是自己的快递没必要不下去拿,也就下去把5个快递都拿回家了~
张三:只要底层一直有快递(数据),就一直通知小王(上层)
李四:包裹从无到有,从有到多的时候,才会通知小王(上层)
张三和李四就对应着epoll的两种不同的工作模式:水平触发(LT)边缘触发(ET)
理解LT和ET
场景:客户端向(ET模式下)服务器发送10k请求数据,服务器只读了1k数据
在ET模式下只通知一次,本轮数据如果没读完不会再通知;如果服务器没读完数据,导致双方都要发数据给对方,对方才能进行处理数据(陷入死循环)
这就要求在ET模式下读事件一旦就绪,必须把数据读完;那我怎么知道我把数据读完了?
这就好比关系还不错的朋友来向你借钱,第一次借100,下一次再借100,那他下次会不会再向你借钱呢?肯定会(知道你有钱~);那他怎么知道你没钱了?第三次向你借钱时,你发信息给他:我只有36.66块钱了,这是你朋友就知道你没钱了!
也就是在设计时进行循环读取,只到读取不到数据就证明没数据了!
如果这时朋友发信息再次向你借钱,你没回复他,他也不知道是什么情况(没钱还是没看到信息),就会一直僵持着
也就是说:循环读取势必会引起阻塞问题,但服务器敢让你阻塞吗(一阻塞可能服务器就挂了)!所以我们要对fd设置非阻塞来解决该问题!
以上便是为了引出结论:在ET模式下,所有的fd必须是非阻塞的!LT模式阻塞非阻塞都行~
ET更高效,体现在:1通知效率更高;2IO效率更高(给对方通告一个更大的接收窗口,让对方能发送更多的数据,提高IO效率)
那LT也可以设置非阻塞,也可以循环读取所有数据啊!但它会有退路可言(设置成阻塞,一次一次读数据)而ET一定要让上层读取全部数据!这在设计上ET也比LT要复杂些;但虽然ET更高效,也并不意味着LT就没用了:在一些场景中如果要实现响应是有顺序的或者是要求设计简单的,就选LT~
四Reactor
编写Reactor的基本框架
在Reactor的代码中,我们要来解决读和写的问题
关于读:我们能够直接读吗? 不能!
我们要怎么知道接收缓冲区有数据了? 通过epoll等待读事件就绪!
我们能保证读到的就是一个完整的请求吗? 不能!
如何保证? 引入协议:对读到的内容进行报文解析
关于写:a当我们获取一个新的sockfd时,输入和输出缓冲区都是空的
b读事件就绪:就是输入缓冲区有了数据/底层有新连接到来
c写事件就绪:不是关心数据,而是关心发送缓冲区中有没有空间,有空间就证明写事件就绪;否则写事件不满足!
d把sockfd托管给select,poll,epoll:原因是sockfd上事件没就绪!
所以默认sockfd新建的情况下,读事件是不就绪的:接收缓冲区暂时没数据,所以我们要把它进行托管
而默认sockfd新建的情况下,写事件是就绪的:接收缓冲区本来就有空间,所以我们直接写
当写条件不满足时,我们才要按需开启对EPOLLOUT的关心!
什么情况下写条件不满足? 发送缓冲区写满了&&数据还没发完
设置EPOLLOUT之后,我们就不用管了!后续的剩余数据epoll会自动给我发送!
补充细节:
1如果我们直接开启EPOLLOUT关心,epoll就会一直就绪
2如果未来我们发完数据了,对EPOLLOUT的关心就会被关闭
3如果缓冲区没满&&数据发完了,我们就不用开启对EPOLLOUT关心
4如果我们设置对EPOLLOUT的关心,epoll默认只会就绪一次!
5直接发,我们怎么知道写入条件不满足? 我们写入时进行判断errno == EWOULDBLOCK就是缓冲区被写满了!
在代码中我们没有sockfd的概念,只有一个一个的connection到来:所以我们要对connection进行描述并构建成对象,有Reactor对象来管理connection对象(通过unordered_map数据结构来组织)
accept设置连接时并不知道有多少个连接即将到来,所以我们要把listensockfd设置成非阻塞,循环获取新连接!
获取新连接时,我们要对读 写 异常方法进行bind回调函数,我们不在listener类中进行bind,我们统一在Main.cc(外部)将方法bind,怎么实现? 在connection类中定义一个type:表示该连接是监听sockfd还是普通sockfd;在Reactor类中定义一批方法集(读 写 异常方法),对外提供接口设置这批方法(也就是先在外部bind);从此我们AddConnection()函数添加连接是通过type判断是那种连接需要添加从而进行注册对应的方法(注册进connection类的成员方法中 ,后面等新事件到来时就能通过connection找到要执行的方法从而回调出去!)
在connection类中定义一个Reactor* R指针? 进行回指找到Reacor中的AddConnection()函数(在获取连接时我们要把新连接添加到unordered_map中要通过Reactor对象才能实现)
完整源代码
未来服务器将不会有sockfd的概念,只有一个一个的Connection
//Connection.hpp
#pragma once
#include <string>
#include <functional>
#define ListenConnection 0
#define NornalConnection 1
class Connection;
class Reactor;
using handler = std::function<void(Connection *)>;
class Connection
{
public:
Connection(int sockfd)
: _sockfd(sockfd)
{
}
~Connection()
{
}
void SetEvent(uint32_t event)
{
_event = event;
}
void SetReactor(Reactor *R)
{
_R = R;
}
void SetType(int type)
{
_type = type;
}
void SetAddr(InetAddr &addr)
{
_addr = addr;
}
void AppendInbuffer(const std::string &in)
{
_inbuffer += in;
}
void AppendOutbuffer(const std::string &in)
{
_outbuffer += in;
}
int Sockfd()
{
return _sockfd;
}
uint32_t Event()
{
return _event;
}
handler Recver()
{
return _recver;
}
handler Sender()
{
return _sender;
}
handler Excepter()
{
return _excepter;
}
Reactor *R()
{
return _R;
}
int Type()
{
return _type;
}
std::string &Inbuffer() // 引用!
{
return _inbuffer;
}
InetAddr Addr()
{
return _addr;
}
std::string &Outbuffer()
{
return _outbuffer;
}
void RegisterHandler(handler recver, handler sender, handler excepter)
{
_recver = recver;
_sender = sender;
_excepter = excepter;
}
void DiscardOutbuffer(ssize_t n)
{
_outbuffer.erase(0, n);
}
void Close()
{
if (_sockfd >= 0)
::close(_sockfd);
}
private:
int _sockfd;
uint32_t _event;
std::string _inbuffer;
std::string _outbuffer;
handler _recver; // 处理读事件
handler _sender; // 处理写事件
handler _excepter; // 处理异常事件
Reactor *_R; // 定义了一个Reactor指针?
int _type;
InetAddr _addr;
};
Connection被Reactor进行管理
//Reactor.hpp
#pragma once
#include <unordered_map>
#include <memory>
#include "Multiplex.hpp"
#include "Connection.hpp"
// 只对Connection进行管理
class Reactor
{
const static int gnum = 128;
public:
Reactor()
: _epoller(std::make_unique<Epoller>()), _isrunning(false)
{
}
~Reactor()
{
}
// 我不想让外部去new Connection对象
// void AddConnection(int sockfd, uint32_t event, handler recver, handler sender, handler excepter)
void AddConnection(int sockfd, uint32_t event, InetAddr &addr, int type)
{
// 构建Connection -- 不要用智能指针!!!
Connection *conn = new Connection(sockfd);
conn->SetEvent(event);
conn->SetReactor(this); // 设置conn与R的回指
conn->SetType(type); // 设置connection的类型
conn->SetAddr(addr);
// 注册读写异常方法
// conn->RegisterHandler(recver, sender, excepter);
if (conn->Type() == ListenConnection)
conn->RegisterHandler(_OnAccepter, nullptr, nullptr);
if (conn->Type() == NornalConnection)
conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);
// epoll托管 - epoll模型进行封装
if (_epoller->AddEvent(conn->Sockfd(), conn->Event()))
_conn.insert({sockfd, conn}); // TcpServer管理
}
void Dispatcher()
{
int timeout = -1;
_isrunning = true;
while (_isrunning)
{
LoopOnce(timeout);
// Do Other Thing
PrintDebug();
}
_isrunning = false;
}
void LoopOnce(int timeout)
{
int n = _epoller->WaitEvent(revs, gnum, timeout);
for (int i = 0; i < n; i++)
{
int sockfd = revs[i].data.fd;
uint32_t event = revs[i].events;
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (IsConnExist(sockfd) && _conn[sockfd]->Recver())
_conn[sockfd]->Recver()(_conn[sockfd]); // 写事件派发
}
if (event & EPOLLOUT)
{
if (IsConnExist(sockfd) && _conn[sockfd]->Sender())
_conn[sockfd]->Sender()(_conn[sockfd]); // 读事件派发
}
}
}
bool IsConnExist(int sockfd)
{
return _conn.find(sockfd) != _conn.end();
}
void SetListenConnection(handler OnAccepter)
{
_OnAccepter = OnAccepter;
}
void SetNornalConnection(handler OnRecver, handler OnSender, handler OnExcepter)
{
_OnRecver = OnRecver;
_OnSender = OnSender;
_OnExcepter = OnExcepter;
}
handler OnAccepter()
{
return _OnAccepter;
}
handler OnRecver()
{
return _OnRecver;
}
handler OnSender()
{
return _OnSender;
}
handler OnExcepter()
{
return _OnExcepter;
}
void PrintDebug()
{
std::string str;
for (auto &conn : _conn)
{
str += std::to_string(conn.second->Sockfd()) + " ";
}
LOG(DEBUG, "conn list:%s\n", str.c_str());
}
void EnableReadWriteEvent(int sockfd, bool read, bool write)
{
if (!IsConnExist(sockfd))
return;
uint32_t event = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
// 修改
_conn[sockfd]->SetEvent(event);
// 设置进内核
_epoller->ModEvent(_conn[sockfd]->Sockfd(), _conn[sockfd]->Event());
}
void DelConnection(int sockfd)
{
// 进行安全检测
if (!IsConnExist(sockfd))
return;
LOG(INFO, "client %s quit,server delete all sources\n", _conn[sockfd]->Addr().User().c_str());
// 先从epoller中移除->细节!epoller移除的sockfd必须是健康的
// EnableReadWriteEvent(sockfd,false,false);
_epoller->DelEvent(sockfd);
// 关闭sockfd
_conn[sockfd]->Close();
// 从_conn中去除
_conn.erase(sockfd);
}
private:
// sockfd与connection进行映射
std::unordered_map<int, Connection *> _conn;
std::unique_ptr<Multiplex> _epoller;
bool _isrunning;
epoll_event revs[gnum];
// 设置方法集进行统一处理
handler _OnAccepter;
handler _OnRecver;
handler _OnSender;
handler _OnExcepter;
};
epoll接口封装成对象
//Multiplex.hpp
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"
#include "Comm.hpp"
using namespace log_ns;
class Multiplex
{
public:
virtual bool AddEvent(int sockfd, uint32_t event) = 0;
virtual int WaitEvent(struct epoll_event revt[], int num, int timeout) = 0;
virtual bool ModEvent(int sockfd, uint32_t event) = 0;
virtual bool DelEvent(int sockfd) = 0;
};
class Epoller : public Multiplex
{
const static int size = 128;
private:
bool EventHelper(int sockfd, uint32_t event, int oper)
{
struct epoll_event evt;
evt.data.fd = sockfd;
evt.events = event;
int n = ::epoll_ctl(_epfd, oper, sockfd, &evt);
if (n < 0)
{
LOG(ERROR, "epoll_ctl failed\n");
return false;
}
LOG(INFO, "epoll_ctl %d events is %s sucess\n", sockfd, EventToString(event).c_str());
return true;
}
public:
Epoller()
{
_epfd = ::epoll_create(size);
if (_epfd < 0)
{
LOG(FATAL, "epoll_create error\n");
exit(EPOLL_CTL_ERROR);
}
LOG(INFO, "epoll_create sucess efd: %d\n", _epfd);
}
~Epoller()
{
}
std::string EventToString(uint32_t event)
{
std::string str;
if (event & EPOLLIN)
str += "EPOLLIN";
if (event & EPOLLOUT)
str += "|EPOLLOUT";
if (event & EPOLLET)
str += "|EPOLLET";
return str;
}
virtual bool AddEvent(int sockfd, uint32_t event) override
{
return EventHelper(sockfd, event, EPOLL_CTL_ADD);
}
virtual bool ModEvent(int sockfd, uint32_t event) override
{
return EventHelper(sockfd, event, EPOLL_CTL_MOD);
}
virtual bool DelEvent(int sockfd) override
{
return EventHelper(sockfd, 0, EPOLL_CTL_DEL);
}
virtual int WaitEvent(epoll_event revs[], int num, int timeout) override
{
return ::epoll_wait(_epfd, revs, num, timeout);
}
private:
int _epfd;
};
// class Poller:private Multiplex
// {
// };
// class Select:private Multiplex
// {
// };
对listensockfd的封装,内含读事件(连接)方法
//Listener.hpp
#pragma once
#include <memory>
#include "Socket.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "Reactor.hpp"
using namespace socket_ns;
using namespace log_ns;
class Listener
{
public:
Listener(uint16_t port)
: _port(port), _ListenSocket(std::make_unique<TcpSocket>())
{
_ListenSocket->Tcp_ServerSocket(port);
}
int Listensockfd()
{
return _ListenSocket->Sockfd();
}
// listensockfd 读方法
void Accepter(Connection *conn)
{
while (true)
{
errno = 0;
InetAddr addr;
int code = 0;
int sockfd = _ListenSocket->AcceptSocket(&addr, &code);
if (sockfd > 0)
{
LOG(INFO, "get a new link %s sockfd: %d\n", addr.User().c_str(), sockfd);
conn->R()->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NornalConnection);
}
else
{
if (code == EWOULDBLOCK)
{
LOG(INFO, "gain connection is finished\n");
break;
}
else if (code == EINTR)
{
continue;
}
else
{
LOG(ERROR, "gain connection failed\n");
break;
}
}
}
}
private:
uint16_t _port;
std::unique_ptr<Socket> _ListenSocket;
};
//Socket.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "Comm.hpp"
namespace socket_ns
{
using namespace log_ns;
const static int gbacklog = 8;
// 模板方法模式
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port, int backlog = gbacklog) = 0;
virtual int AcceptSocket(InetAddr *addr, int *code) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
virtual void ReuseAddr() = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
ReuseAddr();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket fail\n");
exit(SOCK_ERROR);
}
LOG(INFO, "socket sucess sockfd: %d\n", _sockfd);
NoBlock(_sockfd); // 设置非阻塞
}
virtual void InitSocket(uint16_t port, int backlog) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
LOG(FATAL, "bind fail\n");
exit(BIND_ERROR);
}
if (::listen(_sockfd, backlog) < 0)
{
LOG(ERROR, "listen fail\n");
exit(LISTEN_ERROR);
}
}
virtual int AcceptSocket(InetAddr *addr, int *code) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
*code = errno;
if (sockfd < 0)
{
return -1;
}
NoBlock(sockfd); // 设置非阻塞
*addr = client;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
virtual void ReuseAddr() override
{
int opt = 1;
::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
private:
int _sockfd; // 两个角色
};
// class UdpServer:public Socket
//{}
}
//Log.hpp
#pragma once
#include <pthread.h>
#include <fstream>
#include <syscall.h>
#include <stdarg.h>
#include <unistd.h>
#include <cstring>
namespace log_ns
{
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Getlevel(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
break;
case INFO:
return "INFO";
break;
case WARNING:
return "WARNING";
break;
case ERROR:
return "ERROR";
break;
case FATAL:
return "FATAL";
break;
default:
return "";
break;
}
}
std::string Gettime()
{
time_t now = time(nullptr);
struct tm *time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
time->tm_year + 1900,
time->tm_mon + 1,
time->tm_mday,
time->tm_hour,
time->tm_min,
time->tm_sec);
return buffer;
}
struct log_message
{
std::string _level;
int _id;
std::string _filename;
int _filenumber;
std::string _cur_time;
std::string _message;
};
#define SCREAM_TYPE 1
#define FILE_TYPE 2
#define DEVELOP 3
#define OPERATION 4
const std::string gpath = "./log.txt";
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
class log
{
public:
log(const std::string &path = gpath, const int status = DEVELOP)
: _mode(SCREAM_TYPE), _path(path), _status(status)
{
}
void SelectMode(int mode)
{
_mode = mode;
}
void SelectStatus(int status)
{
_status = status;
}
void PrintScream(const log_message &le)
{
printf("[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
}
void PrintFile(const log_message &le)
{
std::fstream in(_path, std::ios::app);
if (!in.is_open())
return;
char buffer[1024];
snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
in.write(buffer, strlen(buffer)); // 不用sizeof
in.close();
}
void PrintLog(const log_message &le)
{
// 过滤
if (_status == OPERATION)
return;
// 线程安全
pthread_mutex_lock(&gmutex);
switch (_mode)
{
case SCREAM_TYPE:
PrintScream(le);
break;
case FILE_TYPE:
PrintFile(le);
break;
default:
break;
}
pthread_mutex_unlock(&gmutex);
}
void logmessage(const std::string &filename, int filenumber, int level, const char *message, ...)
{
log_message le;
le._level = Getlevel(level);
le._id = syscall(SYS_gettid);
le._filename = filename;
le._filenumber = filenumber;
le._cur_time = Gettime();
va_list vt;
va_start(vt, message);
char buffer[128];
vsnprintf(buffer, sizeof(buffer), message, vt);
va_end(vt);
le._message = buffer;
// 打印日志
PrintLog(le);
}
~log()
{
}
private:
int _mode;
std::string _path;
int _status;
};
// 方便上层调用
log lg;
// ##不传时可忽略参数
#define LOG(level, message, ...) \
do \
{ \
lg.logmessage(__FILE__, __LINE__, level, message, ##__VA_ARGS__); \
} while (0)
#define SleftScream() \
do \
{ \
lg.SelectMode(SCREAM_TYPE); \
} while (0)
#define SleftFile() \
do \
{ \
lg.SelectMode(FILE_TYPE); \
} while (0)
}
//InetAddr.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port = ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET, &_addr.sin_addr, buffer, sizeof(buffer));
_ip = buffer;
}
public:
InetAddr(const std::string &ip, uint16_t port)
{
_ip = ip;
_port = port;
_addr.sin_family = AF_INET6;
_addr.sin_port = htons(port);
_addr.sin_addr.s_addr = INADDR_ANY;
}
InetAddr()
{
}
InetAddr(const struct sockaddr_in &addr)
: _addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr &ad)
{
return (this->_ip == ad._ip && this->_port == ad._port);
}
std::string User()
{
std::string tmp = _ip + ":" + std::to_string(_port);
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
对读 写 异常 方法进行封装
//HandlerConnection.hpp
#pragma once
#include "Connection.hpp"
#include "Log.hpp"
using namespace log_ns;
class HandlerConnection
{
static const int gsize = 1024;
public:
HandlerConnection(handler server)
{
_server = server;
}
// 就绪的conn
void HandlerRecver(Connection *conn)
{
while (true)
{
errno = 0;
char bufferstr[gsize];
ssize_t n = ::recv(conn->Sockfd(), bufferstr, sizeof(bufferstr), 0);
if (n > 0)
{
bufferstr[n] = 0;
conn->AppendInbuffer(bufferstr);
}
else
{
if (errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
conn->Excepter()(conn); // 统一处理异常
return; // 这里是返回!
}
}
}
// 读取完成 - 进行内容解析(处理粘包问题)
// 交给上层
_server(conn);
}
void HandlerSender(Connection *conn)
{
// 直接写
while (true)
{
errno = 0;
ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
if (n > 0)
{
conn->DiscardOutbuffer(n);
}
else if (n == 0)
{
break;
}
else
{
if (errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
conn->Excepter()(conn);
return;
}
}
}
// 一定是读条件不就绪(发送缓冲区满了)
if (!conn->Outbuffer().empty())
{
// 开启读事件->剩下的数据epoll会自动帮我发完
conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, true);
}
else
{
conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, false);
}
}
void HandlerExcepter(Connection *conn)
{
conn->R()->DelConnection(conn->Sockfd());
}
private:
handler _server;
};
读事件要进行报文解析,读到完整请求后进行处理,生成应答后发出!
//Protocol.hpp
#pragma once
#include <iostream>
#include <memory>
#include <jsoncpp/json/json.h>
// #define SELF 1
// "len\r\n{json}\r\n" -- 自己设计出完整报文 len json的长度
// "\r\n" 第一个:区分len 和json边界 第二个:观察现象方便
const std::string sym = "\r\n";
const std::string space = " ";
// jsonstr变成完整报文
std::string Encode(const std::string &jsonstr)
{
int len = jsonstr.size();
return std::to_string(len) + sym + jsonstr + sym;
}
// 把json提取出来
// "len\r"
// "len\r\n{json}\r"
// "len\r\n{json}\r\n"
// "len\r\n{json}\r\n""len\r\n{json}\r\n""len\r\n{j"
std::string Decode(std::string &nameplate)
{
size_t pos = nameplate.find(sym);
if (pos == std::string::npos)
{
return "";
}
std::string lenstr = nameplate.substr(0, pos);
int len = std::stoi(lenstr);
// 计算出完整报文长度
int TotalLen = lenstr.size() + sym.size() + len + sym.size();
if (nameplate.size() < TotalLen)
{
return "";
}
// 读报文
std::string jsonstr = nameplate.substr(pos + sym.size(), len);
// 删报文
nameplate.erase(0, TotalLen);
return jsonstr;
}
class req
{
public:
req()
{
}
req(int x, int y, std::string &sym)
: _x(x), _y(y), _sym(sym)
{
}
// 结构化->字符串
void Serialize(std::string *out)
{
#ifdef SELF
// 1.自己做 -> "_x _sym _y"
// 2.使用现成库:jsoncpp
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["sym"] = _sym;
Json::FastWriter writer;
*out = writer.write(root);
#else
//"len\r\n {_x _sym _y} \r\n"
std::string x = std::to_string(_x);
std::string y = std::to_string(_y);
*out = x + space + _sym + space + y;
#endif
}
// 字符串->结构化
bool Deserialize(std::string &in)
{
#ifdef SELF
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_x = root["x"].asInt();
_y = root["y"].asInt();
_sym = root["sym"].asString();
return true;
#else
//"len\r\n {_x _sym _y} \r\n"
auto left_space = in.find(space);
if (left_space == std::string::npos)
return false;
auto right_space = in.rfind(space);
if (right_space == std::string::npos)
return false;
if (left_space + space.size() + 1 != right_space)
return false;
std::string x = in.substr(0, left_space);
if (x.empty())
return false;
std::string sym = in.substr(left_space + space.size(), right_space);
if (sym.empty())
return false;
std::string y = in.substr(right_space + space.size());
if (y.empty())
return false;
_x = std::stoi(x.c_str());
_sym = sym;
_y = std::stoi(y.c_str());
return true;
#endif
}
void SetValue(int x, int y, char sym)
{
_x = x;
_y = y;
_sym = sym;
}
~req() {}
int _x;
int _y;
std::string _sym; //-x +-*/ _y
};
class rep
{
public:
rep()
: _result(0), _code(0), _des("sucess")
{
}
void Serialize(std::string *out)
{
#ifdef SELF
Json::Value root;
root["result"] = _result;
root["code"] = _code;
root["des"] = _des;
Json::FastWriter writer;
*out = writer.write(root);
#else
std::string result = std::to_string(_result);
std::string code = std::to_string(_code);
*out = result + space + code + space + _des;
#endif
}
bool Deserialize(std::string &in)
{
#ifdef SELF
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
_des = root["des"].asString();
return true;
#else
auto left_space = in.find(space);
if (left_space == std::string::npos)
return false;
auto right_space = in.rfind(space);
if (right_space == std::string::npos)
return false;
if (left_space + space.size() + 1 != right_space)
return false;
std::string result = in.substr(0, left_space);
if (result.empty())
return false;
std::string code = in.substr(left_space + space.size(), right_space);
if (code.empty())
return false;
std::string des = in.substr(right_space + space.size());
if (des.empty())
return false;
_result = std::stoi(result.c_str());
_code = std::stoi(code.c_str());
_des = des;
return true;
#endif
}
~rep() {}
int _result;
int _code; // 0-> sucess 1->div zero 2->mod zero 3->fail
std::string _des;
};
// 工厂模式
class Factory
{
public:
static std::shared_ptr<req> BuidRequest()
{
return std::make_shared<req>();
}
static std::shared_ptr<rep> BuidReponse()
{
return std::make_shared<rep>();
}
};
//PackageParse.hpp
#pragma once
#include <iostream>
#include <functional>
#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
using namespace log_ns;
using protocol_t = std::function<std::shared_ptr<rep>(std::shared_ptr<req>)>;
class PackageParse
{
public:
PackageParse(protocol_t process)
: _process(process)
{
}
~PackageParse()
{
}
void server(Connection *conn)
{
while (true)
{
// 报文解析 -- 不能保证读到完整的报文
std::string message = ::Decode(conn->Inbuffer());
if (message == "")
break;
// 反序列化 -- 能保证读到完整报文
std::shared_ptr<req> q = Factory::BuidRequest();
q->Deserialize(message);
// 处理事务
auto p = _process(q);
// 序列化处理
std::string result;
p->Serialize(&result);
LOG(DEBUG, "client request finish: %s\n", result.c_str());
// 添加报头
result = Encode(result);
// 发送
conn->AppendOutbuffer(result);
// break;
}
// 至少处理了一个请求,有一个应答
// 方法1:直接发 - 单进程
if (!conn->Outbuffer().empty()) conn->Sender()(conn);
// 方法2:激活对事件的关心即可,IO全都是由Reactor处理 -> 解决线程安全
// 半同步半异步模式
//if (!conn->Outbuffer().empty()) conn->R()->EnableReadWriteEvent(conn->Sockfd(),true,true);
}
private:
protocol_t _process;
};
//Netcal.hpp
#pragma once
#include "Protocol.hpp"
// req -> rep
class Cal
{
public:
Cal() {}
~Cal() {}
std::shared_ptr<rep> Count(std::shared_ptr<req> q)
{
auto p = Factory::BuidReponse();
const char *a = q->_sym.c_str();
switch (*a)
{
case '+':
p->_result = q->_x + q->_y;
break;
case '-':
p->_result = q->_x - q->_y;
break;
case '*':
p->_result = q->_x * q->_y;
break;
case '/':
{
if (q->_y == 0)
{
p->_result = -1;
p->_code = 1;
p->_des = "div zero";
}
else
{
p->_result = q->_x / q->_y;
}
break;
}
case '%':
{
if (q->_y == 0)
{
p->_result = -1;
p->_code = 2;
p->_des = "mod zero";
}
else
{
p->_result = q->_x % q->_y;
}
break;
}
default:
p->_result = -1;
p->_code = 3;
p->_des = "illegal operation";
break;
}
return p;
}
};
现象
对单进程版Reactor进行改进:添加线程池
另外一种方案
实现思路
以上便是IO模型的全部内容,有错误欢迎在评论区指正,感谢观看!