当前位置: 首页 > article >正文

超硬核区块链算法仿真:联盟链PBFT多线程仿真实现 :c语言完全详解版

1 22年年底想用gpt做出一个pbft的算法仿真,到了25年终于可以结合gpt grok perplexcity deepseek等实现了!!!!!


1.1简化版

// 定义 Windows 版本,确保条件变量相关函数可用
#define _WIN32_WINNT 0x0600

#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>
#include <locale.h>

// 定义常量:节点数、请求数
#define NODE_COUNT 4         // 1 个主节点(编号 0)和 3 个副本
#define REQUEST_COUNT 10     // 模拟 10 个请求

// 消息类型枚举:PBFT 的三个阶段
typedef enum {
    MSG_PREPREPARE,  // 主节点发送,初始广播请求
    MSG_PREPARE,     // 副本收到 Pre-Prepare 后回复 Prepare 消息
    MSG_COMMIT       // 副本收到 Commit 后回复 Commit 消息
} MsgType;

// 消息结构体:用于节点间通信
typedef struct Message {
    int request_id;         // 请求编号
    MsgType type;           // 消息类型
    int sender;             // 发送节点编号
    struct Message *next;   // 链表指针(用于构建消息队列)
} Message;

// 消息队列结构体:每个节点拥有一个独立的队列
typedef struct {
    Message *head;
    Message *tail;
    CRITICAL_SECTION cs;    // 互斥锁,保证队列操作的线程安全
    CONDITION_VARIABLE cv;  // 条件变量,用于等待队列不为空
} MessageQueue;

// 初始化消息队列
void init_queue(MessageQueue *q) {
    q->head = q->tail = NULL;
    InitializeCriticalSection(&q->cs);
    InitializeConditionVariable(&q->cv);
}

// 入队操作:将消息加入队列尾部
void enqueue(MessageQueue *q, Message *msg) {
    EnterCriticalSection(&q->cs);
    msg->next = NULL;
    if (q->tail == NULL) {
        q->head = q->tail = msg;
    } else {
        q->tail->next = msg;
        q->tail = msg;
    }
    WakeConditionVariable(&q->cv);
    LeaveCriticalSection(&q->cs);
}

// 出队操作:若队列为空,则阻塞等待
Message* dequeue(MessageQueue *q) {
    EnterCriticalSection(&q->cs);
    while (q->head == NULL) {
        SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
    }
    Message *msg = q->head;
    q->head = msg->next;
    if (q->head == NULL)
        q->tail = NULL;
    LeaveCriticalSection(&q->cs);
    return msg;
}

// 模拟网络延迟(简单随机延迟,单位毫秒)
DWORD simulate_delay() {
    return (DWORD)(rand() % 50);  // 延迟 0-49 毫秒
}

// 全局消息队列数组:每个节点一个队列
MessageQueue nodeQueues[NODE_COUNT];

// 非主节点(副本)的线程函数
DWORD WINAPI node_thread_proc(LPVOID param) {
    int node_id = *(int*)param;
    while (1) {
        Message *msg = dequeue(&nodeQueues[node_id]);
        // 检查终止信号:当 request_id 和 type 均为 -1 时退出
        if (msg->request_id == -1 && msg->type == (MsgType)-1) {
            free(msg);
            break;
        }
        // 模拟网络延迟
        Sleep(simulate_delay());
        // 副本处理消息:收到 Pre-Prepare 回复 Prepare;收到 Commit 回复 Commit
        if (msg->type == MSG_PREPREPARE) {
            Message *reply = (Message*)malloc(sizeof(Message));
            reply->request_id = msg->request_id;
            reply->type = MSG_PREPARE;
            reply->sender = node_id;
            enqueue(&nodeQueues[0], reply); // 回复发送给主节点(节点0)
        } else if (msg->type == MSG_COMMIT) {
            Message *reply = (Message*)malloc(sizeof(Message));
            reply->request_id = msg->request_id;
            reply->type = MSG_COMMIT;
            reply->sender = node_id;
            enqueue(&nodeQueues[0], reply);
        }
        free(msg);
    }
    return 0;
}

