当前位置: 首页 > article >正文

【Linux网络编程】第五弹---构建基于UDP的简单聊天室系统:代码结构与功能模块详解

个人主页: 熬夜学编程的小林

💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】

目录

1、UdpServer.hpp

1.1、UdpServer类基本结构

1.2、构造函数

1.3、Start()

2、Route.hpp

2.1、Route类基本结构

2.2、构造析构函数

2.3、Forward()

2.3.1、CheckOnlineUser()

2.3.2、Offline()

2.3.3、ForwardHelper()

3、UdpServerMain.cc

4、UdpClientMain.cc

4.1、主函数

4.2、InitClient()

4.3、RecvMessage()

4.4、SendMessage()

5、补充参考内容

5.1、地址转换函数

5.2、关于 inet_ntoa

6、完整代码 

6.1、InetAddr.hpp

6.2、LockGuard.hpp

6.3、Log.hpp

6.4、Makefile

6.5、nocopy

6.6、Route.hpp

6.7、Thread.hpp

6.8、ThreadPool.hpp

6.9、UdpClientMain.cc

6.10、UdpServer.hpp

6.11、UdpServerMain.cc

6.12、Test.c


前面一弹实现了翻译版本的UDP网络通信,此弹实现一个多线程版本聊天室的网络通信(即将消息转发给所以用户),此处先拷贝第一版本通信的代码,即第三弹的代码!

1、UdpServer.hpp

UdpServer类的启动函数是实现执行聊天室的功能,因此需要加执行函数的成员变量并适当修改构造函数,启动函数!!

1.1、UdpServer类基本结构

UdpServer类基本结构没变,只是添加了一个执行方法

using server_t = std::function<void(int,const std::string message,InetAddr& who)>;

// UdpServer user("192.1.1.1",8899)
// 一般服务器主要是用来进行网络数据读取和写入的,即IO的
// 服务器 IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:
    UdpServer(server_t func, uint16_t localport = glocalport);

    void InitServer();

    void Start();

    ~UdpServer();
private:
    int _sockfd;          // 读写都用同一个sockfd,说明:udp是全双工通信的
    uint16_t _localport;  // 端口号
    bool _isrunning;

    server_t _func;       // 执行方法
};

1.2、构造函数

构造函数只需加一个函数对象的初始化

UdpServer(server_t func, uint16_t localport = glocalport)
    : _func(func), _sockfd(gsockfd), _localport(localport), _isrunning(false)
{
}

1.3、Start()

启动函数原本只是接收并发送消息,现在改为接收消息和执行转发消息的功能

void Start()
{
    _isrunning = true;
    char message[1024];
    while (_isrunning)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        // 收消息,返回值: 实际收到多少字节
        ssize_t n = recvfrom(_sockfd, message, sizeof(message) - 1, 0, (struct sockaddr *)&peer, &len);
        if (n > 0)
        {
            InetAddr addr(peer);
            message[n] = 0;
            LOG(DEBUG,"[%s]# %s\n",addr.AddrStr().c_str(),message);
            _func(_sockfd,message,addr); // 转发消息
            LOG(DEBUG,"return udpserver\n");
        }
        else
        {
            std::cout << "recvfrom ,error " << std::endl;
        }
    }
}

2、Route.hpp

Route类 对转发消息功能的封装

2.1、Route类基本结构

 Route类 成员变量是一个InetAddr类型的vector容器,但是此处为多线程版本可能造成数据不一致问题,因此需要加锁,内部实现转发消息的函数

using task_t = std::function<void()>;

class Route
{
public:
    Route();
    // 检查用户是否在 在线列表中
    void CheckOnlineUser(InetAddr& who);
    // for test(移除用户)
    void Offline(InetAddr& who);
    // 转发消息函数具体实现
    void ForwardHelper(int sockfd,const std::string& message,InetAddr who);
    // 转发
    void Forward(int sockfd,const std::string& message,InetAddr& who);
    ~Route();
private:
    std::vector<InetAddr> _online_user; // 用户在线列表
    pthread_mutex_t _mutex;
};

2.2、构造析构函数

Route类的构造函数初始化互斥锁,析构函数释放互斥锁

Route()
{
    pthread_mutex_init(&_mutex,nullptr);
}

~Route()
{
    pthread_mutex_destroy(&_mutex);
}

2.3、Forward()

转发函数有两大步骤

