当前位置: 首页 > article >正文

【Linux】Socket 编程 TCP

Socket 编程 TCP

  • 一.TCP socket API 详解
  • 二.EchoServer(单进程/多进程/多线程/线程池)
    • 1.Makefile
    • 2.Mutex.hpp
    • 3.Cond.hpp
    • 4.Log.hpp
    • 5.Thread.hpp
    • 6.ThreadPool.hpp
    • 7.InetAddr.hpp
    • 7.CommandExecute.hpp
    • 9.TcpServer.hpp
    • 10.TcpServer.cc
    • 11.TcpClient.cc

一.TCP socket API 详解

int socket(int domain, int type, int protocol);
  • socket() 打开一个网络通讯端口,如果成功的话,就像 open()一样返回一个文件描述符。
  • 应用程序可以像读写文件一样用 read/write 在网络上收发数据。
  • 如果 socket() 调用出错则返回-1。
  • 对于 IPv4 网络通信,domain 参数指定为 AF_INET。
  • 对于 TCP 协议,type 参数指定为 SOCK_STREAM,表示面向字节流的传输协议。
  • protocol 参数的介绍从略,指定为 0 即可。
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • 服务器程序所监听的网络地址和端口号通常是固定不变的,客户端程序得知服务器程序的地址和端口号后就可以向服务器发起连接。服务器需要调用 bind 绑定网络地址(INADDR_ANY)和固定的端口号。
  • bind()成功返回 0,失败返回-1。
  • bind()的作用是将参数 sockfd 和 myaddr 绑定在一起,使 sockfd 这个用于网络通讯的文件描述符监听 myaddr 所描述的地址和端口号。
  • struct sockaddr* 是一个通用指针类型,myaddr 参数实际上可以接受多种协议的 sockaddr 结构体(例如网络通信用 struct sockaddr_in*),而它们的长度各不相同,所以需要第三个参数 addrlen指定结构体的长度。

我们的程序中对 myaddr 参数是这样初始化的:

在这里插入图片描述

  1. 将整个结构体清零。
  2. 设置地址类型为 AF_INET。
  3. 网络地址为 INADDR_ANY,这个宏表示本地的任意 IP 地址,因为服务器可能有多个网卡,每个网卡也可能绑定多个 IP 地址,这样设置可以在所有的 IP 地址上监听,直到与某个客户端建立了连接时才确定下来到底用哪个 IP 地址。
  4. 端口号为 SERV_PORT,我们定义为 8080。
int listen(int sockfd, int backlog);
  • listen()声明 sockfd 处于监听状态,并且最多允许有 backlog 个客户端处于连接等待状态(它们位于监听队列中),如果接收到更多的连接请求就忽略,这里设置不会太大,一般是 5。
  • listen()成功返回 0,失败返回-1。
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
  • 三次握手完成后,服务器调用 accept()接受连接。
  • 如果服务器调用 accept()时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来。
  • addr 是一个输出型参数,accept()返回时传出客户端的地址和端口号。
  • 如果给 addr 参数传 nullptr,表示不关心客户端的地址。
  • addrlen 参数是一个输入输出参数,输入的是调用者提供的,缓冲区 addr 的长度以避免缓冲区溢出问题,输出的是客户端地址结构体的实际长度。
  • 注意:输入的 sockfd 是监听套接字,accept 成功时:返回的是实际进行网络通信的套接字。

在这里插入图片描述

int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • 客户端需要调用 connect()连接服务器。
  • connect 和 bind 的参数形式一致,区别在于 bind 的参数是自己的地址,而connect 的参数是对方的地址。
  • connect()成功返回 0,出错返回-1。

二.EchoServer(单进程/多进程/多线程/线程池)

1.Makefile

.PHONY:all
all:server_tcp client_tcp

client_tcp:TcpClient.cc
	g++ -o $@ $^ -std=c++17
server_tcp:TcpServer.cc
	g++ -o $@ $^ -std=c++17 -lpthread

.PHONY:clean
clean:
	rm -f client_tcp server_tcp

