当前位置: 首页 > article >正文

Linux网络:网络套接字-TCP回显服务器——多进程/线程池(生产者消费者模型)

1.多进程版本

这里选择创建子进程,让子进程再创建子进程。父进程等待子进程,子进程的子进程处理业务逻辑。因为子进程是直接退出,子进程的子进程变成孤儿进程被系统管理,所以父进程在等待的时候不是阻塞等待的,所以可以处理多条链接请求。或者可以忽略SIGCHID信号让子进程不需要等待。

服务端
pragma once 

#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>

#define DEF_PORT 8081

class TcpSever
{
  private:
    int port;
    int listen_sock;//监听套接字
  public: 
    TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1)
    {}

    void InitSever()
    {
      listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
      if(listen_sock<0){
        std::cerr<<"监听套接字创建失败"<<std::endl;
        exit(-1);
      }
      //绑定
      struct sockaddr_in local;
      memset(&local,'\0',sizeof(local));//初始化结构体
      local.sin_family=AF_INET;
      local.sin_port=htons(port);//主机转网络
      local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
      if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
        std::cerr<<"绑定失败"<<std::endl;
        exit(-2);
      }
      //建立链接
#define BACK_LOG 5
      if(listen(listen_sock,BACK_LOG)<0){
        std::cerr<<"链接失败"<<std::endl;
        exit(-3);
      }
    }

    //获取链接
    void Loop()
    {
      //获取客户端信息
      struct sockaddr_in client;
      //signal(SIGCHID,hander);//hander是信号捕捉方法
      while(true)
      {
        socklen_t len=sizeof(client);
        //获取新链接
        int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
        if(sock<0){
          std::cerr<<"接受链接失败"<<std::endl;
          continue;//继续接受其他链接请求
        }
        //多进程处理多个客户端
        pid_t id=fork();
        if(id==0)//子进程
        {
          //子进程,与子进程的子进程不关心listen_sock
          close(listen_sock);
          if(fork()>0)
          {
            exit(0);//子进程直接退出后子进程的子进程变为孤儿进程被守护进程接管,不用等待了
          }
          //子进程的子进程,处理业务
          std::cout<<"获取新连接成功! ip["<<inet_ntoa(client.sin_addr)<<"] port["<<ntohs(client.sin_port)<<"]"<<std::endl;//整数ip转为字符ip形式,网络字节端口转化为本地端口
           //开始服务
          std::string CIp=inet_ntoa(client.sin_addr);
          int CPort=ntohs(client.sin_port);
          Sever(sock,CIp,CPort);
          exit(0);
        }
        //不能使用阻塞等待,可以使用信号机制忽略SIGCHID信号,或者使用两次fork函数
        close(sock);//父进程不关心客户端的链接请求,请求交给子进程做。
        waitpid(id,nullptr,0);//不会阻塞,因为id进程创建完子进程会立即退出,所以不会阻塞。
      }
    }

    void Sever(int sock,std::string&ip,int port)
    {
      char buff[1024];
      while(true)
      {
        memset(buff,0,sizeof(buff));
        ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
        if(ReadSize>0){
          buff[ReadSize]='\0';//添加字符串结束标志
          std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
          write(sock,buff,ReadSize);//向客户端写入数据
        }
        else if(ReadSize==0){//对端关闭链接
          std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
          break;
        }
        else{
          std::cerr<<sock<<"读取错误"<<std::endl;
          break;
        }
      }
      std::cout<<"服务终止"<<std::endl;
    }
    ~TcpSever(){
      if(listen_sock>=0){
        close(listen_sock);
      }
    }
};
启动服务端 
#include"Tcp_Sever.h"

// Tcp_Sever +port

void User(char*name)
{
  std::cout<<name<<" + port"<<std::endl;
}

int main(int argc,char*argv[])
{
  if(argc!=2){
    User(argv[0]);
    exit(-1);
  }
  TcpSever*Sever=new TcpSever(atoi(argv[1]));//将字符串形式的端口号转化为整数形式
  Sever->InitSever();
  Sever->Loop();
  return 0;
}
客户端 
#pragma once

#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<string>
#include<string.h>