1、检查用户是否在 在线列表中,在则忽略,不在则添加到 _online_user中

  • 1.1、如果转发的消息为 QUIT 或者 Q 则将用户移除 在线列表

2、执行转发消息函数

// 转发
void Forward(int sockfd,const std::string& message,InetAddr& who)
{
    // 1.该用户是否在 在线用户列表中呢?如果在忽略;如果不在,自动添加到_online_user
    CheckOnlineUser(who);

    // 1.1 message == "QUIT" "Q"
    if(message == "QUIT" || message == "Q")
    {
        Offline(who);
    }
    // 2.who一定在_online_user里面
    // ForwardHelper(sockfd,message);
    task_t t = std::bind(&Route::ForwardHelper,this,sockfd,message,who);
    ThreadPool<task_t>::GetInstance()->Equeue(t);
}

2.3.1、CheckOnlineUser()

检查用户是否在 在线列表中!

// 检查用户是否在 在线列表中
void CheckOnlineUser(InetAddr& who)
{
    LockGuard lockguard(&_mutex);
    for(auto& user : _online_user)
    {
        // 需要重载
        if(user == who) 
        {
            LOG(DEBUG,"[%s] is exists\n",who.AddrStr().c_str());
            return;
        }
    }
    LOG(DEBUG,"[%s] is not exists,add it\n",who.AddrStr().c_str());
    _online_user.push_back(who);
}

1、用户的比较需要在InetAddr类中实现等于的重载函数!

用户的IP 和 端口都相等表示两个用户相等!

bool operator==(const InetAddr& addr)
{
    return (this->_ip == addr._ip && this->_port == addr._port);
}

 2、用户的AddrStr()函数也需要实现

AddrStr()函数打印IP和端口,并使用冒号(:)拼接!

std::string AddrStr()
{
    return _ip + ":" + std::to_string(_port);
}

2.3.2、Offline()

移除用户!

注意:vector容器删除成员需要使用迭代器! 

void Offline(InetAddr& who)
{
    LockGuard lockguard(&_mutex);
    // 删除需要迭代器
    auto iter = _online_user.begin();
    for(;iter != _online_user.end();iter++)
    {
        if(*iter == who)
        {
            LOG(DEBUG,"[%s] is offline\n",who.AddrStr().c_str());
            _online_user.erase(iter);
            break;
        }
    }
}

2.3.3、ForwardHelper()

转发消息函数具体实现[为了知道是谁发送的消息,此处打印的消息包含IP 和 端口]!

// 转发消息函数具体实现
void ForwardHelper(int sockfd,const std::string& message,InetAddr who)
{
    LockGuard lockguard(&_mutex);
    std::string send_message = "[" + who.AddrStr() + "]# " + message;
    
    for(auto& user : _online_user)
    {
        struct sockaddr_in peer = user.Addr();
        // 无IP和端口
        // LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),message.c_str());
        // ::sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
        LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),send_message.c_str());
        ::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
    }
}

3、UdpServerMain.cc

UdpServerMain.cc 调用服务端的执行函数!!!

// .udp_client local-port
// .udp_client 8888
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << "  server-port" << std::endl;
        exit(0);
    }

    uint16_t port = std::stoi(argv[1]);

    EnableScreen();
    Route messageRoute;

    server_t message_route = std::bind(&Route::Forward, &messageRoute,
            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);

    std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(message_route,port); // C++14标准

    usvr->InitServer();
    usvr->Start();
    return 0;
}

运行结果(可能出现的问题) 

因为博主前面进行过测试,可能会出现这样的情况!

运行结果(正常) 

此处确实能够进行多个客户端通信,但是收到的消息还是有点乱码(因为此处的客户端是单进程单线程的程序,而服务端是多线程的),因此需要修改客户端的主函数代码

4、UdpClientMain.cc

客户端主函数实现成两个线程版本,一个线程用户发消息,一个线程用于收消息

4.1、主函数

主函数通过命令行参数获取IP和端口,然后创建套接字,再创建两个线程,最后启动并回收线程!!

int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
        exit(0);
    }
    std::string serverip = argv[1];
    uint16_t serverport = std::stoi(argv[2]);
    int sockfd = InitClient();

    Thread recver("recver-thread", std::bind(&RecvMessage, sockfd, std::placeholders::_1));
    Thread sender("sender-thread", std::bind(&SendMessage, sockfd, serverip, serverport, std::placeholders::_1));

    recver.Start();
    sender.Start();

    recver.Join();
    sender.Join();

    ::close(sockfd);
    return 0;
}

