当前位置: 首页 > article >正文

C/C++网络编程--文件分块传输

文件分块传输是网络编程中一个常见的任务,尤其是在处理大文件时,将文件分块可以提高传输效率,简化错误处理,并可以实现并发传输。下面,写个从客户端向服务器发送大型数据的demo。

客户端

客户端有两点需要注意,在传输分两个一个是文件总块数和文件块,。传输文件总块数让服务器知道有多少文件块需要接收,确保所有数据都被完整地发送到服务器,避免因文件块数不对导致文件重组失败。

传输文件总块数

int sendAll(SOCKET s, const char* buf, int len, int flags) {
    int total = 0;
    int bytesLeft = len;
    int n;

    while (total < len) {
        n = send(s, buf + total, bytesLeft, flags);
        if (n == SOCKET_ERROR) {
            return SOCKET_ERROR;
        }
        total += n;
        bytesLeft -= n;
    }

    return total;
}

传输文件块

int sendFileBlock(SOCKET clientSocket, FileBlock* block) {
    int retry_count = 0;
    while (retry_count < MAX_RETRY_COUNT) {
        if (sendAll(clientSocket, (char*)&block->id, sizeof(block->id), 0) == SOCKET_ERROR ||
            sendAll(clientSocket, (char*)&block->size, sizeof(block->size), 0) == SOCKET_ERROR ||
            sendAll(clientSocket, block->data, block->size, 0) == SOCKET_ERROR ||
            sendAll(clientSocket, (char*)&block->crc32, sizeof(block->crc32), 0) == SOCKET_ERROR) {
            
            printf("发送块 %lld 失败,重试中...\n", block->id);
            retry_count++;
            Sleep(1000);  // 等待1秒后重试
        } else {
            return 1;  // 成功发送
        }
    }
    printf("发送块 %lld 失败,已达到最大重试次数\n", block->id);
    return 0;
}

服务器端

在服务器接收文件块时得做两件事:文件块接收和文件重组。

文件块接收

文件块接收牵扯到是否接收正确的判断,这里可以采用CRC32校验和判断服务器是否有接收正确文件块:

uint32_t calculateCRC32(const char* data, int32_t size) {
    uint32_t crc = 0xFFFFFFFF;
    for (int i = 0; i < size; i++) {
        crc ^= data[i];
        for (int j = 0; j < 8; j++) {
            crc = (crc >> 1) ^ (0xEDB88320 & -(crc & 1));
        }
    }
    return ~crc;
}

稍微解释一下,第一个for遍历全部字节,第二个for是得到每一位的crc值,crc = (crc >> 1) ^ (0xEDB88320 & -(crc & 1));,其中0xEDB88320是多项式常数,客户端和服务器应该一致。这个表达式将CRC值右移一位,如果CRC值的最低位是1,则将CRC值与0xEDB88320进行异或操作。

文件重组

文件重组还是比较简单的,首先在服务器目录下建立一个新的文件,遍历每个文件块fwrite写入创建好的文件,但是这里一定要记住在整个客户端和服务器端建立连接通信过程,一定要记得释放缓冲区内存!!!

void reassembleFile(const char* filename) {
    FILE* file = fopen(filename, "wb"); // 以二进制写模式打开文件
    if (file == NULL) {
        perror("无法创建输出文件");
        return;
    }
    // 写入文件
    for (int i = 0; i < totalBlocksExpected; i++) {
        fwrite(receivedBlocks[i].data, 1, receivedBlocks[i].size, file);
        free(receivedBlocks[i].data); // 释放内存
    }
    fclose(file); // 关闭文件
    printf("文件已重组并保存为 %s\n", filename);
}

最后,讲下服务器处理客户端发送的文件,首先接收客户端发来的文件总块数,先开辟相应大小的缓存数组:receivedBlocks = (FileBlock*)malloc(sizeof(FileBlock) * totalBlocksExpected);这里值得一提FileBlock结构体,它是由块ID,data,size,crc32值组成。然后,将接收文件数据放入缓冲数组

 block.data = (char*)malloc(block.size);
 int totalReceived = 0;
 while (totalReceived < block.size) {
     bytesReceived = recv(clientSocket, buffer, (BUFFER_SIZE<block.size - totalReceived?BUFFER_SIZE:block.size - totalReceived), 0);
     if (bytesReceived <= 0) {
         free(block.data);
         goto cleanup;
     }
     memcpy(block.data + totalReceived, buffer, bytesReceived);
     totalReceived += bytesReceived;
 }

最后验证CRC是否正确,将接收到的块重组成文件。

代码汇总

客户端代码

#include "p2p_common.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment(lib, "ws2_32.lib")
#pragma execution_character_set("utf-8")
#define BUFFER_SIZE 1024
#define MAX_RETRY_COUNT 5