class TcpClient
{
  private:
    std::string sev_ip;//服务器ip
    int sev_port;
    int sock;
  public:
    TcpClient(std::string _ip,int _sev_port):sev_ip(_ip),sev_port(_sev_port),sock(-1)
    {}

    void InitClient()
    {
      sock=socket(AF_INET,SOCK_STREAM,0);
      if(sock<0){
        std::cerr<<"创建套接字失败"<<std::endl;
        exit(-1);
      }
      //客户端不需要绑定监听
    }

    //启动客户端
    void Start()
    {
      //向服务器发送链接
      //填充服务器信息
      struct sockaddr_in sever;
      memset(&sever,0,sizeof(sever));
      sever.sin_family=AF_INET;
      sever.sin_port=htons(sev_port);//服务器开放的端口
      sever.sin_addr.s_addr=inet_addr(sev_ip.c_str());//服务器的ip地址

      if(connect(sock,(struct sockaddr*)&sever,sizeof(sever))==0){
        //与服务器交互
        std::cout<<"发送交互请求成功"<<std::endl;
        Request(sock);
      }
      else{
        std::cerr<<"交互失败"<<std::endl;
      }
    }

    void Request(int Sock)
    {
      std::string meg;
      while(true)
      {
        std::cout<<"输入信息#";
        std::cin>>meg;
        char buff[1024];
        write(Sock,meg.c_str(),meg.size());
        //读取服务器发送的消息
        ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
        if(ReadSize>0){
          buff[ReadSize]='\0';
        }
        std::cout<<"Sever#"<<buff<<std::endl;
      }
    }
    
    ~TcpClient()
    {
      if(sock>=0){
        close(sock);
      }
    }
};
启动客户端 
#include"Tcp_Client.h"

// ./Tcp_Client +ip +port

void User(char*name)
{
  std::cout<<name<<" + sever_ip + sever_port"<<std::endl;
}

int main(int argc,char*argv[])
{
  if(argc!=3){
    User(argv[0]);
    exit(-1);
  }

  TcpClient*Client=new TcpClient(argv[1],atoi(argv[2]));
  Client->InitClient();

  Client->Start();
  return 0;
}

2.多线程版本

注意:在类内部的成员函数,调用类内非静态成员函数时会自动带this指针。而线程执行的函数要求void* Hander(void*)类型,所以线程执行函数要用static修饰。

服务端
#pragma once 

#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>
#include<pthread.h>

#define DEF_PORT 8081

//通信结构体
class Pragram{
  public:
    int sock;
    std::string ip;
    int port;
    
    Pragram(int _sock,std::string _ip,int _port)
      :sock(_sock),ip(_ip),port(_port)
    {}
};

class TcpSever
{
  private:
    int port;
    int listen_sock;//监听套接字
  public: 
    TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1)
    {}

    void InitSever()
    {
      listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
      if(listen_sock<0){
        std::cerr<<"监听套接字创建失败"<<std::endl;
        exit(-1);
      }
      //绑定
      struct sockaddr_in local;
      memset(&local,'\0',sizeof(local));//初始化结构体
      local.sin_family=AF_INET;
      local.sin_port=htons(port);//主机转网络
      local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
      if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
        std::cerr<<"绑定失败"<<std::endl;
        exit(-2);
      }
      //建立链接
#define BACK_LOG 5
      if(listen(listen_sock,BACK_LOG)<0){
        std::cerr<<"链接失败"<<std::endl;
        exit(-3);
      }
      std::cout<<"服务启动成功"<<std::endl;
    }

    //获取链接
    void Loop()
    {
      //获取客户端信息
      struct sockaddr_in client;
      //signal(SIGCHID,hander);//hander是信号捕捉方法
      while(true)
      {
        socklen_t len=sizeof(client);
        //获取新链接
        int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
        if(sock<0){
          std::cerr<<"接受链接失败"<<std::endl;
          continue;//继续接受其他链接请求
        }
        pthread_t tid;

        std::string ip=inet_ntoa(client.sin_addr);
        int port=ntohs(client.sin_port);
        Pragram*msg=new Pragram(sock,ip,port);
        pthread_create(&tid,nullptr,HanderQuest,msg);
      }
    }
  private:
    static void* HanderQuest(void*arg)
    {
      Pragram*msg=(Pragram*)arg;
      pthread_detach(pthread_self());//线程分离
      std::cout<<"获取新连接成功"<<"[ ip:"<<msg->ip<<" port:"<<msg->port<<" ]"<<std::endl;
      Sever(msg->sock,msg->ip,msg->port);
      close(msg->sock);
      delete msg;
      return nullptr;
    }

    static void Sever(int sock,std::string&ip,int port)
    {
      char buff[1024];
      while(true)
      {
        memset(buff,0,sizeof(buff));
        ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
        if(ReadSize>0){
          buff[ReadSize]='\0';//添加字符串结束标志
          std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
          write(sock,buff,ReadSize);//向客户端写入数据
        }
        else if(ReadSize==0){//对端关闭链接
          std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
          break;
        }
        else{
          std::cerr<<sock<<"读取错误"<<std::endl;
          break;
        }
      }
      std::cout<<"服务终止"<<std::endl;
    }
    ~TcpSever(){
      if(listen_sock>=0){
        close(listen_sock);
      }
    }
};