4.2、InitClient()

创建套接字,创建成功返回文件描述符!

int InitClient()
{
    int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        std::cerr << "create socket eror\n"
                  << std::endl;
        exit(1);
    }
    return sockfd;
}

4.3、RecvMessage()

收消息函数,如果接收消息成功向标准错误流打印结果(以便于后面测试)!

void RecvMessage(int sockfd, const std::string &name)
{
    while (true)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        char buffer[1024];
        int n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cerr << buffer << std::endl;
        }
        else
        {
            std::cerr << "recvfrom error" << std::endl;
            break;
        }
    }
}

4.4、SendMessage()

向指定网络进程发送消息!

void SendMessage(int sockfd, std::string serverip, uint16_t severport, const std::string &name)
{
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(severport); // 转换重要!!!
    server.sin_addr.s_addr = inet_addr(serverip.c_str());

    std::string cli_profix = name + "# "; // sender-thread#
    while (true)
    {
        std::string line;
        std::cout << cli_profix;
        std::getline(std::cin, line);
        // 发消息,你要知道发送给谁
        int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
        if (n <= 0)
            break;
    }
}

5、补充参考内容

5.1、地址转换函数

  • 本节只介绍基于 IPv4 的 socket 网络编程,sockaddr_in 中的成员 struct in_addr sin_addr 表示 32 位 的 IP 地址
  • 但是我们通常用点分十进制的字符串表示 IP 地址,以下函数可以在字符串表示 和 in_addr 表示之间转换;

字符串转 in_addr 的函数:

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int inet_aton(const char *cp, struct in_addr *inp);

in_addr_t inet_addr(const char *cp);

int inet_pton(int af, const char *src, void *dst);

in_addr 转字符串的函数: 

#include <arpa/inet.h>

char *inet_ntoa(struct in_addr in);

const char *inet_ntop(int af, const void *src,char *dst, socklen_t size);

其中 inet_pton 和 inet_ntop 不仅可以转换 IPv4 的 in_addr,还可以转换 IPv6 的 in6_addr,因此函数接口是 void *addrptr。 

代码示例:

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int main()
{
    struct sockaddr_in addr;
    inet_aton("127.0.0.1",&addr.sin_addr);
    uint32_t* ptr = (uint32_t*)&addr.sin_addr;
    printf("addr: %x\n",*ptr);
    printf("addr_str: %s\n",inet_ntoa(addr.sin_addr));
    return 0;
}

 

5.2、关于 inet_ntoa

inet_ntoa 这个函数返回了一个 char*, 很显然是这个函数自己在内部为我们申请了一块内存来保存 ip 的结果. 那么是否需要调用者手动释放呢? 

The  inet_ntoa() function converts the Internet host address in, 
given in network byte order, to a string in IPv4 dotted-decimal notation.  
The string is returned in a statically allocated buffer, which 
subsequent calls will overwrite.

man 手册上说, inet_ntoa 函数, 是把这个返回结果放到了静态存储区. 这个时候不需要我们手动进行释放.
那么问题来了, 如果我们调用多次这个函数, 会有什么样的效果呢? 参见如下代码: 

int main()
{
    struct sockaddr_in addr1;
    struct sockaddr_in addr2;
    addr1.sin_addr.s_addr = 0;
    addr2.sin_addr.s_addr = 0xffffffff;
    char* ptr1 = inet_ntoa(addr1.sin_addr);
    char* ptr2 = inet_ntoa(addr2.sin_addr);
    printf("ptr1: %s,ptr2: %s\n",ptr1,ptr2);
    return 0;
}

运行结果如下:

因为 inet_ntoa 把结果放到自己内部的一个静态存储区, 这样第二次调用时的结果会覆盖掉上一次的结果.

思考: 如果有多个线程调用 inet_ntoa, 是否会出现异常情况呢?

  • 在 APUE 中, 明确提出 inet_ntoa 不是线程安全的函数;
  • 但是在 centos7 上测试, 并没有出现问题, 可能内部的实现加了互斥锁;
  • uu们课后自己写程序验证一下在自己的机器上 inet_ntoa 是否会出现多线程的问题;
  • 在多线程环境下, 推荐使用 inet_ntop, 这个函数由调用者提供一个缓冲区保存结果, 可以规避线程安全问题;

