当前位置: 首页 > article >正文

6.824/6.5840 Lab 2: Key/Value Server

故事里能毁坏的只有风景

谁也摧毁不了我们的梦境

弦月旁的流星划过了天际

我许下 的愿望 该向谁 去说明

——我是如此相信

完整代码见: https://github.com/SnowLegend-star/6.824

还是那句话,尽量只是参考思路而不是照抄 

先阅读几遍实验说明的Introduction部分,Lab的核心就在下面这段话:

客户端可以向键值服务器发送三种不同的RPC:Put(key, value)、Append(key, arg)和Get(key)。服务器维护一个内存中的键值对映射。键和值都是字符串。Put(key, value)为特定键安装或替换值,Append(key, arg)将arg追加到键的值上并返回旧值,Get(key)获取键的当前值。对于不存在的键,Get应该返回一个空字符串。对于不存在的键,Append应该表现得像现有值是零长度字符串一样。每个客户端通过Clerk与服务器进行Put/Append/Get方法的RPC交互。Clerk管理与服务器的RPC交互。

Client有三个操作请求,而Server负责接收这三个操作请求并返回。那么该如何着手呢?先给KVServer声明一个map[string]string类型的映射来维护映射。有了操作对象,接下来就可以有的放矢了。

再看看什么是线性化:

操作的线性化(Linearizability)是分布式系统中一致性模型的一种,它确保操作看起来像是一次一个地立即执行的。具体来说,线性化具有以下特性:

  1. 实时性:每个操作在某个瞬间发生,这个瞬间称为线性化点(linearization point)。
  2. 顺序性:如果一个操作在另一个操作之前完成,那么第一个操作的线性化点必须在第二个操作的线性化点之前。

简单来说,线性化保证了系统操作的可见顺序与实际执行顺序一致,这对开发和调试分布式系统非常重要,因为它使系统行为更具可预测性。

线性化的具体要求

  1. 实时顺序:假设有两个操作A和B。如果操作A在操作B之前完成(即,A的调用返回时,B还未开始),那么A的效果对B应该是可见的。换句话说,操作B应该能够观察到操作A的结果。
  2. 原子性:操作要么完全发生,要么完全不发生。任何一个操作都不能部分完成。对于键值存储来说,这意味着读和写操作都必须是原子的。

 

例子

假设我们有一个键值存储系统,系统中有两个客户端,Client 1 和 Client 2。以下是操作序列:

Client 1 向键 "x" 写入值 "A"(Put("x", "A"))。

Client 2 从键 "x" 读取值(Get("x"))。

Client 1 将键 "x" 的值改为 "B"(Put("x", "B"))。

Client 2 再次从键 "x" 读取值(Get("x"))。

线性化要求:

如果 Client 2 在操作3之前进行操作2(即读取操作),它应该读取到的是值 "A"。

如果 Client 2 在操作3之后进行操作2(即读取操作),它应该读取到的是值 "B"。

如果 Client 2 在操作1之后进行操作2(即读取操作),它不能读取到操作1之前的任何值(即不能读取到空值或其他旧值)。

这样,任何并发操作的结果都必须与某个串行化执行顺序一致,即每个操作看起来都是在某个时刻瞬间完成的。

为什么线性化很重要?

线性化使得系统行为对开发人员和用户来说更加直观和可预测。它避免了许多由于并发和网络延迟导致的一致性问题,是构建可靠分布式系统的基础。例如,在银行系统中,如果转账操作不是线性化的,用户可能会看到账户余额不一致的情况,这会导致严重的问题。

总之,线性化确保了系统中的所有操作都可以按某种顺序一一排列,使得分布式系统在面对并发操作时仍然具有一致的行为。

了解了上述两点,可以正式开始阅读part01了。

Key/value server with no network failures (easy)

您的第一个任务是在没有消息丢失的情况下实现一个解决方案。

即不用考虑Server的回复丢失导致Client需要重新发送请求的情况。

您需要在client.go中向Clerk的Put/Append/Get方法添加RPC发送代码,并在server.go中实现Put、Append()和Get() RPC处理程序。

按部就班就行。注意一点——Server中所有涉及并发的方法应当加锁操作

当您通过测试套件中的前两个测试时,您就完成了这个任务:“one client”和“many clients”。

假如part01没有出现眼花的情况应该是很快就可以完成的,但不巧的是我经典看岔代码导致出现了个bug。

  • 要严格注意字符串拼接的顺序。
kv.kvStorage[args.Key] = args.Value + oldValue

       我这里直接把顺序拼接反了,导致最后的输出和要求输出完美形成对称~