void setConsoleCodePage() {
    SetConsoleOutputCP(65001);
    SetConsoleCP(65001);
}

uint32_t calculateCRC32(const char* data, int32_t size) {
    uint32_t crc = 0xFFFFFFFF;
    for (int i = 0; i < size; i++) {
        crc ^= data[i];
        for (int j = 0; j < 8; j++) {
            crc = (crc >> 1) ^ (0xEDB88320 & -(crc & 1));
        }
    }
    return ~crc;
}
//传输总块数
int sendAll(SOCKET s, const char* buf, int len, int flags) {
    int total = 0;
    int bytesLeft = len;
    int n;

    while (total < len) {
        n = send(s, buf + total, bytesLeft, flags);
        if (n == SOCKET_ERROR) {
            return SOCKET_ERROR;
        }
        total += n;
        bytesLeft -= n;
    }

    return total;
}

int sendFileBlock(SOCKET clientSocket, FileBlock* block) {
    int retry_count = 0;
    while (retry_count < MAX_RETRY_COUNT) {
        if (sendAll(clientSocket, (char*)&block->id, sizeof(block->id), 0) == SOCKET_ERROR ||
            sendAll(clientSocket, (char*)&block->size, sizeof(block->size), 0) == SOCKET_ERROR ||
            sendAll(clientSocket, block->data, block->size, 0) == SOCKET_ERROR ||
            sendAll(clientSocket, (char*)&block->crc32, sizeof(block->crc32), 0) == SOCKET_ERROR) {
            
            printf("发送块 %lld 失败,重试中...\n", block->id);
            retry_count++;
            Sleep(1000);  // 等待1秒后重试
        } else {
            return 1;  // 成功发送
        }
    }
    printf("发送块 %lld 失败,已达到最大重试次数\n", block->id);
    return 0;
}

void displayProgress(int64_t current, int64_t total) {
    const int barWidth = 50;
    float progress = (float)current / total;
    int pos = barWidth * progress;

    printf("\r[");
    for (int i = 0; i < barWidth; ++i) {
        if (i < pos) printf("=");
        else if (i == pos) printf(">");
        else printf(" ");
    }
    printf("] %d%%", (int)(progress * 100.0));
    fflush(stdout);
}
int main() {
    setConsoleCodePage();

    WSADATA wsaData;
    SOCKET clientSocket;
    struct sockaddr_in serverAddr;
    char filename[BUFFER_SIZE];

    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        printf("WSAStartup 失败!\n");
        return -1;
    }

    printf("请输入要上传的文件名: ");
    scanf("%s", filename);
    getchar(); // 清除输入缓冲

    FILE *fp = fopen(filename, "rb");
    if (fp == NULL) {
        perror("无法打开文件");
        WSACleanup();
        return -1;
    }
    // 获取文件大小,
    fseek(fp, 0, SEEK_END);
    long fileSize = ftell(fp);
    fseek(fp, 0, SEEK_SET);
    //向上取整
    int totalBlocks = (fileSize + BLOCK_SIZE - 1) / BLOCK_SIZE;

    clientSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (clientSocket == INVALID_SOCKET) {
        printf("套接字创建失败!\n");
        fclose(fp);
        WSACleanup();
        return -1;
    }

    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(PORT);
    serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(clientSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        printf("连接失败!\n");
        closesocket(clientSocket);
        fclose(fp);
        WSACleanup();
        return -1;
    }

    printf("已连接到服务器。\n");

    if (sendAll(clientSocket, (char*)&totalBlocks, sizeof(totalBlocks), 0) == SOCKET_ERROR) {
        printf("发送总块数失败\n");
        closesocket(clientSocket);
        fclose(fp);
        WSACleanup();
        return -1;
    }

    FileBlock block;
    for (int64_t i = 0; i < totalBlocks; i++) {
        block.id = i;
        block.size = (i == totalBlocks - 1) ? (fileSize % BLOCK_SIZE) : BLOCK_SIZE;
        if (block.size == 0) block.size = BLOCK_SIZE;

        block.data = (char*)malloc(block.size);
        fread(block.data, 1, block.size, fp);

        block.crc32 = calculateCRC32(block.data, block.size);

        if (!sendFileBlock(clientSocket, &block)) {
            printf("\n发送块 %lld 失败,传输中断\n", block.id);
            free(block.data);
            closesocket(clientSocket);
            fclose(fp);
            WSACleanup();
            return -1;
        }

        displayProgress(i + 1, totalBlocks);
        free(block.data);
    }

    printf("\n文件传输完成。\n");

    // 等待服务器确认
    char confirmation[BUFFER_SIZE];
    int received = recv(clientSocket, confirmation, BUFFER_SIZE, 0);
    if (received > 0) {
        confirmation[received] = '\0';
        printf("服务器响应: %s\n", confirmation);
    } else {
        printf("未收到服务器确认\n");
    }

    fclose(fp);
    closesocket(clientSocket);
    WSACleanup();

    printf("上传成功。按任意键退出...\n");
    getchar();
    return 0;
}