2.Mutex.hpp

#pragma once

#include <pthread.h>

namespace MutexModule
{
    class Mutex
    {
        Mutex(const Mutex &m) = delete;
        const Mutex &operator=(const Mutex &m) = delete;

    public:
        Mutex()
        {
            ::pthread_mutex_init(&_mutex, nullptr);
        }

        ~Mutex()
        {
            ::pthread_mutex_destroy(&_mutex);
        }

        void Lock()
        {
            ::pthread_mutex_lock(&_mutex);
        }

        void Unlock()
        {
            ::pthread_mutex_unlock(&_mutex);
        }

        pthread_mutex_t *LockAddr() { return &_mutex; }

    private:
        pthread_mutex_t _mutex;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex)
            : _mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex &_mutex; // 使用引用: 互斥锁不支持拷贝
    };
}

3.Cond.hpp

#pragma

#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace MutexModule;

    class Cond
    {
    public:
        Cond() 
        {
            ::pthread_cond_init(&_cond, nullptr);
        }

        ~Cond() 
        {
            ::pthread_cond_destroy(&_cond);
        }

        void Wait(Mutex &mutex) // 线程释放曾经持有的锁, 不能拷贝
        {
            ::pthread_cond_wait(&_cond, mutex.LockAddr());
        }

        void Signal()
        {
            ::pthread_cond_signal(&_cond);
        }

        void Broadcast()
        {
            ::pthread_cond_broadcast(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}

4.Log.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <memory>
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace MutexModule;

    // 获取系统时间
    std::string CurrentTime()
    {
        time_t time_stamp = ::time(nullptr); // 获取时间戳
        struct tm curr;
        localtime_r(&time_stamp, &curr); // 将时间戳转化为可读性强的信息

        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",
                 curr.tm_year + 1900,
                 curr.tm_mon + 1,
                 curr.tm_mday,
                 curr.tm_hour,
                 curr.tm_min,
                 curr.tm_sec);

        return buffer;
    }

    // 日志文件: 默认路径和默认文件名
    const std::string defaultlogpath = "./log/";
    const std::string defaultlogname = "log.txt";

    // 日志等级
    enum class LogLevel
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string Level2String(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "NONE";
        }
    }

    // 3. 策略模式: 刷新策略
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default;
        // 纯虚函数: 无法实例化对象, 派生类可以重载该函数, 实现不同的刷新方式
        virtual void SyncLog(const std::string &message) = 0;
    };

    // 3.1 控制台策略
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        ConsoleLogStrategy() {}
        ~ConsoleLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::cout << message << std::endl;
        }

    private:
        Mutex _mutex;
    };

    // 3.2 文件级(磁盘)策略
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)
            : _logpath(logpath), _logname(logname)
        {
            // 判断_logpath目录是否存在
            if (std::filesystem::exists(_logpath))
            {
                return;
            }
            try
            {
                std::filesystem::create_directories(_logpath);
            }
            catch (std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << std::endl;
            }
        }
        ~FileLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::string log = _logpath + _logname;
            std::ofstream out(log, std::ios::app); // 以追加的方式打开文件
            if (!out.is_open())
            {
                return;
            }
            out << message << "\n"; // 将信息刷新到out流中
            out.close();
        }

    private:
        std::string _logpath;
        std::string _logname;
        Mutex _mutex;
    };

    // 4. 日志类: 构建日志字符串, 根据策略进行刷新
    class Logger
    {
    public:
        Logger()
        {
            // 默认往控制台上刷新
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }
        ~Logger() {}

        void EnableConsoleLog()
        {
            _strategy = std::make_shared<ConsoleLogStrategy>();
        }

        void EnableFileLog()
        {
            _strategy = std::make_shared<FileLogStrategy>();
        }

        // 内部类: 记录完整的日志信息
        class LogMessage
        {
        public:
            LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)
                : _currtime(CurrentTime()), _level(level), _pid(::getpid())
                , _filename(filename), _line(line), _logger(logger)
            {
                std::stringstream ssbuffer;
                ssbuffer << "[" << _currtime << "] "
                         << "[" << Level2String(_level) << "] "
                         << "[" << _pid << "] "
                         << "[" << _filename << "] "
                         << "[" << _line << "] - ";

                _loginfo = ssbuffer.str();
            }
            ~LogMessage()
            {
                if(_logger._strategy)
                {
                    _logger._strategy->SyncLog(_loginfo);
                }
            }

            template <class T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ssbuffer;
                ssbuffer << info;
                _loginfo += ssbuffer.str();
                return *this;
            }

        private:
            std::string _currtime;  // 当前日志时间
            LogLevel _level;       // 日志水平
            pid_t _pid;            // 进程pid
            std::string _filename; // 文件名
            uint32_t _line;        // 日志行号
            Logger &_logger;       // 负责根据不同的策略进行刷新
            std::string _loginfo;  // 日志信息
        };

        // 故意拷贝, 形成LogMessage临时对象, 后续在被<<时,会被持续引用,
        // 直到完成输入,才会自动析构临时LogMessage, 至此完成了日志的刷新,
        // 同时形成的临时对象内包含独立日志数据, 未来采用宏替换, 获取文件名和代码行数
        LogMessage operator()(LogLevel level, const std::string &filename, int line)
        {
            return LogMessage(level, filename, line, *this);
        }

    private:
        // 纯虚类不能实例化对象, 但是可以定义指针
        std::shared_ptr<LogStrategy> _strategy; // 日志刷新策略方案
    };

    // 定义全局logger对象
    Logger logger;