Key/value server with dropped messages (easy)

基本上通过了part01后,就只会有 “Test: unreliable net, many clients ...”这一个fail的测试点了。所以我们的注意力就只要集中在处理这一点上。

下面来看实验说明:

现在,您应该修改解决方案以在消息丢失的情况下继续运行(例如,RPC请求和RPC回复)。如果消息丢失,那么客户端的ck.server.Call()将返回false(更准确地说,Call()等待回复消息的时间间隔,如果在该时间内没有收到回复,则返回false)。您将面临的问题之一是Clerk可能需要多次发送RPC直到成功。然而,每次调用Clerk.Put()或Clerk.Append()都应该只执行一次,因此您必须确保重发不会导致服务器执行请求两次。向Clerk添加代码以在未收到回复时重试,并向server.go添加代码以在需要时过滤重复。以下是处理重复检测的指导。

开始我还以为要自己设置时间间隔来处理Call()出现消息丢失的情况。后来才发现Call()是自动处理信息丢失的情况从而返回false,我属实是多虑了。

Hints

1、您需要唯一标识客户端操作,以确保键值服务器只执行每个操作一次。

每个客户端都有唯一的Clerk。每个操作都有唯一的id。在Server维护一个操作请求处理情况的映射即可。

是不是Server端又要添加一个处理Client端请求操作成功从而返回消息的handler?可要可不要。

2、您需要仔细考虑服务器必须维护的状态以处理重复的Get()、Put()和Append()请求,如果有的话。

对这个hint的理解很重要。所谓重复处理,就是Server会再次调用资源去处理已经处理好的operation。

我们发现Get()这个operation就是从Server中获取一个对应的Value即可。无论是否重复处理都只要一步操作,所以在这里进行过滤Get()重复与否其实结果都是一致的。但是,如果Client需要通过中转才会访问到Server,我们倒是可以在中转处设置一个cahce来减少对Server的重复访问

Put()的重复请求过滤与否也无所谓。毕竟它也只有一步操作哈哈哈。

重点是Append()。如果是第一次处理Append(),那Server需要获得对应Key的oldValue,并且修改给定Key对应的Value。但是对于重复的Append(),我们只需要返回oldValue即可,不用重复修改Key对应的Value了。

3、您的重复检测方案应该快速释放服务器内存,例如每个RPC隐含着客户端已经看到了其上一个RPC的回复 。可以假设一个客户端一次只会调用一个Clerk。

上述要求的意思是,在设计重复检测机制时,服务器需要高效地管理和释放内存,避免长时间保留无用的记录。具体来说,每个RPC请求隐含地表示客户端已经成功处理了之前的RPC请求的回复。因此,服务器可以安全地删除那些已经被客户端确认接收的RPC请求记录。

这意味着服务器只需要保留每个客户端最近处理的RPC请求记录,不需要保存所有的历史记录

一言蔽之,part02要完成的工作就两点:

  • Server维护每个operation的完成情况opComplete
  • Server维护每个operation得到的处理结构opReply
  • Server收到Client发来的operation已完成的消息后,删除这个operation的完成情况和处理结果,即删除opComplete和opReply对应的表项。

最后贴一张Lab通过的截图~

具体代码

这里贴份代码吧,具体代码可以去github看。

server.go

package kvsrv

import (
	"log"
	"sync"
)

const Debug = true

func DPrintf(format string, a ...interface{}) (n int, err error) {
	if Debug {
		log.Printf(format, a...)
	}
	return
}

type KVServer struct {
	mu sync.Mutex

	// Your definitions here.
	kvStorage  map[string]string //存储键值对映射
	opReply    map[int64]string  //维护每个请求应该收到的回复
	opComplete map[int64]int     //维护每个请求是否完成
}

func (kv *KVServer) TaskComplete_Handeler(args *TaskCompleteArgs, reply *TaskCompleteReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	delete(kv.opComplete, args.Identifier) //处理完这个请求后直接删除记录
	delete(kv.opReply, args.Identifier)    //及时删除记录
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	kv.mu.Lock()
	defer kv.mu.Unlock()

	//只有未完成的操作才可以被执行
	reply.Value = kv.kvStorage[args.Key]

}

func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) { //Put方法不需要返回值
	// Your code here.
	kv.mu.Lock()
	defer kv.mu.Unlock()

	if kv.opComplete[args.Identifier] == 0 {
		kv.opComplete[args.Identifier] = 1
		kv.kvStorage[args.Key] = args.Value
	}

}