6、完整代码 

6.1、InetAddr.hpp

#pragma once

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

class InetAddr
{
private:
    // 网络地址转本地地址 
    void ToHost(const struct sockaddr_in& addr) 
    {
        _port = ntohs(addr.sin_port); // 网络转主机
        // _ip = inet_ntoa(addr.sin_addr); // 结构化转字符串
        char ip_buf[32];
        // inet_p to n
        // p: process
        // n: net
        // int inet_pton(int af, const char *src, void *dst);
        // inet_pton(AF_INET,ip.c_str(),&addr.sin_addr.s_addr);
        ::inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); // 安全转化
        _ip = ip_buf;
    }
public:
    InetAddr(const struct sockaddr_in& addr):_addr(addr)
    {
        ToHost(addr);
    }
    bool operator==(const InetAddr& addr)
    {
        return (this->_ip == addr._ip && this->_port == addr._port);
    }
    std::string Ip()
    {
        return _ip;
    }
    uint16_t Port()
    {
        return _port;
    }
    struct sockaddr_in Addr()
    {
        return _addr;
    }
    std::string AddrStr()
    {
        return _ip + ":" + std::to_string(_port);
    }
    ~InetAddr()
    {}
private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

6.2、LockGuard.hpp

#pragma once 
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t* _mutex;
};

6.3、Log.hpp

#pragma once

#include <iostream>
#include <fstream>
#include <ctime>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h>
#include <pthread.h>
#include "LockGuard.hpp"

namespace log_ns
{

    // 日志等级
    enum
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string LevelToString(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
        case INFO:
            return "INFO";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return "UNKNOW";
        }
    }

    std::string GetCurrTime()
    {
        time_t now = time(nullptr); // 时间戳
        struct tm *curr_time = localtime(&now);
        char buffer[128];
        snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                    curr_time->tm_year + 1900,
                    curr_time->tm_mon + 1,
                    curr_time->tm_mday,
                    curr_time->tm_hour,
                    curr_time->tm_min,
                    curr_time->tm_sec);
        return buffer;
    }

    class logmessage
    {
    public:
        std::string _level;        // 日志等级
        pid_t _id;                 // pid
        std::string _filename;     // 文件名
        int _filenumber;           // 文件行号
        std::string _curr_time;    // 当前时间
        std::string _message_info; // 日志内容
    };

#define SCREEN_TYPE 1
#define FILE_TYPE 2

    const std::string glogfile = "./log.txt";
    pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;

    // log.logMessage(""/*文件名*/,12/*文件行号*/,INFO/*日志等级*/,"this is a %d message,%f,%s,hello world"/*日志内容*/,x,,);
    class Log
    {
    public:
        // 默认向显示器打印
        Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
        {
        }
        // 打印方式
        void Enable(int type)
        {
            _type = type;
        }
        // 向屏幕打印
        void FlushToScreen(const logmessage &lg)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                    lg._level.c_str(),
                    lg._id,
                    lg._filename.c_str(),
                    lg._filenumber,
                    lg._curr_time.c_str(),
                    lg._message_info.c_str());
        }
        // 向文件打印
        void FlushToFile(const logmessage &lg)
        {
            std::ofstream out(_logfile, std::ios::app); // 追加打开文件
            if (!out.is_open())
                return; // 打开失败直接返回

            char logtxt[2048];
            snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
                        lg._level.c_str(),
                        lg._id,
                        lg._filename.c_str(),
                        lg._filenumber,
                        lg._curr_time.c_str(),
                        lg._message_info.c_str());
            out.write(logtxt, strlen(logtxt)); // 写文件
            out.close();                       // 关闭文件
        }
        // 刷新日志
        void FlushLog(const logmessage &lg)
        {
            // 加过滤逻辑 --- TODO
            // ...

            LockGuard lockguard(&glock); // RAII锁
            switch (_type)
            {
            case SCREEN_TYPE:
                FlushToScreen(lg);
                break;
            case FILE_TYPE:
                FlushToFile(lg);
                break;
            }
        }
        // ... 可变参数(C语言)
        // 初始化日志信息
        void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
        {
            logmessage lg;

            lg._level = LevelToString(level);
            lg._id = getpid();
            lg._filename = filename;
            lg._filenumber = filenumber;
            lg._curr_time = GetCurrTime();

            va_list ap;           // va_list-> char*指针
            va_start(ap, format); // 初始化一个va_list类型的变量
            char log_info[1024];
            vsnprintf(log_info, sizeof(log_info), format, ap);
            va_end(ap); // 释放由va_start宏初始化的va_list资源
            lg._message_info = log_info;

            // 日志打印出来(显示器/文件)
            FlushLog(lg);
        }
        ~Log()
        {
        }

    private:
        int _type;            // 打印方式
        std::string _logfile; // 文件名
    };

    Log lg;