// 编译时进行宏替换: 方便随时获取行号和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)

// 提供选择使用何种日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}

5.Thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>

namespace ThreadModule
{
    using func_t = std::function<void(std::string)>;
    static int number = 1;

    // 强类型枚举: 枚举的成员名称被限定在枚举类型的作用域内
    enum class TSTATUS
    {
        NEW,
        RUNNING,
        STOP
    };

    class Thread
    {
    private:
        // 成员方法: 需要加上static表示不需要this指针, 否则回调函数报错
        // 而要执行_func()函数又需要由this指针, 所以Routine函数传this指针
        static void *Routine(void *args)
        {
            Thread *t = static_cast<Thread *>(args);
            t->_func(t->Name());
            return nullptr;
        }

        void EnableDetach() { _joinable = false; }

    public:
        Thread(func_t func)
            : _func(func), _status(TSTATUS::NEW), _joinable(true)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }

        ~Thread() {}

        // 线程创建
        bool Start()
        {
            if (_status != TSTATUS::RUNNING)
            {
                int n = pthread_create(&_tid, nullptr, Routine, this);
                if (n != 0)
                    return false;
                _status = TSTATUS::RUNNING;
                return true;
            }
            return false;
        }

        // 线程退出
        bool Stop()
        {
            if (_status == TSTATUS::RUNNING)
            {
                int n = ::pthread_cancel(_tid);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        // 线程等待
        bool Join()
        {
            if (_joinable)
            {
                int n = ::pthread_join(_tid, nullptr);
                if (n != 0)
                    return false;
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        // 线程分离
        bool Detach()
        {
            EnableDetach();
            int n = ::pthread_detach(_tid);
            if (n != 0)
                return false;
            return true;
        }

        // 线程是否分离
        bool IsJoinable() {  return _joinable; }
        std::string Name() { return _name; }

    private:
        std::string _name;
        pthread_t _tid;
        pid_t _pid;
        bool _joinable; // 线程是否是分离的, 默认不是
        func_t _func;
        TSTATUS _status;
    };
}

6.ThreadPool.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Log.hpp"

namespace ThreadPoolModule
{
    using namespace MutexModule;
    using namespace CondModule;
    using namespace ThreadModule;
    using namespace LogModule;

    using thread_t = std::shared_ptr<Thread>;
    const static int defaultnum = 15;

    template <class T>
    class ThreadPool
    {
    private:
        bool IsEmpty() { return _taskq.empty(); }

        void HandlerTask(std::string name)
        {
            LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask执行逻辑";
            while (true)
            {
                // 1. 拿任务: 访问共享资源, 需要加锁
                T task;
                {
                    LockGuard lockguard(_mutex);
                    while (IsEmpty() && _isrunning) // while替代if: 防止伪唤醒
                    {
                        _wait_num++;
                        _cond.Wait(_mutex); // 没任务时: 线程在条件变量上阻塞等待
                        _wait_num--;
                    }
                    // 2. 任务队列不为空 && 线程池退出
                    if (IsEmpty() && !_isrunning)
                        break;

                    task = _taskq.front();
                    _taskq.pop();
                }

                // 3. 处理任务: 并发处理, 不需要持有锁
                task();
            }
            LOG(LogLevel::INFO) << "线程: " << name << ", 退出";
        }

        ThreadPool(int num = defaultnum)
            : _num(num), _wait_num(0), _isrunning(false)
        {
            for (int i = 0; i < _num; i++)
            {
                // 在类中: bind类的公有方法, 需要取地址 + 传入this指针
                // 在类外: bind类的公有方法, 需要取地址 + 传入类的匿名对象
                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // push_back()会调用移动构造
                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";
            }
        }

        ThreadPool<T>(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

    public:
        ~ThreadPool() {}

        // 获取单例对象
        static ThreadPool<T> *GetInstance()
        {
            // 若单例为空: 需要加锁创建单例对象
            if(instance == nullptr)
            {
                LockGuard lockguard(_lock);
                if(instance == nullptr)
                {
                    LOG(LogLevel::INFO) << "单例首次被执行, 需要加载对象...";
                    instance = new ThreadPool<T>();
                    instance->Start();
                }
            }
            // 若单例不为空: 直接返回单例对象
            return instance;
        }

        void Equeue(T in)
        {
            LockGuard lockguard(_mutex);
            if (!_isrunning) return;
            _taskq.push(in);
            if (_wait_num > 0)
            {
                _cond.Signal(); // 唤醒线程
            }
        }

        void Start()
        {
            if (_isrunning) return;
            _isrunning = true;
            for (auto &thread_ptr : _threads)
            {
                thread_ptr->Start();
                LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";
            }
        }

        void Stop()
        {
            LockGuard lockguard(_mutex);
            if (_isrunning)
            {
                // 1. 不能再新增任务了
                _isrunning = false;

                // 2. 让线程自己退出(唤醒所有的线程) && 历史任务被执行完
                if (_wait_num > 0)
                {
                    _cond.Broadcast();
                }
            }
        }

        void Wait()
        {
            for (auto &thread_ptr : _threads)
            {
                thread_ptr->Join();
                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";
            }
        }

    private:
        int _num;                       // 线程的个数
        std::vector<thread_t> _threads; // 线程池
        std::queue<T> _taskq;           // 共享资源: 任务队列
        int _wait_num;                  // 等待的线程数目
        bool _isrunning;                // 线程池是否运行

        Mutex _mutex; // 锁
        Cond _cond;   // 条件变量

        static ThreadPool<T> *instance; // 单例对象
        static Mutex _lock;             // 用来保护单例
    };

    // 静态成员: 类内声明, 类外定义
    template<class T>
    ThreadPool<T> *ThreadPool<T>::instance = nullptr;
    
    template<class T>
    Mutex ThreadPool<T>::_lock;
}

7.InetAddr.hpp

#pragma once

#include <iostream>
#include <string>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Common.hpp"

class InetAddr
{
private:
    // 端口号: 网络序列->主机序列
    void PortNetToHost()
    {
        _port = ::ntohs(_net_addr.sin_port);
    }

    // IP: 网络序列->主机序列
    void IpNetToHost()
    {
        char ipbuffer[64];
        ::inet_ntop(AF_INET, &_net_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
        _ip = ipbuffer;
    }

public:
    InetAddr() {}

    InetAddr(const struct sockaddr_in &addr)
        : _net_addr(addr)
    {
        PortNetToHost();
        IpNetToHost();
    }

    InetAddr(uint16_t port)
        : _port(port), _ip("")
    {
        _net_addr.sin_family = AF_INET;
        _net_addr.sin_port = ::htons(_port);
        _net_addr.sin_addr.s_addr = INADDR_ANY;
    }

    ~InetAddr() {}

    bool operator==(const InetAddr& addr) { return _ip == addr._ip && _port == addr._port; }

    struct sockaddr *NetAddr() { return CONV(&_net_addr); }
    socklen_t NetAddrLen() { return sizeof(_net_addr); }

    std::string Ip() { return _ip; }
    uint16_t Port() { return _port; }
    std::string Addr() { return Ip() + ":" + std::to_string(Port()); }

private:
    struct sockaddr_in _net_addr;
    std::string _ip; // 主机序列: IP
    uint16_t _port;  // 主机序列: 端口号
};

7.CommandExecute.hpp

#pragma once

#include <iostream>
#include <string>
#include <set>
#include <cstdlib>

const int line_size = 1024;

class Command
{
public:
    Command() 
    {
        _white_list.insert("ls");
        _white_list.insert("pwd");
        _white_list.insert("ls -l");
        _white_list.insert("touch file.txt");
        _white_list.insert("who");
        _white_list.insert("whoami");
    }

    bool SafeCheck(const std::string &cmdstr)
    {
        auto iter = _white_list.find(cmdstr);
        return iter == _white_list.end() ? false : true;
    }

    // 给出命令字符串, 返回执行后的结果
    std::string Execute(std::string cmdstr)
    {
        // 1. pipe
        // 2. fork + dup2(pipe[1], 1) + exec*, 执行结果给父进程, pipe[0]
        // 3. return

        // FILE *popen(const char *command, const char *type);
        // int pclose(FILE * stream);

        if(!SafeCheck(cmdstr))
        {
            return std::string(cmdstr + "不支持");
        }

        // 以读模式打开管道,执行命令
        FILE *fp = ::popen(cmdstr.c_str(), "r");
        if (fp == nullptr)
        {
            return std::string("fail");
        }

        char buffer[line_size];
        std::string result;
        while (true)
        {
            // 从管道中读取命令的输出
            char *ret = ::fgets(buffer, sizeof(buffer), fp); // 自动添加'\0'
            if (ret == nullptr) break;
            result += ret;
        }
        ::pclose(fp);

        return result.empty() ? std::string("Done") : result;
    }

private:
    std::set<std::string> _white_list; // 命令白名单: 允许执行白名单中的命令
};

9.TcpServer.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <functional>

#include <pthread.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"

using namespace LogModule;
using namespace ThreadPoolModule;

using task_t = std::function<void()>;
using handler_t = std::function<std::string(std::string)>;

#define BACKLOG 8

uint16_t gport = 8080;

class TcpServer
{
public:
    TcpServer(handler_t handler, int port = gport)
        : _handler(handler), _port(port), _isrunning(false)
    {}

    ~TcpServer() {}

    void InitServer()
    {
        // 1. 创建TCP套接字
        _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_listensockfd < 0)
        {
            LOG(LogLevel::FATAL) << "socket error";
            Die(SOCKET_ERR);
        }
        LOG(LogLevel::INFO) << "socket create success, listensockfd is: " << _listensockfd;

        // 2. 绑定
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = ::htons(_port);
        local.sin_addr.s_addr = INADDR_ANY;

        int n = ::bind(_listensockfd, CONV(&local), sizeof(local));
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "bind error";
            Die(BIND_ERR);
        }
        LOG(LogLevel::INFO) << "bind success";

        // 3. Tcp是面向连接的, 就要求Tcp随时随地等待被客户端连接, Tcp需要将socket设置为监听状态
        //    客户端请求连接时: 将客户端连接请求放入一个监听队列中
        n = ::listen(_listensockfd, BACKLOG);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "listen error";
            Die(LISTEN_ERR);
        }
        LOG(LogLevel::INFO) << "listen success";

        // ::signal(SIGCHLD, SIG_IGN); // 子进程退出, 父进程无需wait, 操作系统自动回收资源
    }

    // Tcp也是全双工的: 在同一个文件描述符中, 既可以读又可以写
    void HandlerRequest(int sockfd)
    {
        LOG(LogLevel::INFO) << "开始处理客户端请求...";
        char inbuffer[4096];
        while (true)
        {
            // 约定: 客户端发过来的是一条完整的命令string

            // 1. 读取客户端发送来的消息
            // ssize_t n = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1); // 读取是不完善的
            ssize_t n = ::recv(sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
            if (n > 0)
            {
                LOG(LogLevel::INFO) << inbuffer;

                inbuffer[n] = 0;
                // std::string message = "server echo# ";
                // message += inbuffer;

                std::string cmd_reault = _handler(inbuffer);

                // 2. 向客户端发送消息
                // ::write(sockfd, message.c_str(), message.size()); // 写入也是不完善的
                // ::send(sockfd, message.c_str(), message.size(), 0);

                ::send(sockfd, cmd_reault.c_str(), cmd_reault.size(), 0);
            }
            else if (n == 0)
            {
                // read如果读取的返回值是0, 表示客户端退出
                LOG(LogLevel::INFO) << "客户端退出: " << sockfd;
                break;
            }
            else
            {
                // 读取失败
                break;
            }
        }
        ::close(sockfd); // 关闭fd, 防止fd泄漏问题
    }

    void Start()
    {
        _isrunning = true;
        while (_isrunning)
        {
            // 1. Tcp不能直接获取数据: 需要获取新连接
            // 阻塞等待, 直到有客户端连接请求进入监听队列, 然后从队列中取出一个请求, 为该客户端建立连接,
            // 并返回一个新的套接字描述符, 通过这个新的套接字描述符就可以与客户端进行数据的发送和接收
            struct sockaddr_in peer;
            socklen_t peerlen = sizeof(peer);
            int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);
            if (sockfd < 0)
            {
                LOG(LogLevel::WARNING) << "accept error: " << strerror(errno);
                continue;
            }
            LOG(LogLevel::INFO) << "accept success, sockfd is: " << sockfd;

            // 获取客户端的信息: IP + 端口号
            InetAddr addr(peer);
            LOG(LogLevel::INFO) << "client info: " << addr.Addr();

            // version1->单进程版本: 单客户端访问
            // HandlerRequest(sockfd);

            // version2->多进程版本: 多客户端访问
            // pid_t id = fork();
            // if (id == 0)
            // {
            //     // 子进程: 继承父进程的文件描述符表, 有两张表
            //     ::close(_listensockfd); // 关闭不需要的文件描述符: 监听套接字

            //     if(fork() > 0) exit(0); // 子进程退出

            //     // 孙子进程->孤儿进程: 不断与客户端的数据传输, 退出后被操作系统自动回收
            //     HandlerRequest(sockfd);

            //     exit(0);
            // }
            // // 父进程: 不断与客户端建立连接
            // ::close(sockfd); // 关闭不需要的文件描述符: socket

            // // waitpid不会阻塞
            // int rid = ::waitpid(id, nullptr, 0);
            // if(rid < 0)
            // {
            //     LOG(LogLevel::WARNING) << "waitpid error";
            // }

            // version3->多线程版本: 多客户端访问
            // 主线程和新线程共享同一张文件描述符表
            // pthread_t tid;
            // ThreadData *data = new ThreadData();
            // data->sockfd = sockfd;
            // data->self = this;
            // pthread_create(&tid, nullptr, ThreadEntry, (void *)data);

            // version4->线程池版本: 多客户端访问(适合处理短任务)
            task_t task = std::bind(&TcpServer::HandlerRequest, this, sockfd); // 构建任务
            ThreadPool<task_t>::GetInstance()->Equeue(task);

            // ThreadPool<task_t>::GetInstance()->Equeue([this, &sockfd](){
            //     this->HandlerRequest(sockfd);
            // });
        }
    }

    struct ThreadData
    {
        int sockfd;
        TcpServer *self;
    };

    // 类中ThreadEntry函数带有this指针, 需要加上static
    // 而没有this指针, 又无法调用HandlerReques函数
    // 解决方法: 封装ThreadData结构体
    static void *ThreadEntry(void *argc)
    {
        pthread_detach(pthread_self()); // 线程分离: 线程退出时由操作系统自动回收, 防止类似僵尸进程的问题
        ThreadData *data = (ThreadData *)argc;
        data->self->HandlerRequest(data->sockfd);
        return nullptr;
    }

    void Stop()
    {
        _isrunning = false;
    }

private:
    int _listensockfd; // 监听套接字
    uint16_t _port;
    bool _isrunning;

    handler_t _handler; // 处理客户端发来的任务
};

10.TcpServer.cc

#include <memory>

#include "TcpServer.hpp"
#include "CommandExecute.hpp"

int main()
{
    ENABLE_CONSOLE_LOG();

    Command cmd;
    std::shared_ptr<TcpServer> tsvr = std::make_shared<TcpServer>([&cmd](std::string cmdstr){
        return cmd.Execute(cmdstr);
    });
    
    tsvr->InitServer();
    tsvr->Start();

    return 0;
}

11.TcpClient.cc

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Common.hpp"

// ./client_tcp server_ip server_port
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cout << "Usage: ./client_tcp server_ip server_port" << std::endl;
        return 1;
    }