客户端代码部分不变。


3.模板线程池版本

线程池代码:

PthreadPool.h

#pragma once 

#include<iostream>
#include<pthread.h>
#include<queue>

#define NUM 5

template<typename Task>
class ThreadPool
{
  private:
    std::queue<Task> _task_queue;//临界资源
    pthread_mutex_t _lock;
    int _thread_num;
    pthread_cond_t _cond;
  public:
   ThreadPool(int Num=NUM):_thread_num(Num){
     pthread_mutex_init(&_lock,nullptr);
     pthread_cond_init(&_cond,nullptr);
   }

   bool IsEmpty(){return _task_queue.empty();}

   static void*Routine(void*arg)//静态函数无隐含的this指针
   {
     pthread_detach(pthread_self());
     ThreadPool<Task>*self=(ThreadPool<Task>*)arg;
     while(true)
     {
       pthread_mutex_lock(&(self->_lock));
       while(self->IsEmpty())//循环检测临界资源是否就绪
       {
         //休眠
         pthread_cond_wait(&(self->_cond),&(self->_lock));
       }
       //任务队列有任务
       Task t;
       self->Pop(t);
       pthread_mutex_unlock(&(self->_lock));
       //解锁后处理任务
       t.Run();
     }
   }

   void Push(const Task&in)//外部添加任务
   {
     pthread_mutex_lock(&_lock);
     _task_queue.push(in);
     //唤醒一个线程
     pthread_cond_signal(&_cond);
     pthread_mutex_unlock(&_lock);
   }

   void Pop(Task&out)//线程获取任务
   {
     out=_task_queue.front();
     _task_queue.pop();
   }

   ~ThreadPool(){
     pthread_mutex_destroy(&_lock);
     pthread_cond_destroy(&_cond);
   }

   void InitThreadPool()
   {
     pthread_t tid;
     for(int i=0;i<_thread_num;i++)
     {
       //为了避免传参时C++传this指针,在线程执行方法上带static。
       //传参数时将this指针传到线程执行方法上。 
       pthread_create(&tid,nullptr,Routine,this);
     }
     
   }
};
设计任务Task.h 

 

#pragma once 

#include<iostream>
#include<string>
#include<unistd.h>
#include<string.h>

struct Hander{
  void operator()(int sock,std::string&ip,int port)
  {
    char buff[1024];
    while(true)
    {
       memset(buff,0,sizeof(buff));
       ssize_t ReadSize=read(sock,buff,sizeof(buff)-1);
       if(ReadSize>0){
         buff[ReadSize]='\0';//添加字符串结束标志
         std::cout<<"Client["<<ip<<"-port-"<<port<<"]# "<<buff<<std::endl;
         write(sock,buff,ReadSize);//向客户端写入数据
       }
       else if(ReadSize==0){//对端关闭链接
         std::cout<<"Client["<<ip<<"-port-"<<port<<"]# 关闭"<<std::endl;
         break;
       }
       else{
         std::cerr<<sock<<"读取错误"<<std::endl;
         break;
      }
    }
    std::cout<<"服务终止"<<std::endl;
    close(sock);
  }
};

class Task{
  private:
    int sock;
    std::string ip;
    int port;
    Hander condut;//仿函数
  public:
    Task()=default;
    Task(int _sock,std::string _ip,int _port)
      :sock(_sock),ip(_ip),port(_port)
    {}

