Linux网络:网络套接字-TCP回显服务器——多进程/线程池(生产者消费者模型)
1.多进程版本
这里选择创建子进程,让子进程再创建子进程。父进程等待子进程,子进程的子进程处理业务逻辑。因为子进程是直接退出,子进程的子进程变成孤儿进程被系统管理,所以父进程在等待的时候不是阻塞等待的,所以可以处理多条链接请求。或者可以忽略SIGCHID信号让子进程不需要等待。
服务端
pragma once
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>
#define DEF_PORT 8081
class TcpSever
{
private:
int port;
int listen_sock;//监听套接字
public:
TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1)
{}
void InitSever()
{
listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
if(listen_sock<0){
std::cerr<<"监听套接字创建失败"<<std::endl;
exit(-1);
}
//绑定
struct sockaddr_in local;
memset(&local,'\0',sizeof(local));//初始化结构体
local.sin_family=AF_INET;
local.sin_port=htons(port);//主机转网络
local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
std::cerr<<"绑定失败"<<std::endl;
exit(-2);
}
//建立链接
#define BACK_LOG 5
if(listen(listen_sock,BACK_LOG)<0){
std::cerr<<"链接失败"<<std::endl;
exit(-3);
}
}
//获取链接
void Loop()
{
//获取客户端信息
struct sockaddr_in client;
//signal(SIGCHID,hander);//hander是信号捕捉方法
while(true)
{
socklen_t len=sizeof(client);
//获取新链接
int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
if(sock<0){
std::cerr<<"接受链接失败"<<std::endl;
continue;//继续接受其他链接请求
}
//多进程处理多个客户端
pid_t id=fork();
if(id==0)//子进程
{
//子进程,与子进程的子进程不关心listen_sock
close(listen_sock);
if(fork()>0)
{
exit(0);//子进程直接退出后子进程的子进程变为孤儿进程被守护进程接管,不用等待了
}
//子进程的子进程,处理业务
std::cout<<"获取新连接成功! ip["<<inet_ntoa(client.sin_addr)<<"] port["<<ntohs(client.sin_port)<<"]"<<std::endl;//整数ip转为字符ip形式,网络字节端口转化为本地端口
//开始服务
std::string CIp=inet_ntoa(client.sin_addr);
int CPort=ntohs(client.sin_port);
Sever(sock,CIp,CPort);
exit(0);
}
//不能使用阻塞等待,可以使用信号机制忽略SIGCHID信号,或者使用两次fork函数
close(sock);//父进程不关心客户端的链接请求,请求交给子进程做。
waitpid(id,nullptr,0);//不会阻塞,因为id进程创建完子进程会立即退出,所以不会阻塞。
}
}
void Sever(int sock,std::string&ip,int port)
{
char buff[1024];
while(true)
{
memset(buff,0,sizeof(buff));
ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
if(ReadSize>0){
buff[ReadSize]='\0';//添加字符串结束标志
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
write(sock,buff,ReadSize);//向客户端写入数据
}
else if(ReadSize==0){//对端关闭链接
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
break;
}
else{
std::cerr<<sock<<"读取错误"<<std::endl;
break;
}
}
std::cout<<"服务终止"<<std::endl;
}
~TcpSever(){
if(listen_sock>=0){
close(listen_sock);
}
}
};
启动服务端
#include"Tcp_Sever.h"
// Tcp_Sever +port
void User(char*name)
{
std::cout<<name<<" + port"<<std::endl;
}
int main(int argc,char*argv[])
{
if(argc!=2){
User(argv[0]);
exit(-1);
}
TcpSever*Sever=new TcpSever(atoi(argv[1]));//将字符串形式的端口号转化为整数形式
Sever->InitSever();
Sever->Loop();
return 0;
}
客户端
#pragma once
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<string>
#include<string.h>
class TcpClient
{
private:
std::string sev_ip;//服务器ip
int sev_port;
int sock;
public:
TcpClient(std::string _ip,int _sev_port):sev_ip(_ip),sev_port(_sev_port),sock(-1)
{}
void InitClient()
{
sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0){
std::cerr<<"创建套接字失败"<<std::endl;
exit(-1);
}
//客户端不需要绑定监听
}
//启动客户端
void Start()
{
//向服务器发送链接
//填充服务器信息
struct sockaddr_in sever;
memset(&sever,0,sizeof(sever));
sever.sin_family=AF_INET;
sever.sin_port=htons(sev_port);//服务器开放的端口
sever.sin_addr.s_addr=inet_addr(sev_ip.c_str());//服务器的ip地址
if(connect(sock,(struct sockaddr*)&sever,sizeof(sever))==0){
//与服务器交互
std::cout<<"发送交互请求成功"<<std::endl;
Request(sock);
}
else{
std::cerr<<"交互失败"<<std::endl;
}
}
void Request(int Sock)
{
std::string meg;
while(true)
{
std::cout<<"输入信息#";
std::cin>>meg;
char buff[1024];
write(Sock,meg.c_str(),meg.size());
//读取服务器发送的消息
ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
if(ReadSize>0){
buff[ReadSize]='\0';
}
std::cout<<"Sever#"<<buff<<std::endl;
}
}
~TcpClient()
{
if(sock>=0){
close(sock);
}
}
};
启动客户端
#include"Tcp_Client.h"
// ./Tcp_Client +ip +port
void User(char*name)
{
std::cout<<name<<" + sever_ip + sever_port"<<std::endl;
}
int main(int argc,char*argv[])
{
if(argc!=3){
User(argv[0]);
exit(-1);
}
TcpClient*Client=new TcpClient(argv[1],atoi(argv[2]));
Client->InitClient();
Client->Start();
return 0;
}
2.多线程版本
注意:在类内部的成员函数,调用类内非静态成员函数时会自动带this指针。而线程执行的函数要求void* Hander(void*)
类型,所以线程执行函数要用static修饰。
服务端
#pragma once
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>
#include<pthread.h>
#define DEF_PORT 8081
//通信结构体
class Pragram{
public:
int sock;
std::string ip;
int port;
Pragram(int _sock,std::string _ip,int _port)
:sock(_sock),ip(_ip),port(_port)
{}
};
class TcpSever
{
private:
int port;
int listen_sock;//监听套接字
public:
TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1)
{}
void InitSever()
{
listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
if(listen_sock<0){
std::cerr<<"监听套接字创建失败"<<std::endl;
exit(-1);
}
//绑定
struct sockaddr_in local;
memset(&local,'\0',sizeof(local));//初始化结构体
local.sin_family=AF_INET;
local.sin_port=htons(port);//主机转网络
local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
std::cerr<<"绑定失败"<<std::endl;
exit(-2);
}
//建立链接
#define BACK_LOG 5
if(listen(listen_sock,BACK_LOG)<0){
std::cerr<<"链接失败"<<std::endl;
exit(-3);
}
std::cout<<"服务启动成功"<<std::endl;
}
//获取链接
void Loop()
{
//获取客户端信息
struct sockaddr_in client;
//signal(SIGCHID,hander);//hander是信号捕捉方法
while(true)
{
socklen_t len=sizeof(client);
//获取新链接
int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
if(sock<0){
std::cerr<<"接受链接失败"<<std::endl;
continue;//继续接受其他链接请求
}
pthread_t tid;
std::string ip=inet_ntoa(client.sin_addr);
int port=ntohs(client.sin_port);
Pragram*msg=new Pragram(sock,ip,port);
pthread_create(&tid,nullptr,HanderQuest,msg);
}
}
private:
static void* HanderQuest(void*arg)
{
Pragram*msg=(Pragram*)arg;
pthread_detach(pthread_self());//线程分离
std::cout<<"获取新连接成功"<<"[ ip:"<<msg->ip<<" port:"<<msg->port<<" ]"<<std::endl;
Sever(msg->sock,msg->ip,msg->port);
close(msg->sock);
delete msg;
return nullptr;
}
static void Sever(int sock,std::string&ip,int port)
{
char buff[1024];
while(true)
{
memset(buff,0,sizeof(buff));
ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
if(ReadSize>0){
buff[ReadSize]='\0';//添加字符串结束标志
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
write(sock,buff,ReadSize);//向客户端写入数据
}
else if(ReadSize==0){//对端关闭链接
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
break;
}
else{
std::cerr<<sock<<"读取错误"<<std::endl;
break;
}
}
std::cout<<"服务终止"<<std::endl;
}
~TcpSever(){
if(listen_sock>=0){
close(listen_sock);
}
}
};
客户端代码部分不变。
3.模板线程池版本
线程池代码:
PthreadPool.h
#pragma once
#include<iostream>
#include<pthread.h>
#include<queue>
#define NUM 5
template<typename Task>
class ThreadPool
{
private:
std::queue<Task> _task_queue;//临界资源
pthread_mutex_t _lock;
int _thread_num;
pthread_cond_t _cond;
public:
ThreadPool(int Num=NUM):_thread_num(Num){
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_cond,nullptr);
}
bool IsEmpty(){return _task_queue.empty();}
static void*Routine(void*arg)//静态函数无隐含的this指针
{
pthread_detach(pthread_self());
ThreadPool<Task>*self=(ThreadPool<Task>*)arg;
while(true)
{
pthread_mutex_lock(&(self->_lock));
while(self->IsEmpty())//循环检测临界资源是否就绪
{
//休眠
pthread_cond_wait(&(self->_cond),&(self->_lock));
}
//任务队列有任务
Task t;
self->Pop(t);
pthread_mutex_unlock(&(self->_lock));
//解锁后处理任务
t.Run();
}
}
void Push(const Task&in)//外部添加任务
{
pthread_mutex_lock(&_lock);
_task_queue.push(in);
//唤醒一个线程
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_lock);
}
void Pop(Task&out)//线程获取任务
{
out=_task_queue.front();
_task_queue.pop();
}
~ThreadPool(){
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
void InitThreadPool()
{
pthread_t tid;
for(int i=0;i<_thread_num;i++)
{
//为了避免传参时C++传this指针,在线程执行方法上带static。
//传参数时将this指针传到线程执行方法上。
pthread_create(&tid,nullptr,Routine,this);
}
}
};
设计任务Task.h
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
#include<string.h>
struct Hander{
void operator()(int sock,std::string&ip,int port)
{
char buff[1024];
while(true)
{
memset(buff,0,sizeof(buff));
ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
if(ReadSize>0){
buff[ReadSize]='\0';//添加字符串结束标志
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
write(sock,buff,ReadSize);//向客户端写入数据
}
else if(ReadSize==0){//对端关闭链接
std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
break;
}
else{
std::cerr<<sock<<"读取错误"<<std::endl;
break;
}
}
std::cout<<"服务终止"<<std::endl;
close(sock);
}
};
class Task{
private:
int sock;
std::string ip;
int port;
Hander condut;//仿函数
public:
Task()=default;
Task(int _sock,std::string _ip,int _port)
:sock(_sock),ip(_ip),port(_port)
{}
void Run(){
condut(sock,ip,port);
}
};
服务端代码
#pragma once
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>
#include<pthread.h>
#include"ThreadPool.h"
#include"Task.h"
#define DEF_PORT 8081
//通信结构体
class Pragram{
public:
int sock;
std::string ip;
int port;
Pragram(int _sock,std::string _ip,int _port)
:sock(_sock),ip(_ip),port(_port)
{}
};
class TcpSever
{
private:
int port;
int listen_sock;//监听套接字
ThreadPool<Task>*Pool;
public:
TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1),Pool(nullptr)
{}
void InitSever()
{
listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
if(listen_sock<0){
std::cerr<<"监听套接字创建失败"<<std::endl;
exit(-1);
}
//绑定
struct sockaddr_in local;
memset(&local,'\0',sizeof(local));//初始化结构体
local.sin_family=AF_INET;
local.sin_port=htons(port);//主机转网络
local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
std::cerr<<"绑定失败"<<std::endl;
exit(-2);
}
//建立链接
#define BACK_LOG 5
if(listen(listen_sock,BACK_LOG)<0){
std::cerr<<"链接失败"<<std::endl;
exit(-3);
}
std::cout<<"服务启动成功"<<std::endl;
Pool=new ThreadPool<Task>(10);//10个线程
}
//获取链接
void Loop()
{
Pool->InitThreadPool();
//获取客户端信息
struct sockaddr_in client;
//signal(SIGCHID,hander);//hander是信号捕捉方法
while(true)
{
socklen_t len=sizeof(client);
//获取新链接
int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
if(sock<0){
std::cerr<<"接受链接失败"<<std::endl;
continue;//继续接受其他链接请求
}
std::string ip=inet_ntoa(client.sin_addr);
int port=ntohs(client.sin_port);
Task asig(sock,ip,port);
Pool->Push(asig);
}
}
~TcpSever(){
if(listen_sock>=0){
close(listen_sock);
}
}
};
客户端代码不需要改变