    std::string server_ip = argv[1];
    int server_port = std::stoi(argv[2]);

    // 1. 创建套接字
    int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        std::cout << "socket error" << std::endl;
        return 2;
    }

    // 2. 客户端不需要显示的进行绑定, 但是需要连接服务器, 在建立连接的过程由操作系统进行绑定IP和端口号
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = ::htons(server_port);
    server.sin_addr.s_addr = ::inet_addr(server_ip.c_str());

    int n = ::connect(sockfd, CONV(&server), sizeof(server));
    if (n < 0)
    {
        std::cout << "connect error" << std::endl;
        return 3;
    }

    std::string message;
    while (true)
    {
        char inbuffer[1024];
        std::cout << "input message: ";
        std::getline(std::cin, message);

        // 3. 向服务器发送消息
        n = ::write(sockfd, message.c_str(), message.size());
        if (n > 0)
        {
            // 4. 读取服务器发送来的消息
            int m = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1);
            if (m > 0)
            {
                inbuffer[m] = 0;
                std::cout << inbuffer << std::endl;
            }
            else break;
        }
        else break;
    }

    ::close(sockfd);

    return 0;
}

http://www.kler.cn/a/588522.html

相关文章:

  • 《Python深度学习》第四讲:计算机视觉中的深度学习
  • 在Simulink中将Excel数据导入可变负载模块的方法介绍
  • 工程化与框架系列(30)--前端日志系统实现
  • cursor全栈网页开发最合适的技术架构和开发语言
  • JVM系统变量的妙用
  • 树莓派 连接 PlutoSDR 教程
  • Typedef 与enum的使用
  • 【人工智能基础2】人工神经网络、卷积神经网络基础、循环神经网络、长短时记忆网络
  • [蓝桥杯]花束搭配【算法赛】
  • python+MySQL+HTML实现产品管理系统
  • Ollama+DeepSeek+NatCross内网穿透本地部署外网访问教程
  • Flutter:竖向步骤条,类似查看物流组件
  • 一周学会Flask3 Python Web开发-SQLAlchemy更新数据操作-班级模块
  • Windows 下免安装 PostgreSQL 16、PostGIS 安装
  • Cursor插件市场打不开解决
  • CT重建笔记(四)——三维重建
  • Scheme语言的压力测试
  • 音视频缓存数学模型
  • 计算机视觉--图像数据分析基本操作
  • C# GeneticSharp包