// 打印日志封装成宏,使用函数方式调用
#define LOG(Level, Format, ...)                                          \
    do                                                                   \
    {                                                                    \
        lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
    } while (0)
// 设置打印方式,使用函数方式调用
#define EnableScreen()          \
    do                          \
    {                           \
        lg.Enable(SCREEN_TYPE); \
    } while (0)
// 设置打印方式,使用函数方式调用
#define EnableFile()          \
    do                        \
    {                         \
        lg.Enable(FILE_TYPE); \
    } while (0)

}

6.4、Makefile

.PHONY:all
all:udpserver udpclient

udpserver:UdpServerMain.cc
	g++ -o $@ $^ -std=c++14

udpclient:UdpClientMain.cc
	g++ -o $@ $^ -std=c++14

.PHONY:clean 
clean:
	rm -rf udpserver udpclient

6.5、nocopy

#pragma once

class nocopy
{
public:
    nocopy(){}
    ~nocopy(){}
    nocopy(const nocopy&) = delete;
    const nocopy& operator=(const nocopy&) = delete;
};

6.6、Route.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"
#include "LockGuard.hpp"

using task_t = std::function<void()>;

class Route
{
public:
    Route()
    {
        pthread_mutex_init(&_mutex,nullptr);
    }
    // 检查用户是否在 在线列表中
    void CheckOnlineUser(InetAddr& who)
    {
        LockGuard lockguard(&_mutex);
        for(auto& user : _online_user)
        {
            // 需要重载
            if(user == who) 
            {
                LOG(DEBUG,"[%s] is exists\n",who.AddrStr().c_str());
                return;
            }
        }
        LOG(DEBUG,"[%s] is not exists,add it\n",who.AddrStr().c_str());
        _online_user.push_back(who);
    }
    // for test(移除用户)
    void Offline(InetAddr& who)
    {
        LockGuard lockguard(&_mutex);
        // 删除需要迭代器
        auto iter = _online_user.begin();
        for(;iter != _online_user.end();iter++)
        {
            if(*iter == who)
            {
                LOG(DEBUG,"[%s] is offline\n",who.AddrStr().c_str());
                _online_user.erase(iter);
                break;
            }
        }
    }
    // 转发消息函数具体实现
    void ForwardHelper(int sockfd,const std::string& message,InetAddr who)
    {
        LockGuard lockguard(&_mutex);
        std::string send_message = "[" + who.AddrStr() + "]# " + message;
        
        for(auto& user : _online_user)
        {
            struct sockaddr_in peer = user.Addr();
            // 无IP和端口
            // LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),message.c_str());
            // ::sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
            LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),send_message.c_str());
            ::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
        }
    }
    // 转发
    void Forward(int sockfd,const std::string& message,InetAddr& who)
    {
        // 1.该用户是否在 在线用户列表中呢?如果在忽略;如果不在,自动添加到_online_user
        CheckOnlineUser(who);

        // 1.1 message == "QUIT" "Q"
        if(message == "QUIT" || message == "Q")
        {
            Offline(who);
        }
        // 2.who一定在_online_user里面
        // ForwardHelper(sockfd,message);
        task_t t = std::bind(&Route::ForwardHelper,this,sockfd,message,who);
        ThreadPool<task_t>::GetInstance()->Equeue(t);
    }
    ~Route()
    {
        pthread_mutex_destroy(&_mutex);
    }
private:
    std::vector<InetAddr> _online_user;
    pthread_mutex_t _mutex;
};

6.7、Thread.hpp

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>

namespace ThreadMoudle
{
    // typedef std::function<void()> func_t;
    using func_t = std::function<void(const std::string&)>;