int main() {
    // 设置控制台编码和区域,支持中文输出
    SetConsoleOutputCP(CP_UTF8);
    setlocale(LC_ALL, "zh_CN.UTF-8");
    srand((unsigned int)time(NULL));

    // 初始化每个节点的消息队列
    for (int i = 0; i < NODE_COUNT; i++) {
        init_queue(&nodeQueues[i]);
    }

    // 创建非主节点线程(节点1到 NODE_COUNT-1)
    HANDLE nodeThreads[NODE_COUNT - 1];
    int nodeIds[NODE_COUNT - 1];
    for (int i = 1; i < NODE_COUNT; i++) {
        nodeIds[i - 1] = i;
        nodeThreads[i - 1] = CreateThread(NULL, 0, node_thread_proc, &nodeIds[i - 1], 0, NULL);
    }

    // 定义 PBFT 共识阈值(对于 4 个节点,f = floor((n-1)/3)=1,因此阈值可以设为 2,即需要至少 2 个 Prepare/Commit 回复)
    int threshold = 2;

    // 模拟 PBFT 共识过程,每个请求依次执行
    for (int req = 0; req < REQUEST_COUNT; req++) {
        printf("\n====== 请求 %d ======\n", req);
        // 主节点(节点0)发送 Pre-Prepare 消息给所有副本
        for (int i = 1; i < NODE_COUNT; i++) {
            Message *msg = (Message*)malloc(sizeof(Message));
            msg->request_id = req;
            msg->type = MSG_PREPREPARE;
            msg->sender = 0;
            enqueue(&nodeQueues[i], msg);
        }
        // 主节点自身模拟处理 Pre-Prepare 延迟
        Sleep(simulate_delay());

        // 收集 Prepare 消息
        int prepareCount = 0;
        while (prepareCount < threshold) {
            Message *reply = dequeue(&nodeQueues[0]); // 从主节点的队列中取消息
            if (reply->request_id == req && reply->type == MSG_PREPARE) {
                prepareCount++;
                printf("收到 Prepare 消息,发送者: %d\n", reply->sender);
            }
            free(reply);
        }
        printf("Pre-Prepare 阶段完成,收到 %d 个 Prepare 回复\n", prepareCount);

        // 主节点发送 Commit 消息给所有副本
        for (int i = 1; i < NODE_COUNT; i++) {
            Message *msg = (Message*)malloc(sizeof(Message));
            msg->request_id = req;
            msg->type = MSG_COMMIT;
            msg->sender = 0;
            enqueue(&nodeQueues[i], msg);
        }
        // 主节点自身模拟处理 Commit 延迟
        Sleep(simulate_delay());

        // 收集 Commit 消息
        int commitCount = 0;
        while (commitCount < threshold) {
            Message *reply = dequeue(&nodeQueues[0]);
            if (reply->request_id == req && reply->type == MSG_COMMIT) {
                commitCount++;
                printf("收到 Commit 消息,发送者: %d\n", reply->sender);
            }
            free(reply);
        }
        printf("Commit 阶段完成,收到 %d 个 Commit 回复,共识达成!\n", commitCount);
    }

    // 发送终止信号给所有非主节点线程
    for (int i = 1; i < NODE_COUNT; i++) {
        Message *term = (Message*)malloc(sizeof(Message));
        term->request_id = -1;
        term->type = (MsgType)-1;
        term->sender = -1;
        enqueue(&nodeQueues[i], term);
    }

    // 等待所有非主节点线程结束
    WaitForMultipleObjects(NODE_COUNT - 1, nodeThreads, TRUE, INFINITE);

    // 清理:删除每个队列的互斥锁和关闭线程句柄
    for (int i = 0; i < NODE_COUNT; i++) {
        DeleteCriticalSection(&nodeQueues[i].cs);
    }
    for (int i = 0; i < NODE_COUNT - 1; i++) {
        CloseHandle(nodeThreads[i]);
    }

    printf("\nPBFT 仿真结束!\n");
    return 0;
}




1.2deepseek 改进版:

#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <stdbool.h>

// 可配置参数
#define MIN_NODES 4
#define DEFAULT_NODES 10
#define DEFAULT_REQUESTS 20
#define LOG_LEVEL 1

// 自动计算共识阈值
#define CONSENSUS_THRESHOLD (2 * ((NODE_COUNT - 1) / 3))

// 类型定义
typedef enum { MSG_REQUEST, MSG_PREPREPARE, MSG_PREPARE, MSG_COMMIT } MsgType;

typedef struct Message {
    MsgType type;
    int sender;
    int request_id;
    int sequence;
    struct Message* next;
} Message;

typedef struct {
    Message* head;
    Message* tail;
    CRITICAL_SECTION cs;
    CONDITION_VARIABLE cv;
} MessageQueue;

typedef struct {
    int id;
    bool is_primary;
    int last_sequence;
    MessageQueue queue;
    int* received_prepare; // 动态数组
    int* received_commit;
} PBFTNode;

// 全局变量
PBFTNode* nodes = NULL;
HANDLE* node_threads = NULL;
CRITICAL_SECTION log_cs;
int NODE_COUNT = DEFAULT_NODES;
int REQUEST_COUNT = DEFAULT_REQUESTS;
PerformanceStats perf_stats;

// 简洁日志系统
#if LOG_LEVEL >= 1
void node_log(int node_id, const char* format, ...) {
    EnterCriticalSection(&log_cs);
    va_list args;
    printf("[N%d] ", node_id);
    va_start(args, format);
    vprintf(format, args);
    va_end(args);
    printf("\n");
    LeaveCriticalSection(&log_cs);
}
#else
#define node_log(...)
#endif

