io多路复用, select, poll, epoll
系列文章目录
异步I/O操作函数aio_xxx函数 https://blog.csdn.net/surfaceyan/article/details/134710393
文章目录
- 系列文章目录
- 前言
- 一、5种IO模型
- 二、IO多路复用API
- select
- poll
- epoll
- 三、两种高效的事件处理模式
- Reactor模式
- Proactor模式
- 模拟 Proactor 模式
- 基于事件驱动的非阻塞同步IO
- 辅助函数
- 四、多种线程池的实现方式
- 基本的
- modern C++
- references
前言
一、5种IO模型
- 阻塞IO
- 非阻塞IO
- IO复用
- 信号驱动
Linux用套接字进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO事件就绪,进程收到SIGIO信号,然后处理IO事件 - 异步
https://blog.csdn.net/surfaceyan/article/details/134710393
二、IO多路复用API
include
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/select.h>
#include <poll.h>
#include <sys/epoll.h>
select
selelct 能够监控的最大文件描述符数量必须小于FD_SETSIZE,poll和epoll没有文件描述符数量限制
select返回后所有的参数都看成未定义的需要重填
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
- nfds: 监听集合中最大的文件描述符+1,数组索引+1
- readfds: 监听待读取的文件描述符集合
- writefds: 监听待写入的文件描述符集合
- exceptfds: 监听“exceptional conditions”,see POLLPRI in poll(2)
当函数返回后,上面的集合都会被清零,除了集合中满足条件的
- timeout: 超时时间, 为NULL, select阻塞,时间为0则函数立即返回
函数会在以下情况时返回:
- 一个文件描述符处于就绪状态
- 调用被信号句柄中断
- 时间到期
select调用后可将timeout看成未定义的(timeout剩余时间,有些系统可能不会这样做)
select调用后返回 r w e集合总共被置位的个数,0代表到期
-1代表错误,可能原因: badfd, signal int, nfds < 0 > RLIMIT_NOFILE, timeout invalid, 内存不足导致无法分配内部表格
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
pselect允许捕获信号
select可能更新timeout为剩余时间,pselect不会改变这个参数
int main()
{
fd_set rfds;
struct timeval tv;
int retval;
FD_ZERO(&rfds);
int fd = 0;
FD_SET(fd, &rfds);
tv.tv_sec = 5;
tv.tv_usec = 0;
retval = select(fd+1, &rfds, NULL, NULL, &tv);
if (retval < 0)
perror("select()");
else if (retval) {
printf("data is %d.\n", FD_ISSET(fd, &rfds);
char buf[BUFSIZ] = {0};
int n = read(fd, buf, BUFSIZ);
printf("n %d: %s\n", n, buf);
} else {
printf("timeout\n");
}
}
poll
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- fds: 待监控的fd集合
struct pollfd {
int fd; // 如果< 0 则events会被忽略,revents返回0
short events; // requested events 输入参数
short revents; // returned events 输出参数
};
- nfds: fds数组大小
- timeout: 阻塞的毫秒数,-1代表阻塞,0代表不阻塞
epoll会一直阻塞,直到:
- 一个文件描述符准备就绪
- 调用被信号中断
- 时间到期
POLLIN: 就绪读
POLLPRI: 异常条件
POLLOUT: 可写的
POLLRDHUP: socket对端关闭了连接
POLLERR:必有
POLLHUP: 必有,对端关闭链接后再read返回0(EOF)(仅当所有残留的数据都被读取时)
POLLNVAL:必有,无效请求(fd没有open)
成功返回非负值,指明pollfds中有几个revents为非零,0代表时间到期
-1 on error
EFAULT fds指针错误
EINTR 被信号中断
EINVAL The nfds value exceeds the RLIMIT_NOFILE value.
EINVAL (ppoll()) The timeout value expressed in *ip is invalid (negative).
ENOMEM 不能为内核数据结构分配内存
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask);
允许应用程序安全地等待,直到文件描述符准备就绪或捕获到信号。
#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
int main(int argc, char *argv[])
{
int nfds, num_open_fds;
struct pollfd *pfds;
if (argc < 2) {
fprintf(stderr, "Usage: %s file...\n", argv[0]);
exit(EXIT_FAILURE);
}
num_open_fds = nfds = argc - 1;
pfds = (pollfd*)calloc(nfds, sizeof(struct pollfd));
if (pfds == NULL)
errExit("malloc");
/* Open each file on command line, and add it 'pfds' array */
for (int j = 0; j < nfds; j++) {
pfds[j].fd = open(argv[j + 1], O_RDONLY);
if (pfds[j].fd == -1)
errExit("open");
printf("Opened \"%s\" on fd %d\n", argv[j + 1], pfds[j].fd);
pfds[j].events = POLLIN;
}
/* Keep calling poll() as long as at least one file descriptor is
open */
while (num_open_fds > 0) {
int ready;
printf("About to poll()\n");
ready = poll(pfds, nfds, -1);
if (ready == -1)
errExit("poll");
printf("Ready: %d\n", ready);
/* Deal with array returned by poll() */
for (int j = 0; j < nfds; j++) {
char buf[10];
if (pfds[j].revents != 0) {
printf(" fd=%d; events: %s%s%s\n", pfds[j].fd,
(pfds[j].revents & POLLIN) ? "POLLIN " : "",
(pfds[j].revents & POLLHUP) ? "POLLHUP " : "",
(pfds[j].revents & POLLERR) ? "POLLERR " : "");
if (pfds[j].revents & POLLIN) {
ssize_t s = read(pfds[j].fd, buf, sizeof(buf));
if (s == -1)
errExit("read");
printf(" read %zd bytes: %.*s\n",
s, (int) s, buf);
} else { /* POLLERR | POLLHUP */
printf(" closing fd %d\n", pfds[j].fd);
if (close(pfds[j].fd) == -1)
errExit("close");
num_open_fds--;
}
}
}
}
printf("All file descriptors closed; bye\n");
exit(EXIT_SUCCESS);
}
epoll
epoll_create(2), epoll_create1(2), epoll_ctl(2), epoll_wait(2)
int epoll_create1(int flags);
创建一个epoll实例
- flags: 为0 等价于epoll_create
EPOLL_CLOEXEC
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
;
返回就绪的问题件描述符个数
0代表时间到期
-1代表 EINTR被信号中断
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- op:
- EPOLL_CTL_ADD:
- EPOLL_CTL_MOD
- EPOLL_CTL_DEL
events为带监听的事件
- EPOLLIN :
- EPOLLOUT :
- EPOLLRDHUP :对端关闭连接(√)
- EPOLLPRI :异常条件(√)
- EPOLLERR :发生错误(默认必监听)(√)
- EPOLLHUP : 类似对端关闭链接(默认必监听)(√)
- EPOLLET :
- EPOLLONESHOT : 只会触发一次
- EPOLLWAKEUP :
- EPOLLEXCLUSIVE: (默认必监听)(√)
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
epollfd = epoll_create1(EPOLL_CLOEXEC);
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev);
for (;;)
{
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
error;
}
for (int i = 0; i < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
con_sock = accept(listen_sock, NULL, NULL);
setnonblocking(conn_sock);
ev.data.fd conn_sock;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev);
} else {
do_use_fd(events[i].data.fd);
}
}
}
三、两种高效的事件处理模式
服务器程序通常需要处理三类事件:I/O 事件、信号及定时事件。有两种高效的事件处理模式:Reactor
和 Proactor,同步 I/O 模型通常用于实现 Reactor 模式,异步 I/O 模型通常用于实现 Proactor 模式。
Reactor模式
要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将该事件通知工作
线程(逻辑单元),将 socket 可读可写事件放入请求队列,交给工作线程处理。除此之外,主线程不做
任何其他实质性的工作。读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。
使用同步 I/O(以 epoll_wait 为例)实现的 Reactor 模式的工作流程是:
- 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时, epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
- .睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll
内核事件表中注册该 socket 上的写就绪事件。 - .当主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户请求的结果。
Proactor模式
Proactor 模式将所有 I/O 操作都交给主线程和内核来处理(进行读、写),工作线程仅仅负责业务逻
辑。使用异步 I/O 模型(以 aio_read 和 aio_write 为例)实现的 Proactor 模式的工作流程是:
- 主线程调用 aio_read 函数向内核注册 socket 上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例)。
- 主线程继续处理其他逻辑。
- 当 socket 上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。
- 应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求后,调用 aio_write 函数向内核注册 socket 上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序。
- 主线程继续处理其他逻辑。
- 当用户缓冲区的数据被写入 socket 之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。
- 应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭 socket。
模拟 Proactor 模式
使用同步 I/O 方式模拟出 Proactor 模式。原理是:主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一”完成事件“。那么从工作线程的角度来看,它们就直接获得了数据读写的结果,接下来要做的只是对读写的结果进行逻辑处理。
使用同步 I/O 模型(以 epoll_wait为例)模拟出的 Proactor 模式的工作流程如下:
- . 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时,epoll_wait 通知主线程。主线程从 socket 循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中注册 socket 上的写就绪事件。
- 主线程调用 epoll_wait 等待 socket 可写。
- . 主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。
基于事件驱动的非阻塞同步IO
int main(int argc, char* argv[])
{
Client* clients = new Client[MAXIMUM_FD];
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
struct sockaddr_in address;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_family = AF_INET;
address.sin_port = htons( std::atoi(argv[1]) );
int reuse = 1;
int ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
assert(ret == 0);
ret = bind(listen_fd, (struct sockaddr*)&address, sizeof(address));
assert(ret == 0);
ret = listen(listen_fd, 5);
assert(ret == 0);
epoll_event events[MAX_EVENTS];
int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
assert(epoll_fd > 0);
addfd2epoll(epoll_fd, listen_fd, false);
Client::listen_fd = listen_fd;
Client::epoll_fd = epoll_fd;
while (1) {
int number = epoll_pwait(epoll_fd, events, MAX_EVENTS, -1, nullptr);
if (number < 0) {
if (errno == EINTR) {
fprintf(stderr, "interrupted by a sig\n");
break;
} else {
perror("epoll_wait");
break;
}
} else if (number == 0){
continue; // timeout
}
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listen_fd) {
int client_fd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (client_fd < 0) continue;
if (client_fd >= MAXIMUM_FD) {
fprintf(stderr, "clients out of limits\n");
close(client_fd);
continue;
}
clients[client_fd].init(client_fd, NULL);
} else if (events[i].events & (EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) {
clients[sockfd].close_connection(); // error
} else if (events[i].events & EPOLLIN) {
Client* client = clients + sockfd;
if ( client->do_read() && pool->push_back(client)) {
continue;
}
client->close_connection();
} else if (events[i].events & EPOLLOUT) {
Client* client = clients + sockfd;
if (client->do_write() == false) {
client->close_connection();
}
}
}
}
close(epoll_fd);
close(listen_fd);
delete[] clients;
delete pool;
return 0;
}
bool Client::do_read()
{
if (recv_idx_ >= recv_buf_len_) return false;
while (true) {
int n_read = recv(client_fd_, recv_buf_+recv_idx_, grecv_buf_len-recv_idx_, 0);
if (n_read < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break; // read done
return false;
} else if (n_read == 0) { // closed by peer
return false;
}
recv_idx_ += n_read;
}
return true;
}
bool Client::do_write()
{
if (send_num_ >= send_idx_) {
init();
modfd2epoll(epoll_fd, client_fd_, EPOLLIN);
return true;
}
while (1) {
int n_send = send(client_fd_, send_buf_+send_num_, send_idx_-send_num_, 0);
if (n_send < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
modfd2epoll(epoll_fd, client_fd_, EPOLLOUT);
return true;
}
return false;
} else if (n_send == 0) { // 对端已经关闭,然后继续写会收到 SIGPIPE
fprintf(stderr, "client: %d\n n_send = 0\n", client_fd_);
return false;
}
send_num_ += n_send;
if (send_num_ >= send_idx_) {
init();
modfd2epoll(epoll_fd, client_fd_, EPOLLIN);
return true;
}
}
}
辅助函数
int setnonblocking(int fd)
{
int old_opt = fcntl(fd, F_GETFL);
int new_opt = old_opt | O_NONBLOCK;
fcntl(fd, F_SETFL, new_opt);
return old_opt;
}
int setblocking(int fd)
{
int old_opt = fcntl(fd, F_GETFL);
int new_opt = old_opt & (~O_NONBLOCK);
fcntl(fd, F_SETFL, new_opt);
return old_opt;
}
int setcloexec(int fd)
{
int old_opt = fcntl(fd, F_GETFD);
int new_opt = old_opt | FD_CLOEXEC;
fcntl(fd, F_SETFD, new_opt);
return old_opt;
}
void addfd2epoll(int epoll, int fd, bool one_shot)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI;
if (one_shot)
{
event.events |= EPOLLONESHOT;
}
epoll_ctl(epoll, EPOLL_CTL_ADD, fd, &event);
}
void rmfromepoll(int epoll, int fd)
{
epoll_ctl(epoll, EPOLL_CTL_DEL, fd, nullptr);
}
void modfd2epoll(int epoll, int fd, int ev)
{
epoll_event event;
event.data.fd = fd;
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP | EPOLLPRI;
epoll_ctl(epoll, EPOLL_CTL_MOD, fd, &event);
}
四、多种线程池的实现方式
基本的
#include <pthread.h>
#include <semaphore.h>
#include <list>
#include <exception>
class ThpoolException : public std::exception
{
public:
const char* what() const _GLIBCXX_TXN_SAFE_DYN _GLIBCXX_NOTHROW override
{ return "thread pool init failed\n"; }
};
class LockGuard // RAII
{
private:
pthread_mutex_t& mtx_;
public:
LockGuard(pthread_mutex_t& mtx) : mtx_(mtx)
{ pthread_mutex_lock(&mtx_); }
~LockGuard()
{ pthread_mutex_unlock(&mtx_); }
};
template<typename T>
class thread_pool
{
public:
static bool running;
private:
int thread_num;
std::list<T*> queue;
int max_q_len;
pthread_mutex_t q_mtx;
sem_t q_sem;
public:
thread_pool(int thread_num_, int max_len)
: thread_num(thread_num_), max_q_len(max_len)
{
int ret = 0;
ret = pthread_mutex_init(&q_mtx, nullptr);
if (ret != 0) throw ThpoolException();
ret = sem_init(&q_sem, 0, 0);
if (ret != 0) throw ThpoolException();
for (int i=0; i < thread_num; ++i) {
pthread_t thid;
ret = pthread_create(&thid, nullptr, &thread_pool::on_process, this);
if (ret != 0) throw ThpoolException();
ret = pthread_detach(thid);
if (ret != 0) throw ThpoolException();
}
}
~thread_pool()
{
pthread_mutex_destroy(&q_mtx);
sem_destroy(&q_sem);
running = false;
}
bool push_back(T* client)
{
bool success;
{
LockGuard lg(q_mtx);
if (queue.size() < max_q_len) {
queue.push_back(client);
success = true;
sem_post(&q_sem);
}
else success = false;
}
return success;
}
static void* on_process(void* arg)
{ ((thread_pool*)arg)->do_process(); return nullptr; }
void do_process()
{
while (running){
T* client = nullptr;
sem_wait(&q_sem);
{
LockGuard lg(q_mtx);
if (queue.empty() == false){
client = queue.front();
queue.pop_front();
}
}
if (client) client->do_process();
}
}
};
template<typename T>
bool thread_pool<T>::running = true;
modern C++
class TaskQueue {
public:
TaskQueue() = default;
virtual ~TaskQueue() = default;
virtual bool enqueue(std::function<void()> fn) = 0;
virtual void shutdown() = 0;
virtual void on_idle() {}
};
class ThreadPool final : public TaskQueue {
public:
explicit ThreadPool(size_t n, size_t mqr = 0)
: shutdown_(false), max_queued_requests_(mqr) {
while (n) {
threads_.emplace_back(worker(*this));
n--;
}
}
ThreadPool(const ThreadPool &) = delete;
~ThreadPool() override = default;
bool enqueue(std::function<void()> fn) override {
{
std::unique_lock<std::mutex> lock(mutex_);
if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
return false;
}
jobs_.push_back(std::move(fn));
}
cond_.notify_one();
return true;
}
void shutdown() override {
// Stop all worker threads...
{
std::unique_lock<std::mutex> lock(mutex_);
shutdown_ = true;
}
cond_.notify_all();
// Join...
for (auto &t : threads_) {
t.join();
}
}
private:
struct worker {
explicit worker(ThreadPool &pool) : pool_(pool) {}
void operator()() {
for (;;) {
std::function<void()> fn;
{
std::unique_lock<std::mutex> lock(pool_.mutex_);
pool_.cond_.wait(
lock, [&] { return !pool_.jobs_.empty() || pool_.shutdown_; });
if (pool_.shutdown_ && pool_.jobs_.empty()) { break; }
fn = pool_.jobs_.front();
pool_.jobs_.pop_front();
}
assert(true == static_cast<bool>(fn));
fn();
}
}
ThreadPool &pool_;
};
friend struct worker;
std::vector<std::thread> threads_;
std::list<std::function<void()>> jobs_;
bool shutdown_;
size_t max_queued_requests_ = 0;
std::condition_variable cond_;
std::mutex mutex_;
};
references
IO多路复用 https://www.cnblogs.com/flashsun/p/14591563.html
socket 网络编程——端口复用技术 https://blog.csdn.net/JMW1407/article/details/107321853