    void Run(){
      condut(sock,ip,port);
    }

};
服务端代码 
#pragma once 

#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/wait.h>
#include<pthread.h>

#include"ThreadPool.h"
#include"Task.h"

#define DEF_PORT 8081

//通信结构体
class Pragram{
  public:
    int sock;
    std::string ip;
    int port;
    
    Pragram(int _sock,std::string _ip,int _port)
      :sock(_sock),ip(_ip),port(_port)
    {}
};

class TcpSever
{
  private:
    int port;
    int listen_sock;//监听套接字
    ThreadPool<Task>*Pool;
  public: 
    TcpSever(int _port=DEF_PORT):port(_port),listen_sock(-1),Pool(nullptr)
    {}

    void InitSever()
    {
      listen_sock=socket(AF_INET,SOCK_STREAM,0);//IPV4流式套接
      if(listen_sock<0){
        std::cerr<<"监听套接字创建失败"<<std::endl;
        exit(-1);
      }
      //绑定
      struct sockaddr_in local;
      memset(&local,'\0',sizeof(local));//初始化结构体
      local.sin_family=AF_INET;
      local.sin_port=htons(port);//主机转网络
      local.sin_addr.s_addr=INADDR_ANY;//随机绑定服务器ip
      if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0){
        std::cerr<<"绑定失败"<<std::endl;
        exit(-2);
      }
      //建立链接
#define BACK_LOG 5
      if(listen(listen_sock,BACK_LOG)<0){
        std::cerr<<"链接失败"<<std::endl;
        exit(-3);
      }
      std::cout<<"服务启动成功"<<std::endl;
      Pool=new ThreadPool<Task>(10);//10个线程

    }

    //获取链接
    void Loop()
    {
      Pool->InitThreadPool();
      //获取客户端信息
      struct sockaddr_in client;
      //signal(SIGCHID,hander);//hander是信号捕捉方法
      while(true)
      {
        socklen_t len=sizeof(client);
        //获取新链接
        int sock=accept(listen_sock,(struct sockaddr*)&client,&len);
        if(sock<0){
          std::cerr<<"接受链接失败"<<std::endl;
          continue;//继续接受其他链接请求
        }
        std::string ip=inet_ntoa(client.sin_addr);
        int port=ntohs(client.sin_port);
        Task asig(sock,ip,port);
        Pool->Push(asig);
      }
    }

    ~TcpSever(){
      if(listen_sock>=0){
        close(listen_sock);
      }
    }
};

客户端代码不需要改变 


http://www.kler.cn/a/302400.html

相关文章:

  • 解决leetcode第3418题机器人可以获得的最大金币数
  • LabVIEW太赫兹二维扫描成像系统
  • 即现软著工具 - 让软著申请更高效
  • 2024.ailx10的年终总结
  • 《Effective Java》学习笔记——第1部分 创建对象和销毁对象的最佳实践
  • ESP32下FreeRTOS实时操作系统使用
  • “区块链积分系统:支付安全与效率的新篇章
  • 内外网文件安全交换如何做到?
  • 春日美食汇:基于SpringBoot的订餐平台
  • windows vscode ssh 连接远程服务器
  • 工厂模式(二):工厂方法模式
  • 使用Python或者GO实现OTP令牌的获取
  • <Linux> 基础IO
  • 利用物化视图刷新同步表记录
  • 从概念到现实,国际数字影像产业园如何打造数字文创产业标杆?
  • Android 开发避坑经验(2):深入理解Fragment与Activity交互
  • 宽哥之家小程序任务脚本
  • 服务器深度解析:五大关键问题一网打尽
  • CentOS 7 上安装 Docker
  • 【Three.js】实现护罩(防御罩、金钟罩、护盾)效果
  • 【PGCCC】PostgreSQL重做日志内幕!如何掌握事务日志记录的“黑魔法”
  • 9月13日星期五今日早报简报微语报早读
  • ssm“健康早知道”微信小程序 LW PPT源码调试讲解
  • P1544 三倍经验 (记忆化搜索)
  • SpringBoot 整合 Guava Cache 实现本地缓存
  • 算法day23| 93.复原IP地址、78.子集、90.子集II