【Linux网络编程】第五弹---构建基于UDP的简单聊天室系统:代码结构与功能模块详解
✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】
目录
1、UdpServer.hpp
1.1、UdpServer类基本结构
1.2、构造函数
1.3、Start()
2、Route.hpp
2.1、Route类基本结构
2.2、构造析构函数
2.3、Forward()
2.3.1、CheckOnlineUser()
2.3.2、Offline()
2.3.3、ForwardHelper()
3、UdpServerMain.cc
4、UdpClientMain.cc
4.1、主函数
4.2、InitClient()
4.3、RecvMessage()
4.4、SendMessage()
5、补充参考内容
5.1、地址转换函数
5.2、关于 inet_ntoa
6、完整代码
6.1、InetAddr.hpp
6.2、LockGuard.hpp
6.3、Log.hpp
6.4、Makefile
6.5、nocopy
6.6、Route.hpp
6.7、Thread.hpp
6.8、ThreadPool.hpp
6.9、UdpClientMain.cc
6.10、UdpServer.hpp
6.11、UdpServerMain.cc
6.12、Test.c
前面一弹实现了翻译版本的UDP网络通信,此弹实现一个多线程版本聊天室的网络通信(即将消息转发给所以用户),此处先拷贝第一版本通信的代码,即第三弹的代码!
1、UdpServer.hpp
UdpServer类的启动函数是实现执行聊天室的功能,因此需要加执行函数的成员变量并适当修改构造函数,启动函数!!
1.1、UdpServer类基本结构
UdpServer类基本结构没变,只是添加了一个执行方法!
using server_t = std::function<void(int,const std::string message,InetAddr& who)>;
// UdpServer user("192.1.1.1",8899)
// 一般服务器主要是用来进行网络数据读取和写入的,即IO的
// 服务器 IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:
UdpServer(server_t func, uint16_t localport = glocalport);
void InitServer();
void Start();
~UdpServer();
private:
int _sockfd; // 读写都用同一个sockfd,说明:udp是全双工通信的
uint16_t _localport; // 端口号
bool _isrunning;
server_t _func; // 执行方法
};
1.2、构造函数
构造函数只需加一个函数对象的初始化!
UdpServer(server_t func, uint16_t localport = glocalport)
: _func(func), _sockfd(gsockfd), _localport(localport), _isrunning(false)
{
}
1.3、Start()
启动函数原本只是接收并发送消息,现在改为接收消息和执行转发消息的功能!
void Start()
{
_isrunning = true;
char message[1024];
while (_isrunning)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 收消息,返回值: 实际收到多少字节
ssize_t n = recvfrom(_sockfd, message, sizeof(message) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
InetAddr addr(peer);
message[n] = 0;
LOG(DEBUG,"[%s]# %s\n",addr.AddrStr().c_str(),message);
_func(_sockfd,message,addr); // 转发消息
LOG(DEBUG,"return udpserver\n");
}
else
{
std::cout << "recvfrom ,error " << std::endl;
}
}
}
2、Route.hpp
Route类 对转发消息功能的封装!
2.1、Route类基本结构
Route类 成员变量是一个InetAddr类型的vector容器,但是此处为多线程版本,可能造成数据不一致问题,因此需要加锁,内部实现转发消息的函数!
using task_t = std::function<void()>;
class Route
{
public:
Route();
// 检查用户是否在 在线列表中
void CheckOnlineUser(InetAddr& who);
// for test(移除用户)
void Offline(InetAddr& who);
// 转发消息函数具体实现
void ForwardHelper(int sockfd,const std::string& message,InetAddr who);
// 转发
void Forward(int sockfd,const std::string& message,InetAddr& who);
~Route();
private:
std::vector<InetAddr> _online_user; // 用户在线列表
pthread_mutex_t _mutex;
};
2.2、构造析构函数
Route类的构造函数初始化互斥锁,析构函数释放互斥锁!
Route()
{
pthread_mutex_init(&_mutex,nullptr);
}
~Route()
{
pthread_mutex_destroy(&_mutex);
}
2.3、Forward()
转发函数有两大步骤:
1、检查用户是否在 在线列表中,在则忽略,不在则添加到 _online_user中
- 1.1、如果转发的消息为 QUIT 或者 Q 则将用户移除 在线列表
2、执行转发消息函数
// 转发
void Forward(int sockfd,const std::string& message,InetAddr& who)
{
// 1.该用户是否在 在线用户列表中呢?如果在忽略;如果不在,自动添加到_online_user
CheckOnlineUser(who);
// 1.1 message == "QUIT" "Q"
if(message == "QUIT" || message == "Q")
{
Offline(who);
}
// 2.who一定在_online_user里面
// ForwardHelper(sockfd,message);
task_t t = std::bind(&Route::ForwardHelper,this,sockfd,message,who);
ThreadPool<task_t>::GetInstance()->Equeue(t);
}
2.3.1、CheckOnlineUser()
检查用户是否在 在线列表中!
// 检查用户是否在 在线列表中
void CheckOnlineUser(InetAddr& who)
{
LockGuard lockguard(&_mutex);
for(auto& user : _online_user)
{
// 需要重载
if(user == who)
{
LOG(DEBUG,"[%s] is exists\n",who.AddrStr().c_str());
return;
}
}
LOG(DEBUG,"[%s] is not exists,add it\n",who.AddrStr().c_str());
_online_user.push_back(who);
}
1、用户的比较需要在InetAddr类中实现等于的重载函数!
用户的IP 和 端口都相等表示两个用户相等!
bool operator==(const InetAddr& addr)
{
return (this->_ip == addr._ip && this->_port == addr._port);
}
2、用户的AddrStr()函数也需要实现
AddrStr()函数打印IP和端口,并使用冒号(:)拼接!
std::string AddrStr()
{
return _ip + ":" + std::to_string(_port);
}
2.3.2、Offline()
移除用户!
注意:vector容器删除成员需要使用迭代器!
void Offline(InetAddr& who)
{
LockGuard lockguard(&_mutex);
// 删除需要迭代器
auto iter = _online_user.begin();
for(;iter != _online_user.end();iter++)
{
if(*iter == who)
{
LOG(DEBUG,"[%s] is offline\n",who.AddrStr().c_str());
_online_user.erase(iter);
break;
}
}
}
2.3.3、ForwardHelper()
转发消息函数具体实现[为了知道是谁发送的消息,此处打印的消息包含IP 和 端口]!
// 转发消息函数具体实现
void ForwardHelper(int sockfd,const std::string& message,InetAddr who)
{
LockGuard lockguard(&_mutex);
std::string send_message = "[" + who.AddrStr() + "]# " + message;
for(auto& user : _online_user)
{
struct sockaddr_in peer = user.Addr();
// 无IP和端口
// LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),message.c_str());
// ::sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),send_message.c_str());
::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
}
}
3、UdpServerMain.cc
UdpServerMain.cc 调用服务端的执行函数!!!
// .udp_client local-port
// .udp_client 8888
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " server-port" << std::endl;
exit(0);
}
uint16_t port = std::stoi(argv[1]);
EnableScreen();
Route messageRoute;
server_t message_route = std::bind(&Route::Forward, &messageRoute,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(message_route,port); // C++14标准
usvr->InitServer();
usvr->Start();
return 0;
}
运行结果(可能出现的问题)
因为博主前面进行过测试,可能会出现这样的情况!
运行结果(正常)
此处确实能够进行多个客户端通信,但是收到的消息还是有点乱码(因为此处的客户端是单进程单线程的程序,而服务端是多线程的),因此需要修改客户端的主函数代码!
4、UdpClientMain.cc
客户端主函数实现成两个线程版本,一个线程用户发消息,一个线程用于收消息!
4.1、主函数
主函数通过命令行参数获取IP和端口,然后创建套接字,再创建两个线程,最后启动并回收线程!!
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
int sockfd = InitClient();
Thread recver("recver-thread", std::bind(&RecvMessage, sockfd, std::placeholders::_1));
Thread sender("sender-thread", std::bind(&SendMessage, sockfd, serverip, serverport, std::placeholders::_1));
recver.Start();
sender.Start();
recver.Join();
sender.Join();
::close(sockfd);
return 0;
}
4.2、InitClient()
创建套接字,创建成功返回文件描述符!
int InitClient()
{
int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
std::cerr << "create socket eror\n"
<< std::endl;
exit(1);
}
return sockfd;
}
4.3、RecvMessage()
收消息函数,如果接收消息成功向标准错误流打印结果(以便于后面测试)!
void RecvMessage(int sockfd, const std::string &name)
{
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
char buffer[1024];
int n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
buffer[n] = 0;
std::cerr << buffer << std::endl;
}
else
{
std::cerr << "recvfrom error" << std::endl;
break;
}
}
}
4.4、SendMessage()
向指定网络进程发送消息!
void SendMessage(int sockfd, std::string serverip, uint16_t severport, const std::string &name)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(severport); // 转换重要!!!
server.sin_addr.s_addr = inet_addr(serverip.c_str());
std::string cli_profix = name + "# "; // sender-thread#
while (true)
{
std::string line;
std::cout << cli_profix;
std::getline(std::cin, line);
// 发消息,你要知道发送给谁
int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
if (n <= 0)
break;
}
}
5、补充参考内容
5.1、地址转换函数
- 本节只介绍基于 IPv4 的 socket 网络编程,sockaddr_in 中的成员 struct in_addr sin_addr 表示 32 位 的 IP 地址
- 但是我们通常用点分十进制的字符串表示 IP 地址,以下函数可以在字符串表示 和 in_addr 表示之间转换;
字符串转 in_addr 的函数:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int inet_aton(const char *cp, struct in_addr *inp);
in_addr_t inet_addr(const char *cp);
int inet_pton(int af, const char *src, void *dst);
in_addr 转字符串的函数:
#include <arpa/inet.h>
char *inet_ntoa(struct in_addr in);
const char *inet_ntop(int af, const void *src,char *dst, socklen_t size);
其中 inet_pton 和 inet_ntop 不仅可以转换 IPv4 的 in_addr,还可以转换 IPv6 的 in6_addr,因此函数接口是 void *addrptr。
代码示例:
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main()
{
struct sockaddr_in addr;
inet_aton("127.0.0.1",&addr.sin_addr);
uint32_t* ptr = (uint32_t*)&addr.sin_addr;
printf("addr: %x\n",*ptr);
printf("addr_str: %s\n",inet_ntoa(addr.sin_addr));
return 0;
}
5.2、关于 inet_ntoa
inet_ntoa 这个函数返回了一个 char*, 很显然是这个函数自己在内部为我们申请了一块内存来保存 ip 的结果. 那么是否需要调用者手动释放呢?
The inet_ntoa() function converts the Internet host address in,
given in network byte order, to a string in IPv4 dotted-decimal notation.
The string is returned in a statically allocated buffer, which
subsequent calls will overwrite.
man 手册上说, inet_ntoa 函数, 是把这个返回结果放到了静态存储区. 这个时候不需要我们手动进行释放.
那么问题来了, 如果我们调用多次这个函数, 会有什么样的效果呢? 参见如下代码:
int main()
{
struct sockaddr_in addr1;
struct sockaddr_in addr2;
addr1.sin_addr.s_addr = 0;
addr2.sin_addr.s_addr = 0xffffffff;
char* ptr1 = inet_ntoa(addr1.sin_addr);
char* ptr2 = inet_ntoa(addr2.sin_addr);
printf("ptr1: %s,ptr2: %s\n",ptr1,ptr2);
return 0;
}
运行结果如下:
因为 inet_ntoa 把结果放到自己内部的一个静态存储区, 这样第二次调用时的结果会覆盖掉上一次的结果.
思考: 如果有多个线程调用 inet_ntoa, 是否会出现异常情况呢?
- 在 APUE 中, 明确提出 inet_ntoa 不是线程安全的函数;
- 但是在 centos7 上测试, 并没有出现问题, 可能内部的实现加了互斥锁;
- uu们课后自己写程序验证一下在自己的机器上 inet_ntoa 是否会出现多线程的问题;
- 在多线程环境下, 推荐使用 inet_ntop, 这个函数由调用者提供一个缓冲区保存结果, 可以规避线程安全问题;
6、完整代码
6.1、InetAddr.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
private:
// 网络地址转本地地址
void ToHost(const struct sockaddr_in& addr)
{
_port = ntohs(addr.sin_port); // 网络转主机
// _ip = inet_ntoa(addr.sin_addr); // 结构化转字符串
char ip_buf[32];
// inet_p to n
// p: process
// n: net
// int inet_pton(int af, const char *src, void *dst);
// inet_pton(AF_INET,ip.c_str(),&addr.sin_addr.s_addr);
::inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); // 安全转化
_ip = ip_buf;
}
public:
InetAddr(const struct sockaddr_in& addr):_addr(addr)
{
ToHost(addr);
}
bool operator==(const InetAddr& addr)
{
return (this->_ip == addr._ip && this->_port == addr._port);
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
std::string AddrStr()
{
return _ip + ":" + std::to_string(_port);
}
~InetAddr()
{}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
6.2、LockGuard.hpp
#pragma once
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
6.3、Log.hpp
#pragma once
#include <iostream>
#include <fstream>
#include <ctime>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h>
#include <pthread.h>
#include "LockGuard.hpp"
namespace log_ns
{
// 日志等级
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOW";
}
}
std::string GetCurrTime()
{
time_t now = time(nullptr); // 时间戳
struct tm *curr_time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
class logmessage
{
public:
std::string _level; // 日志等级
pid_t _id; // pid
std::string _filename; // 文件名
int _filenumber; // 文件行号
std::string _curr_time; // 当前时间
std::string _message_info; // 日志内容
};
#define SCREEN_TYPE 1
#define FILE_TYPE 2
const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
// log.logMessage(""/*文件名*/,12/*文件行号*/,INFO/*日志等级*/,"this is a %d message,%f,%s,hello world"/*日志内容*/,x,,);
class Log
{
public:
// 默认向显示器打印
Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
{
}
// 打印方式
void Enable(int type)
{
_type = type;
}
// 向屏幕打印
void FlushToScreen(const logmessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
}
// 向文件打印
void FlushToFile(const logmessage &lg)
{
std::ofstream out(_logfile, std::ios::app); // 追加打开文件
if (!out.is_open())
return; // 打开失败直接返回
char logtxt[2048];
snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
out.write(logtxt, strlen(logtxt)); // 写文件
out.close(); // 关闭文件
}
// 刷新日志
void FlushLog(const logmessage &lg)
{
// 加过滤逻辑 --- TODO
// ...
LockGuard lockguard(&glock); // RAII锁
switch (_type)
{
case SCREEN_TYPE:
FlushToScreen(lg);
break;
case FILE_TYPE:
FlushToFile(lg);
break;
}
}
// ... 可变参数(C语言)
// 初始化日志信息
void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
{
logmessage lg;
lg._level = LevelToString(level);
lg._id = getpid();
lg._filename = filename;
lg._filenumber = filenumber;
lg._curr_time = GetCurrTime();
va_list ap; // va_list-> char*指针
va_start(ap, format); // 初始化一个va_list类型的变量
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap); // 释放由va_start宏初始化的va_list资源
lg._message_info = log_info;
// 日志打印出来(显示器/文件)
FlushLog(lg);
}
~Log()
{
}
private:
int _type; // 打印方式
std::string _logfile; // 文件名
};
Log lg;
// 打印日志封装成宏,使用函数方式调用
#define LOG(Level, Format, ...) \
do \
{ \
lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while (0)
// 设置打印方式,使用函数方式调用
#define EnableScreen() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
// 设置打印方式,使用函数方式调用
#define EnableFile() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
}
6.4、Makefile
.PHONY:all
all:udpserver udpclient
udpserver:UdpServerMain.cc
g++ -o $@ $^ -std=c++14
udpclient:UdpClientMain.cc
g++ -o $@ $^ -std=c++14
.PHONY:clean
clean:
rm -rf udpserver udpclient
6.5、nocopy
#pragma once
class nocopy
{
public:
nocopy(){}
~nocopy(){}
nocopy(const nocopy&) = delete;
const nocopy& operator=(const nocopy&) = delete;
};
6.6、Route.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"
#include "LockGuard.hpp"
using task_t = std::function<void()>;
class Route
{
public:
Route()
{
pthread_mutex_init(&_mutex,nullptr);
}
// 检查用户是否在 在线列表中
void CheckOnlineUser(InetAddr& who)
{
LockGuard lockguard(&_mutex);
for(auto& user : _online_user)
{
// 需要重载
if(user == who)
{
LOG(DEBUG,"[%s] is exists\n",who.AddrStr().c_str());
return;
}
}
LOG(DEBUG,"[%s] is not exists,add it\n",who.AddrStr().c_str());
_online_user.push_back(who);
}
// for test(移除用户)
void Offline(InetAddr& who)
{
LockGuard lockguard(&_mutex);
// 删除需要迭代器
auto iter = _online_user.begin();
for(;iter != _online_user.end();iter++)
{
if(*iter == who)
{
LOG(DEBUG,"[%s] is offline\n",who.AddrStr().c_str());
_online_user.erase(iter);
break;
}
}
}
// 转发消息函数具体实现
void ForwardHelper(int sockfd,const std::string& message,InetAddr who)
{
LockGuard lockguard(&_mutex);
std::string send_message = "[" + who.AddrStr() + "]# " + message;
for(auto& user : _online_user)
{
struct sockaddr_in peer = user.Addr();
// 无IP和端口
// LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),message.c_str());
// ::sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),send_message.c_str());
::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
}
}
// 转发
void Forward(int sockfd,const std::string& message,InetAddr& who)
{
// 1.该用户是否在 在线用户列表中呢?如果在忽略;如果不在,自动添加到_online_user
CheckOnlineUser(who);
// 1.1 message == "QUIT" "Q"
if(message == "QUIT" || message == "Q")
{
Offline(who);
}
// 2.who一定在_online_user里面
// ForwardHelper(sockfd,message);
task_t t = std::bind(&Route::ForwardHelper,this,sockfd,message,who);
ThreadPool<task_t>::GetInstance()->Equeue(t);
}
~Route()
{
pthread_mutex_destroy(&_mutex);
}
private:
std::vector<InetAddr> _online_user;
pthread_mutex_t _mutex;
};
6.7、Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
namespace ThreadMoudle
{
// typedef std::function<void()> func_t;
using func_t = std::function<void(const std::string&)>;
class Thread
{
public:
void Excute()
{
// std::cout << _name << " is running" << std::endl;
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const std::string& name,func_t func):_name(name),_func(func)
{}
// 新线程执行该方法
static void* ThreadRoutine(void* args)
{
Thread* self = static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
std::string Status()
{
if(_isrunning)
return "running";
else
return "sleep";
}
bool Start()
{
// ::使用库函数接口,直接使用ThreadRoutine会报错,因为成员函数有隐含this指针 + static
int n = ::pthread_create(&_tid,nullptr,ThreadRoutine,this);
if(n != 0) return false;
return true;
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
}
}
std::string Name()
{
return _name;
}
void Join()
{
::pthread_join(_tid,nullptr);
}
~Thread()
{}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
};
}
6.8、ThreadPool.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <vector>
#include <string>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum = 5; // 默认创建5个线程
void test()
{
while (true)
{
std::cout << "hello world" << std::endl;
sleep(1);
}
}
template <typename T>
class ThreadPool
{
private:
// 任务队列加锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 唤醒一个线程
void Wakeup()
{
pthread_cond_signal(&_cond);
}
// 唤醒所有线程
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
// 休眠
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task_queue.empty();
}
// 处理任务,从任务队列取任务,加锁
void HandlerTask(const std::string &name)
{
while (true)
{
// 1.取任务
LockQueue();
// 队列为空且运行则休眠
while (IsEmpty() && _isrunning) // if?
{
_sleep_thread_num++; // 防止阻塞,因为一开始休眠线程数为0
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep();
LOG(INFO, "%s thread wakeup!\n", name.c_str());
_sleep_thread_num--;
}
// 判定一种情况,为空且不运行则退出
if (IsEmpty() && !_isrunning)
{
LOG(INFO, "%s thread quit!\n", name.c_str());
UnlockQueue();
break;
}
// 有任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 2.处理任务
t(); // 处理任务(只属于自己线程),此处不用/不能在临界区处理
}
}
// 单例模式构造函数私有
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); // 绑定
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func);
LOG(DEBUG, "construct thread %s done,init success\n", threadname.c_str());
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
ThreadPool(const ThreadPool<T> &) = delete; // 禁止拷贝构造
void operator=(const ThreadPool<T> &) = delete; // 禁止赋值重载
public:
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll(); // 防止有线程在等待
UnlockQueue();
LOG(INFO, "thread pool stop done!\n");
}
// 如果是多线程获取单例呢?
static ThreadPool<T> *GetInstance()
{
// 只有第一次需要加锁,减少锁的竞争
if (_tp == nullptr)
{
LockGuard lockguard(&_sig_mutex);
if (_tp == nullptr)
{
LOG(INFO, "create thread pool\n");
_tp = new ThreadPool<T>();
_tp->Init();
_tp->Start();
}
else
{
LOG(INFO, "get thread pool\n");
}
}
return _tp;
}
void Equeue(const T &in)
{
LockQueue();
// 防止false情况还一直在入队列
if (_isrunning)
{
_task_queue.push(in);
// 唤醒线程,有休眠的就唤醒
if (_sleep_thread_num > 0)
Wakeup();
}
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _thread_num;
std::vector<Thread> _threads;
std::queue<T> _task_queue; // 任务队列,共享资源,需要保护
bool _isrunning;
int _sleep_thread_num; // 休眠线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
// 单例模式
static ThreadPool<T> *_tp;
static pthread_mutex_t _sig_mutex;
};
// 静态成员类外初始化
template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
6.9、UdpClientMain.cc
#include "UdpServer.hpp"
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Thread.hpp"
using namespace ThreadMoudle;
int InitClient()
{
int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
std::cerr << "create socket eror\n"
<< std::endl;
exit(1);
}
return sockfd;
}
void RecvMessage(int sockfd, const std::string &name)
{
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
char buffer[1024];
int n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
buffer[n] = 0;
std::cerr << buffer << std::endl;
}
else
{
std::cerr << "recvfrom error" << std::endl;
break;
}
}
}
void SendMessage(int sockfd, std::string serverip, uint16_t severport, const std::string &name)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(severport); // 转换重要!!!
server.sin_addr.s_addr = inet_addr(serverip.c_str());
std::string cli_profix = name + "# "; // sender-thread#
while (true)
{
std::string line;
std::cout << cli_profix;
std::getline(std::cin, line);
// 发消息,你要知道发送给谁
int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
if (n <= 0)
break;
}
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
int sockfd = InitClient();
Thread recver("recver-thread", std::bind(&RecvMessage, sockfd, std::placeholders::_1));
Thread sender("sender-thread", std::bind(&SendMessage, sockfd, serverip, serverport, std::placeholders::_1));
recver.Start();
sender.Start();
recver.Join();
sender.Join();
::close(sockfd);
return 0;
}
// 单进程单线程版本(有问题!!!)
// 客户端在未来一定要知道服务器的IP地址和端口号
// .udp_client server-ip server-port
// .udp_client 127.0.0.1 8888
// int main(int argc, char *argv[])
// {
// if (argc != 3)
// {
// std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
// exit(0);
// }
// // 0.读取接收端IP和端口
// std::string serverip = argv[1];
// uint16_t serverport = std::stoi(argv[2]);
// // 1.创建套接字
// int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
// if (sockfd < 0)
// {
// std::cerr << "create socket eror\n"
// << std::endl;
// exit(1);
// }
// // 2.设置接收端信息
// struct sockaddr_in server;
// memset(&server, 0, sizeof(server));
// server.sin_family = AF_INET;
// server.sin_port = htons(serverport); // 转换重要!!!
// server.sin_addr.s_addr = inet_addr(serverip.c_str());
// // 3.发消息和接收消息
// while (true)
// {
// std::string line;
// std::cout << "Please Enter# ";
// std::getline(std::cin, line); // 以行读取消息
// // 发消息,你要知道发送给谁
// int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
// if (n > 0)
// {
// // 收消息
// struct sockaddr_in temp;
// socklen_t len = sizeof(temp);
// char buffer[1024];
// int m = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&temp, &len);
// if (m > 0)
// {
// buffer[m] = 0;
// std::cout << buffer << std::endl;
// }
// else
// {
// break;
// }
// }
// else
// {
// break;
// }
// }
// // 4.关闭套接字
// ::close(sockfd);
// return 0;
// }
6.10、UdpServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "nocopy.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace log_ns;
static const int gsockfd = -1;
static const uint16_t glocalport = 8888;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR
};
using server_t = std::function<void(int,const std::string message,InetAddr& who)>;
// UdpServer user("192.1.1.1",8899)
// 一般服务器主要是用来进行网络数据读取和写入的,即IO的
// 服务器 IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:
UdpServer(server_t func, uint16_t localport = glocalport)
: _func(func), _sockfd(gsockfd), _localport(localport), _isrunning(false)
{
}
void InitServer()
{
// 1.创建socket文件
_sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket error\n");
exit(SOCKET_ERROR);
}
LOG(DEBUG, "socket create success,sockfd: %d\n", _sockfd); // 3
// 2.bind
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_localport); // 主机序列转网络序列
// local.sin_addr.s_addr = inet_addr(_localip.c_str()); // 1.需要4字节ip 2.需要网络序列ip
local.sin_addr.s_addr = INADDR_ANY; // 服务器端,进行任意IP地址绑定[0]
int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG, "socket bind success\n");
}
void Start()
{
_isrunning = true;
char message[1024];
while (_isrunning)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 收消息,返回值: 实际收到多少字节
ssize_t n = recvfrom(_sockfd, message, sizeof(message) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
{
InetAddr addr(peer);
message[n] = 0;
LOG(DEBUG,"[%s]# %s\n",addr.AddrStr().c_str(),message);
_func(_sockfd,message,addr); // 转发消息
LOG(DEBUG,"return udpserver\n");
}
else
{
std::cout << "recvfrom ,error " << std::endl;
}
}
}
~UdpServer()
{
// 关闭文件
if(_sockfd < gsockfd) ::close(_sockfd);
}
private:
int _sockfd; // 读写都用同一个sockfd,说明:udp是全双工通信的
uint16_t _localport; // 端口号
bool _isrunning;
server_t _func; // 执行方法
};
6.11、UdpServerMain.cc
#include "UdpServer.hpp"
#include "Route.hpp"
#include <memory>
// .udp_client local-port
// .udp_client 8888
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " server-port" << std::endl;
exit(0);
}
uint16_t port = std::stoi(argv[1]);
EnableScreen();
Route messageRoute;
server_t message_route = std::bind(&Route::Forward, &messageRoute,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(message_route,port); // C++14标准
usvr->InitServer();
usvr->Start();
return 0;
}
6.12、Test.c
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main()
{
struct sockaddr_in addr1;
struct sockaddr_in addr2;
addr1.sin_addr.s_addr = 0;
addr2.sin_addr.s_addr = 0xffffffff;
char* ptr1 = inet_ntoa(addr1.sin_addr);
char* ptr2 = inet_ntoa(addr2.sin_addr);
printf("ptr1: %s,ptr2: %s\n",ptr1,ptr2);
return 0;
}
// 测试一
// int main()
// {
// struct sockaddr_in addr;
// inet_aton("127.0.0.1",&addr.sin_addr);
// uint32_t* ptr = (uint32_t*)&addr.sin_addr;
// printf("addr: %x\n",*ptr);
// printf("addr_str: %s\n",inet_ntoa(addr.sin_addr));
// return 0;
// }