func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	kv.mu.Lock()
	defer kv.mu.Unlock()

	if kv.opComplete[args.Identifier] == 0 {
		kv.opComplete[args.Identifier] = 1
		oldValue := kv.kvStorage[args.Key]
		kv.kvStorage[args.Key] = oldValue + args.Value
		reply.Value = oldValue

		kv.opReply[args.Identifier] = oldValue //保存这个operation处理的结果
	} else {
		reply.Value = kv.opReply[args.Identifier]
	}

}

func StartKVServer() *KVServer {
	kv := new(KVServer)

	// You may need initialization code here.
	kv.kvStorage = make(map[string]string)
	kv.opReply = make(map[int64]string)
	kv.opComplete = make(map[int64]int)
	return kv
}

client.go

package kvsrv

import (
	"crypto/rand"
	"math/big"

	"6.5840/labrpc"
)

type Clerk struct {
	server *labrpc.ClientEnd
	// You will have to modify this struct.
	identifier []int64 //每个请求的标识符集合
}

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	x := bigx.Int64()
	return x
}

func MakeClerk(server *labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.server = server
	// You'll have to add code here.
	ck.identifier = make([]int64, 0)
	return ck
}

// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {

	// You will have to modify this function.
	id := nrand()
	args := GetArgs{
		Key:        key,
		Identifier: id,
	}
	reply := GetReply{}

	ok := ck.server.Call("KVServer.Get", &args, &reply)

	for ok == false {
		// fmt.Println("Get操作失败,自动重试")
		ok = ck.server.Call("KVServer.Get", &args, &reply)
	}

	argsTaskComplete := TaskCompleteArgs{
		Identifier: id,
	}
	replyTaskComplete := TaskCompleteReply{}
	ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)

	for ok == false {
		// fmt.Println("Server没有收到Get操作完成的通知")
		ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
	}

	return reply.Value
}

// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer."+op, &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string {
	// You will have to modify this function.
	id := nrand()
	args := PutAppendArgs{
		Key:        key,
		Value:      value,
		Identifier: id,
	}
	reply := PutAppendReply{}

	ret := ""
	ok := ck.server.Call("KVServer."+op, &args, &reply)

	for ok == false {
		// fmt.Printf("%v操作失败\n", op)
		ok = ck.server.Call("KVServer."+op, &args, &reply)
	}
	ret = reply.Value

	argsTaskComplete := TaskCompleteArgs{
		Identifier: id,
	}
	replyTaskComplete := TaskCompleteReply{}
	ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)

	for ok == false {
		// fmt.Println("Server没有收到PutAppend操作完成的通知")
		ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
	}

	return ret
}

func (ck *Clerk) Put(key string, value string) {
	ck.PutAppend(key, value, "Put")
}

// Append value to key's value and return that value
func (ck *Clerk) Append(key string, value string) string {
	return ck.PutAppend(key, value, "Append")
}


http://www.kler.cn/a/419646.html

相关文章:

  • 如何加强游戏安全,防止定制外挂影响游戏公平性
  • autoResultMap = true 作用, @TableName(value = ““, autoResultMap = true)
  • CAD 二次开发入门与实践:以 C# 为例
  • Centos7安装MySQL8.0详细教程(压缩包安装方式)
  • ESP32-S3模组上跑通ES8388(13)
  • node.js @ffmpeg-installer/ffmpeg 桌面推流
  • 农业强国助农平台:科技赋能,助力乡村振兴
  • 【学习笔记】检测基于RTOS的设计中的堆栈溢出-第2部分
  • 威胁驱动的网络安全方法论
  • 如何在 Ubuntu 18.04 上设置 Apache 虚拟主机
  • 家校通小程序实战教程03学生管理
  • 【特斯拉的自动驾驶好在哪】
  • kernel crash数据解析
  • 【Linux】————(日志、线程池及死锁问题)
  • 贪心算法专题(四)
  • Linux的奇妙冒险——进程PCB第一讲
  • 前缀和篇——繁星斗斗数字交织中,觅得效率明月辉光(1)
  • 利用oracle spool配置数据导出脚本
  • 5.2.2 动作标记 getproperty
  • Linux的基本操作及虚拟机设置
  • Spring中@Transactional注解与事务传播机制
  • 【小记】如何刷机
  • Linux:内存文件 基础io
  • 【云原生系列】如何判断哪家云服务器提供商更适合我
  • 基于Matlab BP神经网络的电力负荷预测模型研究与实现
  • 大数据技术Kafka详解 ② | Kafka基础与架构介绍