// 初始化节点
void init_node(int id) {
    nodes[id].id = id;
    nodes[id].is_primary = (id == 0);
    nodes[id].last_sequence = -1;
    nodes[id].received_prepare = calloc(NODE_COUNT, sizeof(int));
    nodes[id].received_commit = calloc(NODE_COUNT, sizeof(int));
    
    InitializeCriticalSection(&nodes[id].queue.cs);
    InitializeConditionVariable(&nodes[id].queue.cv);
    nodes[id].queue.head = nodes[id].queue.tail = NULL;
}

// 高效广播(减少内存复制)
void broadcast(int sender, Message* msg) {
    Message template = *msg;
    for (int i = 0; i < NODE_COUNT; i++) {
        if (i == sender) continue;

        Message* copy = malloc(sizeof(Message));
        *copy = template;
        
        EnterCriticalSection(&nodes[i].queue.cs);
        if (nodes[i].queue.tail) {
            nodes[i].queue.tail->next = copy;
        } else {
            nodes[i].queue.head = copy;
        }
        nodes[i].queue.tail = copy;
        WakeConditionVariable(&nodes[i].queue.cv);
        LeaveCriticalSection(&nodes[i].queue.cs);
        
        LOG_DEBUG(sender, "Sent to N%d", i);
    }
    free(msg);
}

// 共识处理线程
DWORD WINAPI node_process(LPVOID param) {
    int id = *(int*)param;
    while (1) {
        // 消息处理逻辑(同前)
        // ...
        
        // 性能统计
        if (msg->type == MSG_COMMIT && count >= CONSENSUS_THRESHOLD) {
            InterlockedIncrement(&perf_stats.completed_requests);
        }
    }
    return 0;
}

int main(int argc, char* argv[]) {
    // 解析命令行参数
    if (argc >= 2) NODE_COUNT = atoi(argv[1]);
    if (argc >= 3) REQUEST_COUNT = atoi(argv[2]);
    
    // 初始化系统
    InitializeCriticalSection(&log_cs);
    nodes = malloc(NODE_COUNT * sizeof(PBFTNode));
    node_threads = malloc(NODE_COUNT * sizeof(HANDLE));
    
    int* ids = malloc(NODE_COUNT * sizeof(int));
    for (int i = 0; i < NODE_COUNT; i++) {
        ids[i] = i;
        init_node(i);
    }
    
    // 启动线程
    for (int i = 0; i < NODE_COUNT; i++) {
        node_threads[i] = CreateThread(NULL, 0, node_process, &ids[i], 0, NULL);
    }
    
    // 性能统计
    perf_stats.start_time = clock();
    
    // 发送请求
    for (int req = 0; req < REQUEST_COUNT; req++) {
        broadcast(-1, &(Message){.type=MSG_REQUEST, .request_id=req});
        Sleep(100); // 控制请求速率
    }
    
    // 等待完成
    Sleep(1000); 
    
    // 终止和清理
    // ...
    
    perf_stats.end_time = clock();
    print_performance_stats();
    
    return 0;
}

2  以下是调试和运行结果:


3.详细版本解释:

这个程序模拟了一个小型区块链网络:

节点:80个节点,其中节点0是主节点(负责发起请求),节点1到79是从节点(负责响应)。
目标:处理100个请求,通过PBFT的三个阶段(Pre-Prepare、Prepare、Commit)达成共识。
通信方式:每个节点有自己的消息队列,主节点和从节点通过队列传递消息,模拟网络通信。
并发性:使用多线程(79个从节点线程+1个主线程),模拟区块链节点的并行工作。
延迟:每个消息传递有0-10ms的随机延迟,模拟真实网络环境。
同步工具:使用互斥锁和条件变量,确保线程安全和消息顺序。
运行过程可以分为五个主要阶段:

初始化阶段:设置节点、队列和线程。
主节点发起请求:发送Pre-Prepare消息,启动共识。
从节点处理消息:接收并回复Prepare和Commit消息。
主节点收集回复:收集足够的消息,完成共识。
统计和清理:输出性能指标,释放资源。



 


http://www.kler.cn/a/597067.html

相关文章:

  • 集成学习之随机森林
  • 全面总结:编程中的上下文(Context)
  • c#获取使用串口信息
  • 新版 React19使用 react-quill
  • 使用C++在Qt框架下调用DeepSeek的API接口实现自己的简易桌面小助手
  • 【transformer理论+实战(三)】必要的 Pytorch 知识
  • qt介绍之qscreen
  • OpenLayers集成天地图服务开发指南
  • uni-app集成保利威直播、点播SDK经验FQ(二)|小程序直播/APP直播开发适用
  • k8s的存储
  • 遇到一个奇怪问题,页面请求不到后端
  • TCP的“四次挥手“与TIME_WAIT状态详解
  • Linux vim mode | raw / cooked
  • 2025:sql注入详细介绍
  • 权限维持—Linux系统Rootkit后门
  • victoriametrics 部署
  • S32k3XX MCU时钟配置
  • 【Linux】达梦数据库图形如何新建表、 插入表
  • 3. 轴指令(omron 机器自动化控制器)——>MC_SetPosition
  • RAG 技术:让大型语言模型更智能