    class Thread
    {
    public:
        void Excute()
        {
            // std::cout << _name << " is running" << std::endl;
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
    public:
        Thread(const std::string& name,func_t func):_name(name),_func(func)
        {}
        // 新线程执行该方法
        static void* ThreadRoutine(void* args)
        {
            Thread* self = static_cast<Thread*>(args);
            self->Excute();
            return nullptr;
        }
        std::string Status()
        {
            if(_isrunning)
                return "running";
            else 
                return "sleep";
        }
        bool Start()
        {
            // ::使用库函数接口,直接使用ThreadRoutine会报错,因为成员函数有隐含this指针 + static
            int n = ::pthread_create(&_tid,nullptr,ThreadRoutine,this);
            if(n != 0) return false;
            return true;
        }
        void Stop()
        {
            if(_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
            }
        }
        std::string Name()
        {
            return _name;
        }
        void Join()
        {
            ::pthread_join(_tid,nullptr);
        }
        ~Thread()
        {}
    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程要执行的回调函数
    };
}

6.8、ThreadPool.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <vector>
#include <string>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"

using namespace ThreadMoudle;
using namespace log_ns;

static const int gdefaultnum = 5; // 默认创建5个线程

void test()
{
    while (true)
    {
        std::cout << "hello world" << std::endl;
        sleep(1);
    }
}

template <typename T>
class ThreadPool
{
private:
    // 任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 唤醒一个线程
    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有线程
    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 休眠
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    // 处理任务,从任务队列取任务,加锁
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            // 1.取任务
            LockQueue();
            // 队列为空且运行则休眠
            while (IsEmpty() && _isrunning) // if?
            {
                _sleep_thread_num++; // 防止阻塞,因为一开始休眠线程数为0
                LOG(INFO, "%s thread sleep begin!\n", name.c_str());
                Sleep();
                LOG(INFO, "%s thread wakeup!\n", name.c_str());
                _sleep_thread_num--;
            }
            // 判定一种情况,为空且不运行则退出
            if (IsEmpty() && !_isrunning)
            {
                LOG(INFO, "%s thread quit!\n", name.c_str());
                UnlockQueue();
                break;
            }
            // 有任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 2.处理任务
            t(); // 处理任务(只属于自己线程),此处不用/不能在临界区处理
        }
    }
    // 单例模式构造函数私有
    ThreadPool(int thread_num = gdefaultnum)
        : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); // 绑定
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread-" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(DEBUG, "construct thread %s done,init success\n", threadname.c_str());
        }
    }
    void Start()
    {
        _isrunning = true;
        for (auto &thread : _threads)
        {
            LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
            thread.Start();
        }
    }
    ThreadPool(const ThreadPool<T> &) = delete;     // 禁止拷贝构造
    void operator=(const ThreadPool<T> &) = delete; // 禁止赋值重载
public:
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeupAll(); // 防止有线程在等待
        UnlockQueue();
        LOG(INFO, "thread pool stop done!\n");
    }

    // 如果是多线程获取单例呢?
    static ThreadPool<T> *GetInstance()
    {
        // 只有第一次需要加锁,减少锁的竞争
        if (_tp == nullptr)
        {

            LockGuard lockguard(&_sig_mutex);
            if (_tp == nullptr)
            {
                LOG(INFO, "create thread pool\n");
                _tp = new ThreadPool<T>();
                _tp->Init();
                _tp->Start();
            }
            else
            {
                LOG(INFO, "get thread pool\n");
            }
        }
        return _tp;
    }
    void Equeue(const T &in)
    {
        LockQueue();
        // 防止false情况还一直在入队列
        if (_isrunning)
        {
            _task_queue.push(in);
            // 唤醒线程,有休眠的就唤醒
            if (_sleep_thread_num > 0)
                Wakeup();
        }
        UnlockQueue();
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _thread_num;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue; // 任务队列,共享资源,需要保护
    bool _isrunning;

    int _sleep_thread_num; // 休眠线程个数

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    // 单例模式
    static ThreadPool<T> *_tp;
    static pthread_mutex_t _sig_mutex;
};

// 静态成员类外初始化
template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;

template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

6.9、UdpClientMain.cc

#include "UdpServer.hpp"
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Thread.hpp"

using namespace ThreadMoudle;

int InitClient()
{
    int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        std::cerr << "create socket eror\n"
                  << std::endl;
        exit(1);
    }
    return sockfd;
}

