超硬核区块链算法仿真:联盟链PBFT多线程仿真实现 :c语言完全详解版
1 22年年底想用gpt做出一个pbft的算法仿真,到了25年终于可以结合gpt grok perplexcity deepseek等实现了!!!!!

1.1简化版
// 定义 Windows 版本,确保条件变量相关函数可用
#define _WIN32_WINNT 0x0600
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>
#include <locale.h>
// 定义常量:节点数、请求数
#define NODE_COUNT 4 // 1 个主节点(编号 0)和 3 个副本
#define REQUEST_COUNT 10 // 模拟 10 个请求
// 消息类型枚举:PBFT 的三个阶段
typedef enum {
MSG_PREPREPARE, // 主节点发送,初始广播请求
MSG_PREPARE, // 副本收到 Pre-Prepare 后回复 Prepare 消息
MSG_COMMIT // 副本收到 Commit 后回复 Commit 消息
} MsgType;
// 消息结构体:用于节点间通信
typedef struct Message {
int request_id; // 请求编号
MsgType type; // 消息类型
int sender; // 发送节点编号
struct Message *next; // 链表指针(用于构建消息队列)
} Message;
// 消息队列结构体:每个节点拥有一个独立的队列
typedef struct {
Message *head;
Message *tail;
CRITICAL_SECTION cs; // 互斥锁,保证队列操作的线程安全
CONDITION_VARIABLE cv; // 条件变量,用于等待队列不为空
} MessageQueue;
// 初始化消息队列
void init_queue(MessageQueue *q) {
q->head = q->tail = NULL;
InitializeCriticalSection(&q->cs);
InitializeConditionVariable(&q->cv);
}
// 入队操作:将消息加入队列尾部
void enqueue(MessageQueue *q, Message *msg) {
EnterCriticalSection(&q->cs);
msg->next = NULL;
if (q->tail == NULL) {
q->head = q->tail = msg;
} else {
q->tail->next = msg;
q->tail = msg;
}
WakeConditionVariable(&q->cv);
LeaveCriticalSection(&q->cs);
}
// 出队操作:若队列为空,则阻塞等待
Message* dequeue(MessageQueue *q) {
EnterCriticalSection(&q->cs);
while (q->head == NULL) {
SleepConditionVariableCS(&q->cv, &q->cs, INFINITE);
}
Message *msg = q->head;
q->head = msg->next;
if (q->head == NULL)
q->tail = NULL;
LeaveCriticalSection(&q->cs);
return msg;
}
// 模拟网络延迟(简单随机延迟,单位毫秒)
DWORD simulate_delay() {
return (DWORD)(rand() % 50); // 延迟 0-49 毫秒
}
// 全局消息队列数组:每个节点一个队列
MessageQueue nodeQueues[NODE_COUNT];
// 非主节点(副本)的线程函数
DWORD WINAPI node_thread_proc(LPVOID param) {
int node_id = *(int*)param;
while (1) {
Message *msg = dequeue(&nodeQueues[node_id]);
// 检查终止信号:当 request_id 和 type 均为 -1 时退出
if (msg->request_id == -1 && msg->type == (MsgType)-1) {
free(msg);
break;
}
// 模拟网络延迟
Sleep(simulate_delay());
// 副本处理消息:收到 Pre-Prepare 回复 Prepare;收到 Commit 回复 Commit
if (msg->type == MSG_PREPREPARE) {
Message *reply = (Message*)malloc(sizeof(Message));
reply->request_id = msg->request_id;
reply->type = MSG_PREPARE;
reply->sender = node_id;
enqueue(&nodeQueues[0], reply); // 回复发送给主节点(节点0)
} else if (msg->type == MSG_COMMIT) {
Message *reply = (Message*)malloc(sizeof(Message));
reply->request_id = msg->request_id;
reply->type = MSG_COMMIT;
reply->sender = node_id;
enqueue(&nodeQueues[0], reply);
}
free(msg);
}
return 0;
}
int main() {
// 设置控制台编码和区域,支持中文输出
SetConsoleOutputCP(CP_UTF8);
setlocale(LC_ALL, "zh_CN.UTF-8");
srand((unsigned int)time(NULL));
// 初始化每个节点的消息队列
for (int i = 0; i < NODE_COUNT; i++) {
init_queue(&nodeQueues[i]);
}
// 创建非主节点线程(节点1到 NODE_COUNT-1)
HANDLE nodeThreads[NODE_COUNT - 1];
int nodeIds[NODE_COUNT - 1];
for (int i = 1; i < NODE_COUNT; i++) {
nodeIds[i - 1] = i;
nodeThreads[i - 1] = CreateThread(NULL, 0, node_thread_proc, &nodeIds[i - 1], 0, NULL);
}
// 定义 PBFT 共识阈值(对于 4 个节点,f = floor((n-1)/3)=1,因此阈值可以设为 2,即需要至少 2 个 Prepare/Commit 回复)
int threshold = 2;
// 模拟 PBFT 共识过程,每个请求依次执行
for (int req = 0; req < REQUEST_COUNT; req++) {
printf("\n====== 请求 %d ======\n", req);
// 主节点(节点0)发送 Pre-Prepare 消息给所有副本
for (int i = 1; i < NODE_COUNT; i++) {
Message *msg = (Message*)malloc(sizeof(Message));
msg->request_id = req;
msg->type = MSG_PREPREPARE;
msg->sender = 0;
enqueue(&nodeQueues[i], msg);
}
// 主节点自身模拟处理 Pre-Prepare 延迟
Sleep(simulate_delay());
// 收集 Prepare 消息
int prepareCount = 0;
while (prepareCount < threshold) {
Message *reply = dequeue(&nodeQueues[0]); // 从主节点的队列中取消息
if (reply->request_id == req && reply->type == MSG_PREPARE) {
prepareCount++;
printf("收到 Prepare 消息,发送者: %d\n", reply->sender);
}
free(reply);
}
printf("Pre-Prepare 阶段完成,收到 %d 个 Prepare 回复\n", prepareCount);
// 主节点发送 Commit 消息给所有副本
for (int i = 1; i < NODE_COUNT; i++) {
Message *msg = (Message*)malloc(sizeof(Message));
msg->request_id = req;
msg->type = MSG_COMMIT;
msg->sender = 0;
enqueue(&nodeQueues[i], msg);
}
// 主节点自身模拟处理 Commit 延迟
Sleep(simulate_delay());
// 收集 Commit 消息
int commitCount = 0;
while (commitCount < threshold) {
Message *reply = dequeue(&nodeQueues[0]);
if (reply->request_id == req && reply->type == MSG_COMMIT) {
commitCount++;
printf("收到 Commit 消息,发送者: %d\n", reply->sender);
}
free(reply);
}
printf("Commit 阶段完成,收到 %d 个 Commit 回复,共识达成!\n", commitCount);
}
// 发送终止信号给所有非主节点线程
for (int i = 1; i < NODE_COUNT; i++) {
Message *term = (Message*)malloc(sizeof(Message));
term->request_id = -1;
term->type = (MsgType)-1;
term->sender = -1;
enqueue(&nodeQueues[i], term);
}
// 等待所有非主节点线程结束
WaitForMultipleObjects(NODE_COUNT - 1, nodeThreads, TRUE, INFINITE);
// 清理:删除每个队列的互斥锁和关闭线程句柄
for (int i = 0; i < NODE_COUNT; i++) {
DeleteCriticalSection(&nodeQueues[i].cs);
}
for (int i = 0; i < NODE_COUNT - 1; i++) {
CloseHandle(nodeThreads[i]);
}
printf("\nPBFT 仿真结束!\n");
return 0;
}
1.2deepseek 改进版:
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <stdbool.h>
// 可配置参数
#define MIN_NODES 4
#define DEFAULT_NODES 10
#define DEFAULT_REQUESTS 20
#define LOG_LEVEL 1
// 自动计算共识阈值
#define CONSENSUS_THRESHOLD (2 * ((NODE_COUNT - 1) / 3))
// 类型定义
typedef enum { MSG_REQUEST, MSG_PREPREPARE, MSG_PREPARE, MSG_COMMIT } MsgType;
typedef struct Message {
MsgType type;
int sender;
int request_id;
int sequence;
struct Message* next;
} Message;
typedef struct {
Message* head;
Message* tail;
CRITICAL_SECTION cs;
CONDITION_VARIABLE cv;
} MessageQueue;
typedef struct {
int id;
bool is_primary;
int last_sequence;
MessageQueue queue;
int* received_prepare; // 动态数组
int* received_commit;
} PBFTNode;
// 全局变量
PBFTNode* nodes = NULL;
HANDLE* node_threads = NULL;
CRITICAL_SECTION log_cs;
int NODE_COUNT = DEFAULT_NODES;
int REQUEST_COUNT = DEFAULT_REQUESTS;
PerformanceStats perf_stats;
// 简洁日志系统
#if LOG_LEVEL >= 1
void node_log(int node_id, const char* format, ...) {
EnterCriticalSection(&log_cs);
va_list args;
printf("[N%d] ", node_id);
va_start(args, format);
vprintf(format, args);
va_end(args);
printf("\n");
LeaveCriticalSection(&log_cs);
}
#else
#define node_log(...)
#endif
// 初始化节点
void init_node(int id) {
nodes[id].id = id;
nodes[id].is_primary = (id == 0);
nodes[id].last_sequence = -1;
nodes[id].received_prepare = calloc(NODE_COUNT, sizeof(int));
nodes[id].received_commit = calloc(NODE_COUNT, sizeof(int));
InitializeCriticalSection(&nodes[id].queue.cs);
InitializeConditionVariable(&nodes[id].queue.cv);
nodes[id].queue.head = nodes[id].queue.tail = NULL;
}
// 高效广播(减少内存复制)
void broadcast(int sender, Message* msg) {
Message template = *msg;
for (int i = 0; i < NODE_COUNT; i++) {
if (i == sender) continue;
Message* copy = malloc(sizeof(Message));
*copy = template;
EnterCriticalSection(&nodes[i].queue.cs);
if (nodes[i].queue.tail) {
nodes[i].queue.tail->next = copy;
} else {
nodes[i].queue.head = copy;
}
nodes[i].queue.tail = copy;
WakeConditionVariable(&nodes[i].queue.cv);
LeaveCriticalSection(&nodes[i].queue.cs);
LOG_DEBUG(sender, "Sent to N%d", i);
}
free(msg);
}
// 共识处理线程
DWORD WINAPI node_process(LPVOID param) {
int id = *(int*)param;
while (1) {
// 消息处理逻辑(同前)
// ...
// 性能统计
if (msg->type == MSG_COMMIT && count >= CONSENSUS_THRESHOLD) {
InterlockedIncrement(&perf_stats.completed_requests);
}
}
return 0;
}
int main(int argc, char* argv[]) {
// 解析命令行参数
if (argc >= 2) NODE_COUNT = atoi(argv[1]);
if (argc >= 3) REQUEST_COUNT = atoi(argv[2]);
// 初始化系统
InitializeCriticalSection(&log_cs);
nodes = malloc(NODE_COUNT * sizeof(PBFTNode));
node_threads = malloc(NODE_COUNT * sizeof(HANDLE));
int* ids = malloc(NODE_COUNT * sizeof(int));
for (int i = 0; i < NODE_COUNT; i++) {
ids[i] = i;
init_node(i);
}
// 启动线程
for (int i = 0; i < NODE_COUNT; i++) {
node_threads[i] = CreateThread(NULL, 0, node_process, &ids[i], 0, NULL);
}
// 性能统计
perf_stats.start_time = clock();
// 发送请求
for (int req = 0; req < REQUEST_COUNT; req++) {
broadcast(-1, &(Message){.type=MSG_REQUEST, .request_id=req});
Sleep(100); // 控制请求速率
}
// 等待完成
Sleep(1000);
// 终止和清理
// ...
perf_stats.end_time = clock();
print_performance_stats();
return 0;
}
2 以下是调试和运行结果:
3.详细版本解释:
这个程序模拟了一个小型区块链网络:
节点:80个节点,其中节点0是主节点(负责发起请求),节点1到79是从节点(负责响应)。
目标:处理100个请求,通过PBFT的三个阶段(Pre-Prepare、Prepare、Commit)达成共识。
通信方式:每个节点有自己的消息队列,主节点和从节点通过队列传递消息,模拟网络通信。
并发性:使用多线程(79个从节点线程+1个主线程),模拟区块链节点的并行工作。
延迟:每个消息传递有0-10ms的随机延迟,模拟真实网络环境。
同步工具:使用互斥锁和条件变量,确保线程安全和消息顺序。
运行过程可以分为五个主要阶段:
初始化阶段:设置节点、队列和线程。
主节点发起请求:发送Pre-Prepare消息,启动共识。
从节点处理消息:接收并回复Prepare和Commit消息。
主节点收集回复:收集足够的消息,完成共识。
统计和清理:输出性能指标,释放资源。