服务器端代码

#include "p2p_common.h" 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <windows.h> 
#include <winsock2.h> // Windows套接字库
#include <ws2tcpip.h> // TCP/IP协议相关库
#include <process.h> 

#pragma comment(lib, "ws2_32.lib") // 链接ws2_32库
#pragma execution_character_set("utf-8") // 设置执行字符集为UTF-8
#define MAX_BLOCKS 1000000 // 定义最大块数
#define BUFFER_SIZE 1024 // 定义缓冲区大小

FileBlock* receivedBlocks = NULL; // 接收到的文件块数组
int blockCount = 0; // 已接收的块数
CRITICAL_SECTION blockMutex; // 用于线程同步的临界区
int serverRunning = 1; // 服务器运行状态标志
int totalBlocksExpected = 0; // 预期接收的总块数

// 设置控制台代码页为UTF-8
void setConsoleCodePage() {
    SetConsoleOutputCP(65001); // 设置输出代码页为UTF-8
    SetConsoleCP(65001); // 设置输入代码页为UTF-8
}

// CRC32校验和计算函数
uint32_t calculateCRC32(const char* data, int32_t size) {
    uint32_t crc = 0xFFFFFFFF;
    for (int i = 0; i < size; i++) {
        crc ^= data[i];
        for (int j = 0; j < 8; j++) {
            crc = (crc >> 1) ^ (0xEDB88320 & -(crc & 1));
        }
    }
    return ~crc;
}
// 文件重组函数
void reassembleFile(const char* filename) {
    FILE* file = fopen(filename, "wb"); // 以二进制写模式打开文件
    if (file == NULL) {
        perror("无法创建输出文件");
        return;
    }

    // 写入文件
    for (int i = 0; i < totalBlocksExpected; i++) {
        fwrite(receivedBlocks[i].data, 1, receivedBlocks[i].size, file);
        //fwrite(const void *ptr, size_t size, size_t count, FILE *stream);
        free(receivedBlocks[i].data); // 释放内存
    }

    fclose(file); // 关闭文件
    printf("文件已重组并保存为 %s\n", filename);
}

// 处理客户端连接的线程函数
unsigned __stdcall handleClient(void* arg) {
    SOCKET clientSocket = *(SOCKET*)arg; // 客户端套接字
    FileBlock block;
    char buffer[BUFFER_SIZE];
    int bytesReceived;

    // 接收总块数针对服务器的预期
    if (totalBlocksExpected == 0) {//开始接收文件块之前,服务器需要知道总共有多少个文件块需要接收
        if (recv(clientSocket, (char*)&totalBlocksExpected, sizeof(totalBlocksExpected), 0) <= 0) {
            printf("接收总块数失败\n");
            closesocket(clientSocket);
            return 0;
        }
        printf("预期接收总块数:%d\n", totalBlocksExpected);
        receivedBlocks = (FileBlock*)malloc(sizeof(FileBlock) * totalBlocksExpected);
        if (receivedBlocks == NULL) {
            printf("内存分配失败\n");
            closesocket(clientSocket);
            return 0;
        }
    }

    while (1) {
        // 接收块ID
        if (recv(clientSocket, (char*)&block.id, sizeof(block.id), 0) <= 0) {
            break;
        }

        // 接收数据大小
        if (recv(clientSocket, (char*)&block.size, sizeof(block.size), 0) <= 0) {
            break;
        }

        // 分配内存并接收数据
        block.data = (char*)malloc(block.size);
        int totalReceived = 0;
        while (totalReceived < block.size) {
            bytesReceived = recv(clientSocket, buffer, (BUFFER_SIZE<block.size - totalReceived?BUFFER_SIZE:block.size - totalReceived), 0);
            if (bytesReceived <= 0) {
                free(block.data);
                goto cleanup;
            }
            memcpy(block.data + totalReceived, buffer, bytesReceived);
            totalReceived += bytesReceived;
        }

        // 接收CRC32校验和
        if (recv(clientSocket, (char*)&block.crc32, sizeof(block.crc32), 0) <= 0) {
            free(block.data);
            break;
        }

        // 验证CRC32校验和
        uint32_t calculatedCRC32 = calculateCRC32(block.data, block.size);
        if (calculatedCRC32 != block.crc32) {
            printf("块 %lld 的CRC32校验和不匹配\n", block.id);
            free(block.data);
            continue;
        }

        // 将块添加到接收列表
        EnterCriticalSection(&blockMutex);
        if (block.id < totalBlocksExpected) {
            receivedBlocks[block.id] = block;
            blockCount++;
            printf("接收到块 %lld,大小:%d 字节(%d/%d)\n", block.id, block.size, blockCount, totalBlocksExpected);
            
            if (blockCount == totalBlocksExpected) {
                printf("已接收所有块,准备重组文件...\n");
                reassembleFile("demo.mp4");
                serverRunning = 0;
                send(clientSocket, "File received successfully", 27, 0);
            }
        } else {
            printf("收到无效的块ID: %lld\n", block.id);
            free(block.data);
        }
        LeaveCriticalSection(&blockMutex);
    }

cleanup:
    closesocket(clientSocket);
    return 0;
}