void RecvMessage(int sockfd, const std::string &name)
{
    while (true)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        char buffer[1024];
        int n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cerr << buffer << std::endl;
        }
        else
        {
            std::cerr << "recvfrom error" << std::endl;
            break;
        }
    }
}

void SendMessage(int sockfd, std::string serverip, uint16_t severport, const std::string &name)
{
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(severport); // 转换重要!!!
    server.sin_addr.s_addr = inet_addr(serverip.c_str());

    std::string cli_profix = name + "# "; // sender-thread#
    while (true)
    {
        std::string line;
        std::cout << cli_profix;
        std::getline(std::cin, line);
        // 发消息,你要知道发送给谁
        int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
        if (n <= 0)
            break;
    }
}

int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
        exit(0);
    }
    std::string serverip = argv[1];
    uint16_t serverport = std::stoi(argv[2]);
    int sockfd = InitClient();

    Thread recver("recver-thread", std::bind(&RecvMessage, sockfd, std::placeholders::_1));
    Thread sender("sender-thread", std::bind(&SendMessage, sockfd, serverip, serverport, std::placeholders::_1));

    recver.Start();
    sender.Start();

    recver.Join();
    sender.Join();

    ::close(sockfd);
    return 0;
}

// 单进程单线程版本(有问题!!!)
// 客户端在未来一定要知道服务器的IP地址和端口号
// .udp_client server-ip server-port
// .udp_client 127.0.0.1 8888
// int main(int argc, char *argv[])
// {
//     if (argc != 3)
//     {
//         std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;
//         exit(0);
//     }

//     // 0.读取接收端IP和端口
//     std::string serverip = argv[1];
//     uint16_t serverport = std::stoi(argv[2]);

//     // 1.创建套接字
//     int sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
//     if (sockfd < 0)
//     {
//         std::cerr << "create socket eror\n"
//                   << std::endl;
//         exit(1);
//     }

//     // 2.设置接收端信息
//     struct sockaddr_in server;
//     memset(&server, 0, sizeof(server));
//     server.sin_family = AF_INET;
//     server.sin_port = htons(serverport); // 转换重要!!!
//     server.sin_addr.s_addr = inet_addr(serverip.c_str());

//     // 3.发消息和接收消息
//     while (true)
//     {
//         std::string line;
//         std::cout << "Please Enter# ";
//         std::getline(std::cin, line); // 以行读取消息
//         // 发消息,你要知道发送给谁
//         int n = sendto(sockfd, line.c_str(), line.size(), 0, (struct sockaddr *)&server, sizeof(server));
//         if (n > 0)
//         {
//             // 收消息
//             struct sockaddr_in temp;
//             socklen_t len = sizeof(temp);
//             char buffer[1024];
//             int m = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&temp, &len);
//             if (m > 0)
//             {
//                 buffer[m] = 0;
//                 std::cout << buffer << std::endl;
//             }
//             else
//             {
//                 break;
//             }
//         }
//         else
//         {
//             break;
//         }
//     }

//     // 4.关闭套接字
//     ::close(sockfd);
//     return 0;
// }

6.10、UdpServer.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "nocopy.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"

using namespace log_ns;

static const int gsockfd = -1;
static const uint16_t glocalport = 8888;

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR
};

using server_t = std::function<void(int,const std::string message,InetAddr& who)>;

// UdpServer user("192.1.1.1",8899)
// 一般服务器主要是用来进行网络数据读取和写入的,即IO的
// 服务器 IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:
    UdpServer(server_t func, uint16_t localport = glocalport)
        : _func(func), _sockfd(gsockfd), _localport(localport), _isrunning(false)
    {
    }

    void InitServer()
    {
        // 1.创建socket文件
        _sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
        if (_sockfd < 0)
        {
            LOG(FATAL, "socket error\n");
            exit(SOCKET_ERROR);
        }
        LOG(DEBUG, "socket create success,sockfd: %d\n", _sockfd); // 3

        // 2.bind
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(_localport);                  // 主机序列转网络序列
        // local.sin_addr.s_addr = inet_addr(_localip.c_str()); // 1.需要4字节ip 2.需要网络序列ip
        local.sin_addr.s_addr = INADDR_ANY; // 服务器端,进行任意IP地址绑定[0]

        int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
        if (n < 0)
        {
            LOG(FATAL, "bind error\n");
            exit(BIND_ERROR);
        }
        LOG(DEBUG, "socket bind success\n");
    }

    void Start()
    {
        _isrunning = true;
        char message[1024];
        while (_isrunning)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            // 收消息,返回值: 实际收到多少字节
            ssize_t n = recvfrom(_sockfd, message, sizeof(message) - 1, 0, (struct sockaddr *)&peer, &len);
            if (n > 0)
            {
                InetAddr addr(peer);
                message[n] = 0;
                LOG(DEBUG,"[%s]# %s\n",addr.AddrStr().c_str(),message);
                _func(_sockfd,message,addr); // 转发消息
                LOG(DEBUG,"return udpserver\n");
            }
            else
            {
                std::cout << "recvfrom ,error " << std::endl;
            }
        }
    }

    ~UdpServer()
    {
        // 关闭文件
        if(_sockfd < gsockfd) ::close(_sockfd);
    }
