Linux高性能服务器编程 | 读书笔记 | 9.定时器
9. 定时器
网络程序需要处理定时事件,如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,有效地组织这些定时事件,使其在预期的时间被触发且不影响服务器的主要逻辑,对于服务器的性能有至关重要的影响。为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,如链表、排序链表、时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。本章主要讨论两种高效的管理定时器的容器:时间轮和时间堆。
定时指在一段时间后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器,即定时机制是定时器得以被处理的原动力。Linux提供三种定时方法:
- socket套接字选项
SO_RCVTIMEO
和SO_SNDTIMEO
; SIGALRM
信号;- I/O复用系统调用的超时参数。
文章目录
- 9. 定时器
- 1.socket 选项 `SO_RCVTIMEO` 和 `SO_SNDTIMEO`
- 2.使用 `SO_SNDTIMEO` 选项设置定时
- setsockopt讲解
- 代码
- 3.SIGALRM 信号
- 1.基于升序链表的定时器
- 2.处理非活动连接
- 处理非活动连接:利用 alarm 函数周期性触发 SIGALRM 信号
- 服务器端
- 客户端
- 效果
- 4.I/O 复用系统调用的超时函数
- 基于 `epoll` 的事件循环与定时机制实现
- `time_t time(time_t *t);`
- 参数
- 返回值
- 6.高性能定时器(较难,需复习)
- 时间轮
- 复杂度分析
- 时间堆
- 复杂度分析
1.socket 选项 SO_RCVTIMEO
和 SO_SNDTIMEO
SO_RCVTIMEO
设置 socket 接收数据超时时间。
SO_SNDTIMEO
设置 socket 发送数据超时时间。
这两个数据仅对与数据接收和发送相关的 socket 系统调用 send
、sendmsg
、recv
、recvmsg
、accept
和 connect
。
在程序中,我们根据上述系统调用的返回值以及 errno
来判断超时时间是否已到,进而决定是否开始处理定时任务。
2.使用 SO_SNDTIMEO
选项设置定时
setsockopt讲解
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
参数
- sockfd:
- 类型:int
- 解释:这是一个整型变量,表示要操作的套接字的文件描述符(file descriptor)。
- level:
- 类型:int
- 解释:这个参数指定了选项所属的协议层次,如SOL_SOCKET(套接字级别)、IPPROTO_IP(IP协议级别)等。
- optname:
- 类型:int
- 解释:这是选项的名称,比如SO_REUSEADDR、SO_TIMEOUT等,是套接字选项库中预定义的一个整数常量。
- optval:
- 类型:const void*
- 解释:这个参数指向一个包含选项值的内存区域。选项值的类型取决于特定的选项,可能是一个整数、字符数组、枚举值等。
- optlen:
- 类型:socklen_t
- 解释:这是一个长度类型的参数,表示optval所指向的内存区域大小。在调用前,应预先根据选项和可能的值设置其正确的大小。
返回值
- 成功:返回0。
- 失败:返回-1(SOCKET_ERROR),并设置errno来表示错误的原因。可能的错误代码包括EBADF(sock不是有效的文件描述词)、EFAULT(optval指向的内存并非有效的进程空间)、EINVAL(在调用setsockopt()时,optlen无效)、ENOPROTOOPT(指定的协议层不能识别选项)、ENOTSOCK(sock描述的不是套接字)等。
常用选项示例
- SO_RCVBUF和SO_SNDBUF:用于设置/读取发送缓冲区和接收缓冲区大小。选项值类型为int,指定新的缓冲区大小。对setsockopt和getsockopt有效。设置缓冲区大小只能在TCP连接建立之前进行。
- SO_REUSEADDR:用于复用socket地址(端口号)。选项值类型为int,0表示不能复用,1表示可以复用,默认值为0。对setsockopt和getsockopt有效。
- SO_KEEPALIVE:用于TCP socket检测/保持网络连接。选项值类型为int,0表示不能发送探测数据段,1表示可以发送,默认值为0。对setsockopt和getsockopt有效。
- TCP_NODELAY:用于禁止TCP协议的Nagle算法,使小数据包(小于最大数据段大小)立即发送,可能会降低TCP协议的效率。
setsockopt函数的作用是设置与指定的套接字关联的选项,从而影响套接字的行为。通过调用setsockopt函数,可以设置这些选项的值,以满足不同的网络编程需求。
代码
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <libgen.h>
/* 超时连接函数
* 这个函数的作用是尝试与指定IP地址和端口的服务器建立连接,
* 并设置一个连接超时时间。
*/
int timeout_connect(const char *ip, int port, int time) {
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
// SO_RCVTIMEO和SO_SNDTIMEO套接字选项对应的值类型为timeval,这和select函数的超时参数类型相同
struct timeval timeout;
timeout.tv_sec = time;
timeout.tv_usec = 0;
socklen_t len = sizeof(timeout);
ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
assert(ret != -1);
/*
* 如果连接在指定时间内没有成功建立,connect() 函数将返回 -1,
* 并且 errno 会被设置为 EINPROGRESS,表示连接超时。
*/
printf("Attempting to connect...\n");
ret = connect(sockfd, (struct sockaddr *)&address, sizeof(address));
printf("Connect returned: %d\n", ret);
if (ret == -1) {
// 超时对应的错误号是EINPROGRESS,此时就可执行定时任务了
if (errno == EINPROGRESS) {
printf("conencting timeout, process timeout logic\n");
return -1;
}
printf("error occur when connecting to server\n");
return -1;
}
return sockfd;
}
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
/* 超时时间 10s */
int sockfd = timeout_connect(ip, port, 10);
if (sockfd < 0) {
return 1;
}
return 0;
}
编译:
g++ -o client connect_timeout.cpp
运行:
./client 192.168.100.100 8080 /* 这是一个无效的IP地址 */
结果:
3.SIGALRM 信号
由alarm
和setitimer
函数设置的实时闹钟一旦超时,就会触发SIGALRM
信号,我们可以使用该信号的信号处理函数来处理定时任务。
1.基于升序链表的定时器
定时器通常至少要包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务回调函数。有时还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。如果使用链表作为容器来串联所有定时器,则每个定时器还要包含指向下一定时器的指针成员,如果链表是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。
从执行效率来看,添加定时器的时间复杂度是O(n)
,删除定时器的时间复杂度是O(1)
,执行定时任务的时间复杂度平均是O(1)
。
#ifndef LST_TIMER
#include<time.h>
#include<sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <fcntl.h>
#include <libgen.h>
#define BUFFER_SIZE 64
class util_timer;
//用户数据结构:客户端socket地址,socket文件描述符,读缓存和定时器
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer* timer;
};
//定时器类 双向链表节点
class util_timer
{
public:
util_timer():prev(NULL),next(NULL){}
util_timer* prev;//前一个
util_timer* next;//后一个
//任务超时时间 使用绝对时间
time_t expire;
//任务回调函数 时间到了就去调对应的回调函数
void (*cb_func)(client_data *);
//客户数据
client_data * user_data;
};
//定时器链表类 升序,双向,有头节点和尾节点
class sort_timer_1st
{
public:
sort_timer_1st():head(NULL),tail(NULL){}
~sort_timer_1st()
{
util_timer* tmp=head;
while(tmp)
{
head=tmp->next;
delete tmp;
tmp=head;
}
}
//插入定时器到链表
void add_timer(util_timer* timer)
{
if(!timer)
return ;
if(!head)
{
head=tail=timer;
return;
}
//插到开头
if(timer->expire<head->expire)
{
timer->next=head;
head->prev=timer;
head=timer;
return;
}
//插到中间和结尾
add_timer(timer,head);
}
//当某个任务的定时器时间发生变化的时候需要调整,这里是考虑定时器时间被延长的情况
void adjust(util_timer* timer)
{
if(!timer)
{
return;
}
util_timer* tmp=timer->next;
if(!tmp||(timer->expire<tmp->expire))
{
return;
}
//头节点被延长,拿出来重新插入
if(timer==head)
{
head=head->next;
head->prev=NULL;
timer->next=NULL;
add_timer(timer,head);
}
//如果不是头节点,那就拿出来插入到它之后的部分
else
{
timer->prev->next=timer->next;
timer->next->prev=timer->prev;
add_timer(timer,timer->next);
}
}
// 尝试找到定时器Timer从链表里删除 *
void del_timer(util_timer* timer)
{
if (!timer)
{
return;
}
// 下面这个函数用于删除链表中只有一个定时器,即目标定时器
if ((timer == head) && (timer == tail))
{
delete timer;
head = NULL;
tail = NULL;
return;
}
// 如果链表中至少有两个定时器,且目标定时器是链表的头节点,将链表的头节点重定位为原
// 头节点的下一个节点,然后删除目标定时器
if (timer == head)
{
head = head->next;
head->prev = NULL;
delete timer;
return;
}
// 如果链表中至少有两个定时器,且目标定时器是链表的尾节点,将链表的尾节点重定位为原
// 尾节点的前一个节点,然后删除目标定时器
if (timer == tail)
{
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
// 如果目标定时器位于链表的中间,把它前后的定时器串联起来,然后删除目标定时器 *
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
// SIGALRM信号每次被触发时就执行其信号处理函数(如果使用统一事件源,则是主函数)
// 中执行一次tick函数。以处理链表上到期的任务
void tick()
{
if (!head)
{
return;
}
printf("timer tick\n");
time_t cur = time(NULL); // 获取系统当前的时间
util_timer* tmp = head;
// 从头节点开始依次处理每个定时器,直到遇到一个尚未到期的定时器。这是定时器的核心逻辑
while (tmp)
{
// 因为每个定时器都使用绝对时间作为超时时间,所以我们可以判断定时器的超时值和系统当
// 前时间,以判断定时器是否到期
if (cur < tmp->expire)
{
break;
}
// 调用定时器的回调函数,以执行定时任务
tmp->cb_func(tmp->user_data);
// 执行完定时器中的定时任务之后,将它从链表中删除,并重置链表头节点
head = tmp->next;
if (head)
{
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
private:
//辅助函数
void add_timer(util_timer* timer,util_timer* lst_head)
{
util_timer* prev=lst_head;
util_timer* tmp=prev->next;
//从head开始遍历,找第一个比timer大的
while(tmp)
{
if(timer->expire<tmp->expire)
{
prev->next=timer;
timer->next=tmp;
tmp->prev=timer;
timer->prev=prev;
break;
}
prev=tmp;
tmp=tmp->next;
}
//没找到比它大的,就插入到末尾
if(!tmp)
{
prev->next=timer;
timer->prev=prev;
timer->next=NULL;
tail=timer;
}
}
util_timer* head;
util_timer* tail;
};
#endif
核心函数tick相当于一个心博函数,每隔一段固定的时间就执行一次,以检测并处理到期的任务。
判断任务到期的依据是定时器党的expire值小于当前的系统时间。
2.处理非活动连接
现在考虑以上升序定时器链表的实际应用 ——处理非活动连接。服务器进程通常要定期处理非活动连接:给客户端发一个重连请求,或关闭该连接,或者其他。
Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可通过socket选项KEEPALIVE
来激活它,不过这样会使得应用程序对连接的管理变得复杂。我们考虑在应用层实现类似KEEPALIVE
的机制,以管理所有长时间处于非活动状态的连接。
处理非活动连接:利用 alarm 函数周期性触发 SIGALRM 信号
如以下代码利用alarm
函数周期性地触发SIGALRM
信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动的连接。
服务器端
TIMESLOT
定义了定时器的基本时间单位,用于设定定时器回调函数的调用频率。这个值代表了服务器在触发 SIGALRM
信号后多长时间再次触发该信号,从而执行定时任务。在代码示例中,TIMESLOT
被设置为 5 秒,也就是说,基本时间单位是 5 秒。
alarm
函数用于安排信号在未来的某个时间点发送给进程。这个函数是标准库中的一部分,定义在 <unistd.h>
头文件中。当调用 alarm(TIMESLOT);
时,设定了一个闹钟,让操作系统在 TIMESLOT
秒后向当前进程发送一个 SIGALRM
信号。
alarm(TIMESLOT);
在 timer_handler()
函数中调用,意味着每隔 TIMESLOT
秒,操作系统将向进程发送一个 SIGALRM
信号。收到这个信号后,sig_handler
处理函数将被执行,进而调用 timer_handler
函数来处理所有定时任务。
当一个新的客户端连接被接受,服务器为这个连接创建一个定时器,并设置其超时时间:
timer->expire = cur + 3 * TIMESLOT;
cur
是当前时间,3 * TIMESLOT
表示定时器的超时时间是15秒(因为 3 乘以 5秒)。
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <libgen.h>
#include "lst_timer.h"
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
static int pipefd[2];
static sort_timer_lst timer_lst; /* 用升序链表来管理定时器 */
static int epollfd = 0;
/* 将文件描述符设置为非阻塞模式 */
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* addfd 函数把新的文件描述符添加到 epoll 监听列表中,
* 并且设置为非阻塞和边缘触发模式(EPOLLET),以提高事件处理的效率。
非阻塞socket+ET比单纯的LT高效
*/
void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); /* 将fd注册到内核事件表epollfd中*/
setnonblocking(fd);
}
/* sig_handler 用于处理程序接收到的信号。
* 它把信号编号发送到 pipefd 管道,这样主循环可以安全地处理信号事件。
* 这种使用信号和管道的组合是多线程或多进程环境中处理信号的一种安全模式。
*/
void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
// 此处还是老bug,没有考虑字节序就发送了int的低地址的1字节
send(pipefd[1], (char *)&msg, 1, 0);//发送编号,编号的长度为1 0表示send没有任何其他的特殊行为就是普通的发送
errno = save_errno;
}
/* addsig 函数设置信号处理函数,并确保系统调用被中断时能自动重启,避免了部分系统调用失败的问题。 */
void addsig(int sig) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void timer_handler() {
// 处理定时任务
timer_lst.tick();
// 由于alarm函数只会引起一次SIGALRM信号,因此重新定时,以不断触发SIGALRM信号
alarm(TIMESLOT);
}
/* 定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之 */
void cb_func(client_data *user_data) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5); /* 创建内核事件表 */
assert(epollfd != -1);
addfd(epollfd, listenfd);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);
setnonblocking(pipefd[1]); /* fd[1]只能用于数据写入。 */
addfd(epollfd, pipefd[0]);
// 设置信号处理函数
addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;
// 直接初始化FD_LIMIT个client_data对象,其数组索引是文件描述符
client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
// 定时
alarm(TIMESLOT);
while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); /* 它在一段超时时间内等待一组文件描述符上的事件 */
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
// 处理新到的客户连接
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;
// 创建一个定时器,设置其回调函数和超时时间,然后绑定定时器和用户数据,并将定时器添加到timer_lst中
util_timer *timer = new util_timer;
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer(timer);
// 处理信号
} else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
// handle the error
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM:
// 先标记为有定时任务,因为定时任务优先级比IO事件低,我们优先处理其他更重要的任务
timeout = true;
break;
case SIGTERM:
stop_server = true;
break;
}
}
}
// 从客户连接上接收到数据
} else if (events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
util_timer *timer = users[sockfd].timer;
if (ret < 0) {
// 如果发生读错误,则关闭连接,并移除对应的定时器
if (errno != EAGAIN) {
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
}
} else if (ret == 0) {
// 如果对方关闭连接,则我们也关闭连接,并移除对应的定时器
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
} else {
// 如果客户连接上读到了数据,则调整该连接对应的定时器,以延迟该连接被关闭的时间
if (timer) {
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once\n");
timer_lst.adjust_timer(timer);
}
}
}
}
// 最后处理定时事件,因为IO事件的优先级更高,但这样会导致定时任务不能精确按预期的时间执行
if (timeout) {
timer_handler();
timeout = false;
}
}
close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete[] users;
return 0;
}
整体讲解:
这段代码实现了一个基于 epoll
的网络服务器程序,它具备以下主要功能特点:
- 利用
epoll
实现高效的 I/O 复用,能同时处理多个客户端连接的 I/O 事件,并且采用了非阻塞套接字结合边缘触发(ET)模式来提升性能。 - 通过定时器链表(
sort_timer_lst
)来管理连接的超时时间,实现定时关闭长时间无活动的客户端连接功能,并且能够动态调整定时器时间(比如客户端有数据交互时延长连接的超时时间)。 - 可以处理系统信号,例如
SIGALRM
(用于定时任务触发)和SIGTERM
(用于优雅地终止服务器进程),并在相应信号到来时执行特定的操作。
和10.4统一事件源思想类似,把信号处理函数定义为对管道写数据,写的就是信号的编号,然后通过对管道进行事件监听,一旦监听到管道有读事件,那就是有信号发过来了,recv接收并处理信号,本题中的信号就是定时器信号,5秒一次,到了的话就去处理定时器链表里面的超时事件
同时监听客户的读事件,如果客户在5秒内进行了写事件,那么就要重新设置定时器表示这个客户是活跃的
客户端
为了测试服务器能够按照预定的定时器逻辑关闭非活动连接,我们可以编写一个简单的客户端程序,该程序连接到服务器后不发送任何数据,仅保持连接一定时间后(sleep(20);
)关闭,看服务器是否会在设定的超时时间后自动关闭该连接。
服务端中,定时器的超时时间被设置为3 * TIMESLOT
,即15秒(TIMESLOT
设置为5秒)。定时器的目的是在客户端在指定时间内没有任何活动(例如数据交换)的情况下自动关闭该连接。
SIGALRM
信号每次触发就在其信号处理函数(如果使用统一事件源,则是主函数)中执行一次tick
函数,处理链表上的到期任务,并在终端打印"timer tick"。
void timer_handler() {
/* 处理定时任务 */
timer_lst.tick();
/* 由于alarm函数只会引起一次SIGALRM信号,因此重新定时,以不断触发SIGALRM信号 */
alarm(TIMESLOT);
}
在服务器端,在没有收到任何数据的情况下,服务器在定时器到期后关闭了连接的日志消息,例如 “close fd <fd>
” 的输出,表示服务器正确处理了非活动连接。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("Usage: %s <ip> <port>\n", argv[0]);
return 1;
}
const char *server_ip = argv[1];
int server_port = atoi(argv[2]);
// 创建 socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Socket creation failed");
return 1;
}
// 服务器地址结构
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
perror("Invalid address/ Address not supported");
return 1;
}
// 连接到服务器
if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Connection Failed");
return 1;
}
printf("Connected to server, now sleeping...\n");
// 保持连接,但不进行任何通信
sleep(20); // 保持连接20秒,调整时间以匹配服务器设置的超时时间
// 关闭 socket
close(sock);
printf("Disconnected from server\n");
return 0;
}
效果
4.I/O 复用系统调用的超时函数
Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号(通过管道在信号处理函数中通知主进程)和I/O事件,也能统一处理定时事件,但由于I/O复用系统调用可能在超时时间到期前就返回(有I/O事件发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间,如下代码所示:
#define TIMEOUT 5000
int timeout = TIMEOUT;
time_t start = time(NULL);
time_t end = time(NULL);
while (1) {
printf("the timeout is now %d mil-seconds\n", timeout);
start = time(NULL);
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}
// 如果epoll_wait函数返回0,说明超时时间到,此时可处理定时任务,并重置定时时间
if (number == 0) {
// 处理定时任务
timeout = TIMEOUT;
continue;
}
// 到此,epoll_wait函数的返回值大于0,
end = time(NULL);
// 更新timeout的值为减去本次epoll_wait调用的持续时间
timeout -= (end - start) * 1000;
// 重新计算后的timeout值可能是0,说明本次epoll_wait调用返回时,不仅有文件描述符就绪,且其超时时间也刚好到达
// 此时我们要处理定时任务,并充值定时时间
if (timeout <= 0) {
// 处理定时任务
timeout = TIMEOUT;
}
// handle connections
}
基于 epoll
的事件循环与定时机制实现
这段代码实现了一个基于 epoll
的事件循环,并结合了定时处理的功能,整体功能围绕高效处理 I/O 事件以及定时任务展开,具体如下:
- 定时相关变量初始化
首先定义了一个宏TIMEOUT
表示超时时间(单位为毫秒,不过代码中实际换算有些问题,后续会详细说明),初始化为5000
。然后声明了一个整型变量timeout
并初始化为TIMEOUT
,用于记录剩余的超时时间。同时,通过time(NULL)
函数获取当前时间分别赋值给start
和end
,用于后续计算时间差来跟踪时间流逝情况。 - 循环主体功能
进入一个无限循环(while (1)
),在每次循环中执行以下操作:- 打印剩余超时时间信息:
使用printf
函数打印当前剩余的超时时间(这里代码将timeout
当作毫秒来处理,但实际time(NULL)
获取的是秒数,后面计算时间差时需要进行相应转换才能准确对应毫秒单位,不过暂且按现有逻辑分析),方便查看定时情况。 - 启动
epoll_wait
并处理返回结果:- 调用
epoll_wait
:每次循环都会调用epoll_wait
函数,传入epollfd
(epoll
实例对应的文件描述符)、events
(用于接收就绪事件的数组)、MAX_EVENT_NUMBER
(数组大小,限定了最多能接收的事件数量)以及timeout
(本次等待的超时时间,单位在代码逻辑中期望是毫秒,虽然前面提到换算有问题)。这个函数会阻塞等待文件描述符上有感兴趣的事件发生或者超时。 - 处理
epoll_wait
返回值小于 0 的情况:如果epoll_wait
返回值小于0
,并且错误原因不是因为被信号中断(errno!= EINTR
),则说明epoll
操作出现了真正的错误,此时会打印“epoll failure”
提示信息,并通过break
跳出循环,结束整个事件处理流程。 - 处理
epoll_wait
返回值等于 0 的情况(超时情况):当epoll_wait
返回0
时,意味着在设定的超时时间内没有文件描述符就绪事件发生,也就是超时时间到了。此时,代码会将timeout
重新设置为初始的TIMEOUT
值,用于下一轮循环继续等待事件时使用初始设定的超时时间,然后通过continue
语句直接进入下一轮循环,去再次等待事件或者超时。
- 调用
- 处理
epoll_wait
返回值大于 0 的情况(有文件描述符就绪):- 更新时间记录:当
epoll_wait
返回值大于0
时,表示有文件描述符上的事件就绪了。此时先获取当前时间赋值给end
,然后通过end - start
计算出本次epoll_wait
调用所持续的时间(这里获取的时间是以秒为单位,实际代码逻辑期望换算成毫秒去更新timeout
,存在前面提到的单位换算问题)。 - 更新剩余超时时间:用当前剩余的超时时间
timeout
减去本次epoll_wait
调用持续时间换算后的毫秒数((end - start) * 1000
,这里代码本意是将秒换算成毫秒,但实际time(NULL)
函数获取的是从 1970 年 1 月 1 日 00:00:00 UTC 到当前时间的秒数,所以换算逻辑不准确),以此来更新timeout
的值,反映剩余的超时时间还有多少。 - 处理超时时间耗尽情况:如果更新后的
timeout
值小于等于0
,说明本次epoll_wait
在有文件描述符就绪的同时,超时时间也刚好到达(或者已经超过了设定的超时时间),这时会将timeout
重新设置为初始的TIMEOUT
值,准备下一轮循环继续基于初始超时时间进行事件等待与定时处理,同时也意味着此时需要去处理定时任务(虽然代码中没有明确展示具体的定时任务处理内容,只是做了这个时间重置的相关操作)。
- 更新时间记录:当
- 处理就绪的连接(注释中的
handle connections
部分):
这部分代码没有具体展开,但可以推测在这里会对epoll_wait
返回的就绪文件描述符对应的连接进行相应的读、写等 I/O 操作处理,比如接收客户端发送的数据、向客户端发送响应数据等操作,完成服务器与客户端之间基于网络连接的交互功能。
- 打印剩余超时时间信息:
总体而言,这段代码通过不断循环调用 epoll_wait
函数来同时兼顾 I/O 事件的处理以及定时任务的触发,利用超时时间的计算和更新机制,使得程序既能及时响应文件描述符上的就绪事件,又能按照设定的时间间隔(虽然存在时间单位换算问题)来处理定时任务,在网络服务器等需要同时处理 I/O 和定时逻辑的场景中较为常用。不过代码中的时间计算部分需要修正时间单位换算逻辑,才能准确实现预期的定时功能。
time_t time(time_t *t);
函数原型:
time_t time(time_t *t);
参数
t
: 这是一个指向time_t
类型的指针,用于存储时间值。如果参数是NULL
,函数只返回当前时间。
返回值
- 函数返回一个
time_t
类型的值,它表示从Epoch至今的秒数。如果出现错误,则返回(time_t) -1
。
6.高性能定时器(较难,需复习)
时间轮
基于排序链表的定时器存在一个问题:添加定时器的效率偏低(添加定时器的时间复杂度是**O(n)**
)。下面讨论的时间轮解决了这个问题,一种简单的时间轮如下图:
上图中,时间轮内部的实线指针指向轮子上的一个槽(slot),它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针所指的槽),每次转动称为一个滴答(tick),一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。上图中的时间轮共有N个槽,因此它转动一周的时间是N * si
。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时事件相差N * si
的整数倍,时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs
,我们要添加一个定时事件为ti
的定时器,则该定时器将被插入槽ts
(timer slot)对应的链表中:
ts=(cs+(ti/si))%N
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低,而时间轮使用哈希表的思想,将定时器散列到不同的链表上,这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。
对时间轮而言,要想提高定时精度,就要使si
足够小,要提高执行效率,就要求N足够大(N越大,散列冲突的概率就越小)。
以下代码描述了一种简单的时间轮,因为它只有一个轮子,而复杂的时间轮可能有多个轮子,不同的轮子有不同的粒度,相邻的两个轮子,精度高的转一圈,精度低的仅仅往前移动一槽,就像水表一样。
对时间轮而言,如果一共有n个定时器,则添加一个定时器的时间复杂度为O(1)
;删除一个定时器的时间复杂度平均也是O(1)
,但最坏情况下可能所有节点都在一个槽中,此时删除定时器的时间复杂度为O(n)
;执行一个定时器的时间复杂度是O(n)
,实际上执行一个定时器任务的效率要比O(n)
好得多,因为时间轮将所有定时器散列到了不同的链表上,时间轮的槽越多,等价于散列表的入口(entry)越多,从而每条链表上的定时器数量越少。此外,以上代码中只使用了1个时间轮,当使用多个轮子来实现时间轮时,执行一个定时器任务的时间复杂度将接近O(1)
。
// 时间轮定时器,用数组(环)存储每一条定时器链表
// 求hash(超时时间)决定定时器到数组的哪个位置
#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER
#include <time.h>
#include <netinet/in.h>
#include <stdio.h>
#define BUFFER_SIZE 64
class tw_timer; // 时间轮定时器前置声明
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer *timer;
}
class tw_timer
{
public:
tw_timer(int rot, int ts)
:next(NULL), prev(NULL), rotation(rot), time_slot(ts){}
public:
int rotation; // 记录定时器在时间轮转多少圈才生效
int time_slot; // 记录定时器属于时间轮上的哪个slot
void (*cb_func)(client_data *);
client_data *user_data; // 用指针存储对应的用户数据
tw_timer *next;
tw_timer *prev;
}
class time_wheel
{
public:
time_wheel() : cur_slot(0)
{
for(int i = 0; i < N; ++i)
{
slots[i] = NULL; // init
}
}
~time_wheel()
{
for(int i = 0; i < N; ++i)
{
tw_timer *tmp = slots[i];
while(tmp)
{
// 重新设置链表头节点
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
// 根据定时值timeout,创建一个定时器,并把它插入到合适的槽中
tw_timer* add_timer(int timeout)
{
if(timeout < 0)
{
return NULL;
}
int ticks = 0;
// 根据待插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发
// 若待插入定时器的超时值小于时间轮的槽间隔,则将ticks向上取整为1
if(timeout < SI)
{
ticks = 1;
}
else
{
ticks = timeout / SI;
}
// 计算转多少圈后被触发
int rotation = ticks / N;
// 计算待插入的定时器应该被插入哪个槽中
int ts = (cur_slot + (ticks % N)) % N;
// 创建新的定时器,它在时间轮转动rotation圈之后触发,位于第ts个slot上
tw_timer *timer = new tw_timer(rotation, ts);
// 插入指定槽中的链表 头
// 第ts个slot没有任何定时器(空链表)
if(!slots[ts])
{
printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
}
else // 链表非空,则头插
{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
// 删除目标定时器
void del_timer(tw_timer *timer)
{
if(!timer)
{
return;
}
int ts = timer->time_slot;
// slots[ts] 是目标定时器所在头节点
// 如果待删定时器就是该头节点,则需要重置第ts个slot的链表头节点
if(timer == slots[ts])
{
slots[ts] = slots[ts]->next;
if(slots[ts])
{
slots[ts]->prev = NULL;
}
// 如果第ts个slot的链表就剩下一个节点,直接删除
delete timer;
}
else
{
timer->prev->next = timer->next;
if(timer->next)
{
timer->next->prev = timer->prev;
}
delete timer;
}
}
// 时间轮转动函数,每SI时间之后,cur_slot向前滚动一个slot
void tick()
{
// 时间轮上当前槽的头节点
tw_timer *tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
// 遍历当前slot上链表的每个定时器节点,
while(tmp)
{
printf("tick the timer once\n");
// 定时器超过1轮,跳过
if(tmp->rotation > 0)
{
tmp->rotation--;
tmp = tmp->next;
}
// 否则只要指针到当前slot,里面的所有定时器就都到时了
else // 执行定时任务,删除tmp节点
{
tmp->cb_func(tmp->user_data);
// 链表头节点!!
if(tmp = slots[cur_slot])
{
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next; // 让tmp的下一个节点做头节点
delete tmp;
if(slots[cur_slot])
{
slots[cur_slot]->prev = tmp->prev;
}
tmp = slots[cur_slot]; // tmp为刚刚删除的节点的下一个节点
}
else // 非头节点
{
tmp->prev->next = tmp->next;
if(tmp->next)
{
tmp->next->prev = tmp->prev;
}
tw_timer *tmp2 = tmp->next;
delete tmp;
tmp = tmp2; //
}
}
}
// 时间轮转动(指针移动到下一个slot)
cur_slot = (++cur_slot) % N;
}
private:
// 时间轮上slot的数目
static const int N = 60;
// 指针每1s转动一次,即slot的间隔为1s slot interval
static const int SI = 1;
// 时间轮,每个存放定时器链表
tw_timer* slots[N];
int cur_slot; // 指针,指向当前的slot
};
复杂度分析
由于添加一个定时器是链表头插,则时间复杂度为 O ( 1 ) O(1)O(1)
删除一个定时器的时间复杂的也为O ( 1 ) O(1)O(1)
执行一个定时器的复杂度为O ( n ) O(n)O(n).但实际执行一个定时任务效率要比O ( n ) O(n)O(n)好,因为时间轮将所有定时器散列到不同的链表上。
若使用多个轮子实现时间轮,执行一个定时器任务的复杂度可以降到O ( 1 ) O(1)O(1)
时间堆
以上讨论的定时方案都是以固定频率调用心搏函数tick
,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另一种思路是:将所有定时器中超时时间最小的定时器的超时值作为心搏间隔,这样,一旦心搏函数tick
被调用,超时时间最小的定时器必然到期,我们就可在tick
函数中处理该定时器,然后,再次从剩余定时器中找出超时时间最小的一个,并将这段最小时间设为下一次心搏间隔。
最小堆很适合这种定时方案:
最小堆是每个节点的值都小于或等于其子节点的值的完全二叉树。
由于最小堆其实就是一棵树,所以其实现可以用链表,也可以用数组。用最小堆实现的定时器称为时间堆。对时间堆而言,添加一个定时器的时间复杂度是O(lgn)
;删除一个定时器的时间复杂度是O(1)
;执行一个定时器的时间复杂度是O(1)
。因此,时间堆的效率很高。
最小堆很适合这种定时方案。本文实现最小堆有以下关键特点:
- 根节点值小于其孩子节点的值(递归成立);
- 插入节点是在最后一个节点添加新节点,然后进行上滤保证最小堆特性;
- 删除节点是删除其根节点上的元素,然后把最后一个元素移动到根节点,进行下滤操作保证最小堆特性;
- 将N个元素的数组(普通二叉树)初始化为最小堆,即从二叉树最后一个非叶节点到根节点(第[ ( N − 1 ) / 2 ] [(N-1) / 2][(N−1)/2] ~ 0 个元素)执行下滤操作。
- 本文实现的最小堆底层是用数组进行存储,是一个适配器,联想C++的
priority_queue<int, vector<int>, greater<int>>
// 用最小堆存储定时器,称为时间堆
#ifndef MIN_HEAP
#define MIN_HEAP
#include <iostream>
#include <netinet/in.h>
#include <time.h>
using std::exception;
#define BUFFER_SIZE 64
class heap_timer;
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer *timer;
};
// 定时器类
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire = expire; // 定时器生效的绝对时间
void (*cb_func)(client_data *);
client_data *user_data;
};
// 时间堆类
class time_heap
{
public:
// 初始化一个大小为cap的空堆
// throw (std::exception) 表示该函数可能抛出std::exception 类型的异常
time_heap(int cap) throw (std::exception) : capacity(cap), cur_size(0)
{
array = new heap_timer*[capacity];
if(!array)
{
throw std::exception();
}
else
{
for(int i = 0; i < capacity; ++i)
{
array[i] = NULL;
}
}
}
// 用已有的堆数组初始化堆
time_heap(heap_timer **init_array, int size, int capacity) throw (std::exception) : cur_size(size), capacity(capacity)
{
if(capacity < size)
{
throw std::exception();
}
array = new heap_timer*[capacity];
if(!array)
{
throw std::exception();
}
for(int i = 0; i < capacity; ++i)
{
array[i] = NULL;
}
if(size != 0)
{
// 初始化数组
for(int i = 0; i < size; ++i)
{
array[i] = init_array[i];
}
// 最后一个非叶子节点到根节点调堆(下滤)
for(int i = (cur_size - 1); i >= 0; --i)
{
percolate_down(i);
}
}
}
~time_heap()
{
for(int i = 0; i < cur_size; ++i)
{
delete array[i];
}
delete []array;
}
public:
// 堆添加节点,上滤
void add_timer(heap_timer *timer) throw (std::exception)
{
if(!timer)
{
return;
}
if(cur_size >= capacity) // 容量不足,堆指针数组需要扩充一倍
{
resize();
}
// 新插入了一个元素,在堆最后插入,然后调堆
int hole = cur_size++;
int parent = 0;
// 上滤操作
for(; hole > 0; hole = parent) // hole = parent使得最终结果位置上移
{
parent = (hole - 1) / 2; // hole节点的父节点计算
if(array[parent]->expire <= timer->expire)
{
// 父节点小于插入的节点,满足小根堆要求,直接结束
break;
}
array[hole] = array[parent]; // 父节点节点下移
}
array[hole] = timer;
}
void del_timer(heap_timer *timer)
{
if(!timer)
{
return;
}
// 仅仅将目标定时器的回调函数设置为空,即延迟销毁
// 这将节省真正删除该定时器的开销,但易使堆数指针组膨胀
timer->cb_func = NULL;
}
// 获取堆顶部的定时器,expire最小者
heap_timer* top() const
{
if(empty())
{
return NULL;
}
return array[0];
}
// 删除堆顶部的定时器
void pop_timer()
{
if(empty())
{
return ;
}
if(array[0])
{
delete array[0];
// 将原来堆顶元素用堆的最后一个元素临时填充,然后下滤
array[0] = array[--cur_size];
percolate_down(0);
}
}
// 定时处理函数
void tick()
{
heap_timer *tmp = array[0];
time_t cur = time(NULL); // 循环遍历堆中每个定时器(堆用数组实现,故数组遍历),处理到期的定时器
while(!empty())
{
if(!tmp)
{
break;
}
// 如果堆顶定时器没到期,则退出循环,因为堆顶定时器到时时间使最近的,其他更晚
if(tmp->expire > cur)
{
break;
}
if(array[0]->cb_func)
{
array[0]->cb_func(array[0]->user_data);
}
// 将堆顶元素删除,同时让tmp指向新的堆顶
pop_timer();
tmp = array[0];
}
}
bool empty() const
{
return cur_size == 0;
}
private:
// 下面两个函数是被其他成员函数调用,不对外提供
// 最小堆的下滤操作,确保数组中以第hole个节点作为根的子树满足最小堆性质
void percolate_down(int hole)
{
heap_timer *temp = array[hole];
int child = 0;
// hole * 2 + 1为hole的左孩子
for(; (hole * 2 + 1) <= (cur_size - 1); hole = child) // hole = child是一个下滤的动作
{
child = hole * 2 + 1; // 左孩子
// 要选择expire小的孩子进行比较
if((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire))
{
child++;
}
if(array[child]->expire < temp->expire) // 下滤
{
array[hole] = array[child];
}
else
{
break;
}
}
array[hole] = temp;
}
// 将堆数组容量扩大一倍
void resize() throw (std::exception)
{
heap_timer **temp = new heap_timer*[2 * capacity];
for(int i = 0; i < 2 * capacity; ++i)
{
temp[i] = NULL;
}
if(!temp)
{
throw std::exception();
}
capacity = 2 * capacity;
// 把原来数组的内容拷贝到新的数组
for(int i = 0; i < cur_size; ++i)
{
temp[i] = array[i];
}
delete []array;
array = temp;
}
private:
heap_timer **array; // 定时器指针数组
int capacity;
int cur_size;
};
#endif
复杂度分析
- 对时间堆而言,添加一个定时器的时间复杂度为O ( l o g n ) O(logn)O(log**n)(由于需要上滤操作)
- 删除一个定时器的时间复杂度为O ( 1 ) O(1)O(1),这是因为只是将目标定时器的回调函数设置为空
- 执行一个定时器的时间复杂度为O ( 1 ) O(1)O(1)