// 服务器线程函数
unsigned __stdcall serverThread(void* arg) {
    SOCKET serverSocket = socket(AF_INET, SOCK_STREAM, 0); // 创建服务器套接字
    if (serverSocket == INVALID_SOCKET) {
        perror("无法创建套接字");
        return 1;
    }

    struct sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(PORT);
    serverAddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        perror("绑定套接字失败");
        closesocket(serverSocket);
        return 1;
    }

    if (listen(serverSocket, 5) == SOCKET_ERROR) {
        perror("监听失败");
        closesocket(serverSocket);
        return 1;
    }

    printf("服务器正在监听端口 %d\n", PORT);

    while (serverRunning) {
        struct sockaddr_in clientAddr;
        int clientAddrLen = sizeof(clientAddr);
        SOCKET clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
        if (clientSocket == INVALID_SOCKET) {
            if (serverRunning) {
                perror("接受连接失败");
            }
            continue;
        }

        unsigned threadId;
        HANDLE threadHandle = (HANDLE)_beginthreadex(NULL, 0, handleClient, &clientSocket, 0, &threadId);
        if (threadHandle == NULL) {
            perror("创建线程失败");
            closesocket(clientSocket);
        } else {
            CloseHandle(threadHandle);
        }
    }

    closesocket(serverSocket);
    return 0;
}

// 主函数
int main() {
    setConsoleCodePage(); // 设置控制台代码页

    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        perror("WSAStartup 失败");
        return 1;
    }

    InitializeCriticalSection(&blockMutex); // 初始化临界区

    unsigned threadId;
    HANDLE serverThreadHandle = (HANDLE)_beginthreadex(NULL, 0, serverThread, NULL, 0, &threadId);

    if (serverThreadHandle == NULL) {
        perror("创建服务器线程失败");
        DeleteCriticalSection(&blockMutex);
        WSACleanup();
        return 1;
    }

    printf("服务器已启动。等待接收文件块...\n");

    WaitForSingleObject(serverThreadHandle, INFINITE); // 等待服务器线程结束
    CloseHandle(serverThreadHandle);

    DeleteCriticalSection(&blockMutex); // 删除临界区
    if (receivedBlocks) {
        free(receivedBlocks); // 释放内存
    }
    WSACleanup(); // 清理Winsock库

    printf("程序已完成。按回车键退出...\n");
    getchar();

    return 0;
}

效果展示

请添加图片描述


http://www.kler.cn/a/287184.html

相关文章:

  • 深度学习常见术语解释
  • Mac 上如何安装Mysql? 如何配置 Mysql?以及如何开启并使用MySQL
  • 光谱相机如何还原色彩
  • 简述mysql 主从复制原理及其工作过程,配置一主两从并验证。
  • 服务器一次性部署One API + ChatGPT-Next-Web
  • 创建一个简单的spring boot+vue前后端分离项目
  • 数据结构(邓俊辉)学习笔记】串 08——KMP算法:再改进
  • 【MinIO 安装与使用】(新版本-随系统启动)
  • 危化品如何在室外安全暂存
  • 动手学深度学习(pytorch)学习记录21-读写文件(模型与参数)[学习记录]
  • Oracle rac模式下undo表空间爆满的解决
  • 部署project_exam_system项目——及容器的编排
  • stm32开发之rt-thread使SysTick处于微妙级运行时,出现的问题记录
  • GraphPad Prism下载安装教程怎样中文汉化
  • 第3章-03-Python库Requests安装与讲解
  • 机器学习数学公式推导之线性回归
  • 系统监控和命令行环境
  • python中**字典的含义
  • MATLAB下的粒子滤波例程|三维非线性模型|组合导航|PF代码(无需下载,直接复制到MATLAB上即可运行)
  • http的三次握手和四次挥手
  • 制造企业SRM系统中如何进行供应商的管理
  • 质量小议43 - 提效
  • 如何通过选择合适的编程工具来提升编程效率
  • 零基础5分钟上手亚马逊云科技-高可用负载均衡器
  • 浅谈SpringMvc的核心流程与组件
  • 零基础学习Redis(7) -- hash类型命令使用