private:
    int _sockfd;          // 读写都用同一个sockfd,说明:udp是全双工通信的
    uint16_t _localport;  // 端口号
    bool _isrunning;

    server_t _func;       // 执行方法
};

6.11、UdpServerMain.cc

#include "UdpServer.hpp"
#include "Route.hpp"
#include <memory>

// .udp_client local-port
// .udp_client 8888
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << "  server-port" << std::endl;
        exit(0);
    }

    uint16_t port = std::stoi(argv[1]);

    EnableScreen();
    Route messageRoute;

    server_t message_route = std::bind(&Route::Forward, &messageRoute,
            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);

    std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(message_route,port); // C++14标准

    usvr->InitServer();
    usvr->Start();
    return 0;
}

6.12、Test.c

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int main()
{
    struct sockaddr_in addr1;
    struct sockaddr_in addr2;
    addr1.sin_addr.s_addr = 0;
    addr2.sin_addr.s_addr = 0xffffffff;
    char* ptr1 = inet_ntoa(addr1.sin_addr);
    char* ptr2 = inet_ntoa(addr2.sin_addr);
    printf("ptr1: %s,ptr2: %s\n",ptr1,ptr2);
    return 0;
}

// 测试一
// int main()
// {
//     struct sockaddr_in addr;
//     inet_aton("127.0.0.1",&addr.sin_addr);
//     uint32_t* ptr = (uint32_t*)&addr.sin_addr;
//     printf("addr: %x\n",*ptr);
//     printf("addr_str: %s\n",inet_ntoa(addr.sin_addr));
//     return 0;
// }


http://www.kler.cn/a/458107.html

相关文章:

  • 电子应用设计方案85:智能 AI门前柜系统设计
  • 【Domain Generalization(2)】领域泛化在文生图领域的工作之——PromptStyler(ICCV23)
  • Linux Red Hat 7.9 Server安装GitLab
  • JVM实战—9.线上FGC的几种案例
  • 【数据结构Ⅰ复习题】
  • 神经网络入门实战:(二十三)使用本地数据集进行训练和验证
  • 评分模型在路网通勤习惯分析中的应用——提出问题(1)
  • 【docker系列】创建博客 halo 2.11
  • 【yolov5】实现FPS游戏人物检测,并定位到矩形框上中部分,实现自瞄
  • 项目实践-贪吃蛇小游戏
  • 基于Resnet、LSTM、Shufflenet及CNN网络的Daily_and_Sports_Activities数据集仿真
  • 机器人骑自行车过程的MATLAB建模与数值仿真模拟
  • Linux之ARM(MX6U)裸机篇----1.开发环境搭建
  • C语言基础——指针(5)
  • MySQL基础-常见的增删改查操作语句总结
  • 【实习】面经:海康威视-软件开发工程师(嵌入式)
  • App信息收集(小迪网络安全笔记~
  • [Unity Shader] [图形渲染]Shader数学基础18-内置变量
  • git 在windows上显示很多文件有改动实际没有变化
  • c/c++ 无法跳转定义
  • 爬虫 APP 逆向 ---> shopee(虾皮) 电商
  • 表单元素(标签)有哪些?
  • 游戏引擎学习第66天
  • 信息差的商业渠道管理:大数据如何优化渠道管理
  • HTML5新特性|01 音频视频
  • 【每日学点鸿蒙知识】属性变量key、waterflow卡顿问题、包无法上传、Video控件播放视频、Vue类似语法