当前位置: 首页 > article >正文

mit6824-02-Lab1:MapReduce分布式实现

文章目录

  • 写在前面
  • 总体思路分析
  • 代码实现
  • 参考链接

写在前面

具体上次写6824的第一篇文章已经过去了快一个月,上次学习了MapReduce论文相关理论后一直没有继续写代码实现,自己一边要搞论文没有整块时间实现,这两天抽写了相关代码,算是对Lab1的一个交代。
我写的大部分代码都是参考别人的已经实现的,我写这篇博客的目的也并不是为了传播高深的技术,我只是记录一下自己从代码开始时的一头雾水到参考别人代码实现了感叹Lab1的伟大之处。方便后续自己回顾。

看到别人博客提及,每个hits都有用,自己在没有写代码时并不能完全理解官方页面提到的Hits的具体实现方式。我觉得最重要的自己把这些实现都搞懂了原理,随后自己再去回顾这些提示感觉写的很秒。

总体思路分析

MapReduce论文理解:
https://blog.csdn.net/weixin_45863010/article/details/142641061
官方Lab1链接:
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
在这里插入图片描述

上面图片简单概述:首先协调者Coordinator启动,并且注册RPC调用通道,随后Woker使用RPC调用去Coordinator获取任务。

  • Worker根本不区分是否是Map-Worker还是ReduceWorker。
  • Worker充当Map还是Reduce的角色是根据任务类型来划分的,如果是Map任务就执行Map任务,如果是Reduce任务就执行Reduce任务。「我在这里没有使用不是Map任务就执行Reduce任务,具体原因可以参考下面代码实现
  • Coordinator启动后首先初始化所有Map任务,当且只有所有Map任务全部执行完成,才能进行下一阶段Reduce,才能够初始化Reduce任务给Worker执行。「Reduce的执行依赖与Worker执行产生的中间文件,会将中间文件根据对应的reduceNum分配给一个Worker执行
  • 对于GFS或者本地磁盘存储Map产生的中间文件非常重要,MapWorker执行任务后会将任务执行结果存储到上述文件系统中,随后Coordinator根据ReduceNum分配需要执行的任务到ReduceWorker。在ReduceWorker获取到所有文件内容后,首先会排序。

我根据我写代码的理解画一个流程图如下:
在这里插入图片描述

代码实现

下面我将给出自己参考别人的代码实现,主要涉及到三个文件,我直接贴出对应的三个文件,代码里面附有详细的注释:

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

// 定义任务结构体
type TaskRequest struct{}

type TaskType int

type Task struct {
	TaskType  TaskType
	TaskId    int
	FileSlice []string
	ReduceNum int
}

// 定义各种任务类型
// WaitingTask是否是必要的:是必要的,解决当Map任务没有全部完成成,此时调用GetTask返回WaitingTask,
// Worker在接受到这个任务状态后进行短暂的休眠,随后继续调用任务执行后续流程
const (
	MapTask TaskType = iota
	ReduceTask
	WaitingTask
	ExitTask
)

type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

// Add your RPC definitions here.

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	s := "/var/tmp/5840-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

coordinator.go

package mr

import (
	"fmt"
	"io/ioutil"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// Phase 定义当前总体进度,分为三个阶段:Map阶段、Reduce阶段、Done阶段
type Phase int

// State 定义任务状态
type State int

// 当前整体进度,分为三个阶段
const (
	MapPhase Phase = iota
	ReducePhase
	AllDone
)

// 任务状态类型
// 任务对应的三种状态如何切换的:初始任务时将所有任务状态设置为Waiting,
// worker调用rpc执行某个任务时,任务状态由Waiting==>Working
// worker执行任务完成后调用rpc将任务状态有Working==>Done,至此任务完成,上面后两条暂时仅仅针对Map任务
// worker执行Reduce任务时,原理仍然同上,状态由 Waiting==>Working==>Done之间切换
const (
	Working State = iota // 此阶段在工作
	Waiting              // 此阶段在等待执行, 当Map任务没有执行完成此时Reduce任务需要等待
	Done                 // 此阶段已经做完
)

// TaskMetaInfo 保存任务的元数据,任务开始时间、任务状态、任务对应的指针以便后续找到任务
// 思考:为什么不将任务执行状态以及任务执行开始时间,定义到具体任务Task结构体中,也可以这样定义,但不符合要求
// 这些信息不需要暴露给Worker,只要Coordinator维护这些信息,并且能够根据这些信息判断出哪些任务需要执行即可
type TaskMetaInfo struct {
	// 添加任务开始执行时间
	StartTime time.Time
	State     State
	// 传入任务的指针,为了任务从通道中取出来之后,能够通过地址标记这个任务已经完成
	TaskAdr *Task
}

// TaskMetaHolder 任务元信息结构体,主要存储任务进行的状态、任务开始时间以及任务对应的地址「以能够随时更改任务状态」
type TaskMetaHolder struct {
	TaskMeta map[int]*TaskMetaInfo
}

func (t *TaskMetaHolder) acceptTaskMetaInfo(taskMetaInfo *TaskMetaInfo) bool {
	taskId := taskMetaInfo.TaskAdr.TaskId
	meta, _ := t.TaskMeta[taskId]
	if meta != nil {
		fmt.Printf("[acceptTaskMetaInfo] the taskId :%v have contain metaInfo", taskId)
		return false
	} else {
		t.TaskMeta[taskId] = taskMetaInfo
	}
	return true
}

// the function is to judge waiting task to working
func (t *TaskMetaHolder) judgeTaskState(taskId int) bool {
	taskInfo, ok := t.TaskMeta[taskId]
	if !ok || taskInfo.State != Waiting {
		return false
	}
	taskInfo.StartTime = time.Now()
	taskInfo.State = Working
	return true
}

// the function is to judge if all tasks have done, server subsequent phase
func (t *TaskMetaHolder) allTaskDone() bool {
	var (
		mapDoneNum      = 0
		mapUnDoneNum    = 0
		reduceDoneNum   = 0
		reduceUnDoneNum = 0
	)
	taskMeta := t.TaskMeta
	for _, taskMetaInfo := range taskMeta {
		if taskMetaInfo.TaskAdr.TaskType == MapTask {
			if taskMetaInfo.State == Done {
				mapDoneNum++
			} else {
				mapUnDoneNum++
			}
		} else if taskMetaInfo.TaskAdr.TaskType == ReduceTask {
			if taskMetaInfo.State == Done {
				reduceDoneNum++
			} else {
				reduceUnDoneNum++
			}
		}
	}
	if (mapDoneNum > 0 && mapUnDoneNum == 0) && (reduceDoneNum == 0 && reduceUnDoneNum == 0) {
		return true
	} else {
		if reduceDoneNum > 0 && reduceUnDoneNum == 0 {
			return true
		}
	}
	return false
}

type Coordinator struct {
	// Your definitions here.
	Phase          Phase
	MapChan        chan *Task
	ReduceChan     chan *Task
	ReduceNum      int
	Files          []string
	TaskId         int // 这个字段主要作用生成递增ID
	TaskMetaHolder TaskMetaHolder
	mu             sync.Mutex
}

// Your code here -- RPC handlers for the worker to call.

// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
	//The coordinator, as an RPC server, will be concurrent; don't forget to lock shared data
	c.mu.Lock()
	defer c.mu.Unlock()
	ret := false

	// Your code here.
	if c.Phase == AllDone {
		fmt.Println("All tasks have Done")
		ret = true
	} else {
		//fmt.Println("Not All tasks have Done")
	}
	return ret
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		Phase:      MapPhase,
		MapChan:    make(chan *Task, len(files)),
		ReduceChan: make(chan *Task, nReduce),
		ReduceNum:  nReduce,
		Files:      files,
		TaskMetaHolder: TaskMetaHolder{
			TaskMeta: make(map[int]*TaskMetaInfo, nReduce+len(files)),
		},
		TaskId: 0,
	}

	// Your code here.
	c.MakeMapTask(files)
	c.server()
	// 开启一个探测器,监测任务执行时间是否过长
	go c.crashHandler()
	return &c
}

func (c *Coordinator) MakeMapTask(files []string) {
	for _, file := range files {
		taskID := c.genTaskId()
		task := Task{
			TaskType:  MapTask,
			TaskId:    taskID,
			FileSlice: []string{file},
			ReduceNum: c.ReduceNum,
		}

		// 保存任务初始状态
		taskMetaInfo := TaskMetaInfo{State: Waiting, TaskAdr: &task}
		c.TaskMetaHolder.acceptTaskMetaInfo(&taskMetaInfo)
		c.MapChan <- &task
	}
}

func (c *Coordinator) MakeReduceTask() {
	for reduceNum := 0; reduceNum < c.ReduceNum; reduceNum++ {
		taskID := c.genTaskId()
		task := Task{
			TaskType:  ReduceTask,
			TaskId:    taskID,
			FileSlice: c.selectReduceNum(reduceNum),
			ReduceNum: c.ReduceNum,
		}
		// 保存任务初始状态
		taskMetaInfo := TaskMetaInfo{State: Waiting, TaskAdr: &task}
		c.TaskMetaHolder.acceptTaskMetaInfo(&taskMetaInfo)
		c.ReduceChan <- &task
	}
}

/*
*构建Reduce任务,需要将Map任务存储的所有中间文件按照ReduceNum构建任务:
一个File也就是一个Map任务,执行结果会根据Key的哈希值将一个File分散为ReduceNums个任务,如:mr-0-0\mr-0-1\mr-0-2
末尾数字为需要分配给某个Reduce执行的文件,中间的数字为对于的Map的任务标识,也就是TaskId
*/
func (c *Coordinator) selectReduceNum(reduceNum int) []string {
	var res []string
	path, _ := os.Getwd()
	files, err := ioutil.ReadDir(path)
	if err != nil {
		log.Fatal("[selectReduceNum] failure", err)
		return res
	}
	for _, file := range files {
		if strings.HasPrefix(file.Name(), "mr-") && strings.HasSuffix(file.Name(), strconv.Itoa(reduceNum)) {
			res = append(res, file.Name())
		}
	}
	return res
}

/*
* 为什么需要一个全局唯一ID生成器,主要Map的worker个数为len(files), Reduce的worker个数为ReduceNum个,
* Coordinator中有一个属性TaskMetaHolder用于保存任务的元数据,更内层使用一个map表格存储各个任务的元信息,key为任务ID,同时任务总数为
* Map对应的worker+Reduce对应的worker,所以需要使用一个全局任务Id生成器,生成递增的任务ID
 */
func (c *Coordinator) genTaskId() int {
	res := c.TaskId
	c.TaskId++
	return res
}

func (c *Coordinator) PullTask(taskReq *TaskRequest, taskResp *Task) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch c.Phase {
	case MapPhase:
		{
			if len(c.MapChan) > 0 {
				*taskResp = *<-c.MapChan
				if !c.TaskMetaHolder.judgeTaskState(taskResp.TaskId) {
					fmt.Println("[PullTask] task state is ", c.TaskMetaHolder.TaskMeta[taskResp.TaskId].State)
				}
			} else {
				// Map对应的任务被分发完了,但此时任务并没有全部完成,此时将任务状态设置为waiting状态
				taskResp.TaskType = WaitingTask
				// 检查Map任务是否都完成,完成后将流程进入Reduce阶段
				if c.TaskMetaHolder.allTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	case ReducePhase:
		{
			if len(c.ReduceChan) > 0 {
				*taskResp = *<-c.ReduceChan
				if !c.TaskMetaHolder.judgeTaskState(taskResp.TaskId) {
					fmt.Println("[PullTask] task state is ", c.TaskMetaHolder.TaskMeta[taskResp.TaskId].State)
				}
			} else {
				taskResp.TaskType = WaitingTask
				if c.TaskMetaHolder.allTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	case AllDone:
		{
			taskResp.TaskType = ExitTask
		}
	default:
		panic("[PullTask] invalid Phase")
	}
	return nil
}

func (c *Coordinator) MarkDone(task *Task, taskResp *Task) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch task.TaskType {
	case MapTask:
		{
			metaInfo, ok := c.TaskMetaHolder.TaskMeta[task.TaskId]
			if ok && metaInfo.State == Working {
				metaInfo.State = Done
				fmt.Printf("[MarkDone] task is done, the taskId is: %v, the taskType is %v\n", task.TaskId, task.TaskType)
			} else {
				fmt.Printf("[MarkDone] error, the task not to be done, taskId id : %v, the tasktype is %v\n ", task.TaskId, task.TaskType)
			}
			break
		}
	case ReduceTask:
		{
			metaInfo, ok := c.TaskMetaHolder.TaskMeta[task.TaskId]
			if ok && metaInfo.State == Working {
				metaInfo.State = Done
				fmt.Printf("[MarkDone] task is done, the taskId is: %v, the taskType is %v\n", task.TaskId, task.TaskType)
			} else {
				fmt.Printf("[MarkDone] error, the task not to be done, taskId id : %v, the tasktype is %v\n ", task.TaskId, task.TaskType)
			}
			break
		}
	default:
		{
			panic("[MarkDone] invalid TaskType")
		}
	}
	return nil
}

func (c *Coordinator) toNextPhase() {
	switch c.Phase {
	case MapPhase:
		{
			c.MakeReduceTask()
			c.Phase = ReducePhase
		}
	case ReducePhase:
		{
			c.Phase = AllDone
		}
	default:
		panic("[toNextPhase] invalid phase")
	}
}

/*
*
为什么要设置这个探测器:参考如下回答。自己写的时候考虑到这个仅仅将当前对应的任务状态由Working更改为Waiting,并将其加入到对应的chan通道中
但worker中并没有终止相应任务的执行,此举是否会造成一个worker执行多次???
1.无论worker中一个任务是否执行多次,对于Map来说产生的中间文件名称是一样的,后续分配给新的worker后输出文件会覆盖前面的worker输出的文件,并且是从头填写
The coordinator can't reliably distinguish between crashed workers, workers that are alive but have stalled for some reason,
and workers that are executing but too slowly to be useful. The best you can do is have the coordinator wait for some amount of time,
and then give up and re-issue the task to a different worker. For this lab, have the coordinator wait for ten seconds;
after that the coordinator should assume the worker has died (of course, it might not have).
*/
func (c *Coordinator) crashHandler() {
	for {
		// 关于这个休眠时间的思考:
		// 如果不设置这个休眠时间,可能导致探测器协程不断获取锁,释放锁,不断循环,从而导致分发任务的方法PullTask无法获取锁
		// 从而无法执行后续任务,这里类似时间片算法的使用了。
		time.Sleep(2 * time.Second)
		c.mu.Lock()
		if c.Phase == AllDone {
			c.mu.Unlock()
			break
		}
		for _, metaInfo := range c.TaskMetaHolder.TaskMeta {
			if metaInfo.State == Working && time.Since(metaInfo.StartTime) > 9*time.Second {
				if metaInfo.TaskAdr.TaskType == MapTask {
					c.MapChan <- metaInfo.TaskAdr
					metaInfo.State = Waiting
				} else if metaInfo.TaskAdr.TaskType == ReduceTask {
					c.ReduceChan <- metaInfo.TaskAdr
					metaInfo.State = Waiting
				}
			}
		}
		c.mu.Unlock()
	}
}

worker.go

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
	"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// for sorting by key.
type BySort []KeyValue

// for sorting by key.
func (a BySort) Len() int           { return len(a) }
func (a BySort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySort) Less(i, j int) bool { return a[i].Key < a[j].Key }

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the coordinator.
	//CallExample()
	// ----------------------------------------第一次实现----------------------------------
	// GetTask
	//for i := 0; i < 2; i++ {
	//	task := GetTask()
	//	DoMapTask(&task, mapf)
	//}
	// ----------------------------------------第二次调整-----------------------------------
	// 对任务状态加入枚举
	flag := true
	for flag {
		task := GetTask()
		switch task.TaskType {
		case MapTask:
			{
				DoMapTask(&task, mapf)
				TaskDone(&task)
			}
		case ReduceTask:
			{
				DoReduceTask(&task, reducef)
				TaskDone(&task)
			}
		case WaitingTask:
			{
				//执行到此处说明Map任务有部分尚未完成,此时不能继续执行后续的Reduce任务,只有等待所有Map任务全部完成后才能执行Reduce
				//所以在此处休眠一会
				fmt.Printf("[Worker] the task is waiting, taskId : %v\n\n", task.TaskId)
				time.Sleep(2 * time.Second)
			}
		case ExitTask:
			{
				fmt.Println("[Worker] Exit Task")
				flag = false
			}
		}
	}
}

// GetTask :Get a Task
func GetTask() Task {
	taskReq := TaskRequest{}
	taskResp := Task{}
	ok := call("Coordinator.PullTask", &taskReq, &taskResp)
	if ok {
		fmt.Printf("[GetTask] success get task : %v\n", taskResp)
	} else {
		fmt.Printf("[GetTask] call failed!\n")
	}
	return taskResp
}

func DoMapTask(task *Task, mapf func(string, string) []KeyValue) {
	/***
	Map任务主要流程分为三部分:
	1. 根据RPC调用获取到文件名,调用已经写好的Map函数生成中间文件(intermediate)
	2. 根据RPC返回的任务参数中的ReduceNum的数值对中间文件(intermediate)进行分组,
		分组依据根据字母结构体的Key取Hash值随后对ReduceNum取余。
	3. 将分组后的中间文件保存到临时文件中
	---------------------个人理解-----------------------------------------
	生成的临时文件名:mr-X-Y(X是文件id,Y是哈希后对应ReduceNum)
	对于文件Id就是运行coordinator传入第二个参数files(文件集)中文件的顺序
	也就是说Map函数的主要功能是将传入的某个文件的词频统计出来(准确的说并没有统计词频,可以阅读wc.go源代码)仅仅将「单词-1」统计出来
		for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	随后将某个文件的所有词频根据Key也就是单词,根据单词的哈希值将单词分组,分组个数为ReduceNum
	*/
	var intermediate []KeyValue
	fmt.Printf("[DoMapTask] worker is map taskId : %v, fileName : %v\n", task.TaskId, task.FileSlice[0])
	file, err := os.Open(task.FileSlice[0])
	if err != nil {
		log.Fatalf("[DoMapTask] cannot open %v", task.FileSlice[0])
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("[DoMapTask] cannot read %v", task.FileSlice[0])
	}
	file.Close()
	intermediate = mapf(task.FileSlice[0], string(content))
	reduceNum := task.ReduceNum
	hashKV := make([][]KeyValue, reduceNum)
	for _, value := range intermediate {
		index := ihash(value.Key) % reduceNum
		hashKV[index] = append(hashKV[index], value)
	}
	// 放入中间文件
	for i := 0; i < reduceNum; i++ {
		fileName := "mr-" + strconv.Itoa(task.TaskId) + "-" + strconv.Itoa(i)
		tempFile, err := os.Create(fileName)
		if err != nil {
			log.Fatalf("[DoMapTask] create temp file: %v failed.", fileName)
		}
		enc := json.NewEncoder(tempFile)
		for _, kv := range hashKV[i] {
			err = enc.Encode(&kv)
			if err != nil {
				log.Fatalf("[DoMapTask] encode error: %v", err)
			}
		}
		tempFile.Close()
	}
}

func DoReduceTask(task *Task, reducef func(string, []string) string) {
	// 关于这个Reduce任务为什么需要先写入临时中间文件,随后等中间文件写完后,在重命名临时文件为最终文件
	/**
	To ensure that nobody observes partially written files in the presence of crashes,
	the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written.
	You can use ioutil.TempFile (or os.CreateTemp if you are running Go 1.17 or later) to create a temporary file
	and os.Rename to atomically rename it.

	最上面一句:确保发生崩溃时没有人观察到部分写入的文件
	使用中间临时文件在发生崩溃时临时文件不会保存,此外使用中间文件命令和最终输出文件名称不一样
	能够保证输出的符合要求格式的文件都是完整的,不完整的文件即使没有删除,由于命名和要求格式不一样也不会考虑
	*/
	reduceFileNum := task.TaskId
	intermediate := shuffle(task.FileSlice)
	dir, _ := os.Getwd()
	tempFile, err := os.CreateTemp(dir, "mr-tmp-*")
	if err != nil {
		log.Fatal("[DoReduceTask] Failed to create temp file", err)
	}
	i := 0
	// Debug
	//fmt.Printf("the intermediate length is %v\n", len(intermediate))
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		var values []string
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	tempFile.Close()
	fn := fmt.Sprintf("mr-out-%d", reduceFileNum)
	os.Rename(tempFile.Name(), fn)
}

func shuffle(files []string) []KeyValue {
	var kva []KeyValue
	for _, filepath := range files {
		file, _ := os.Open(filepath)
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
		// 删除临时文件
		//os.Remove(filepath)
	}
	sort.Sort(BySort(kva))
	return kva
}

func TaskDone(task *Task) {
	taskReq := task
	taskResp := Task{}
	ok := call("Coordinator.MarkDone", &taskReq, &taskResp)
	if ok {
		fmt.Printf("[TaskDone] success mark task : %v\n", taskReq)
	} else {
		fmt.Printf("[TaskDone] call failed!\n")
	}
}

// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the
	// receiving server that we'd like to call
	// the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)
	if ok {
		// reply.Y should be 100.
		fmt.Printf("reply.Y %v\n", reply.Y)
	} else {
		fmt.Printf("call failed!\n")
	}
}

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}

参考链接

https://blog.csdn.net/weixin_51322383/article/details/132068745
https://blog.csdn.net/weixin_45938441/article/details/124018485


http://www.kler.cn/news/364157.html

相关文章:

  • Python poetry 虚拟环境
  • 参加了十多个面试,一个offer也没拿到...为什么?
  • Perl打印9x9乘法口诀
  • (gersemi) CMake 格式化工具
  • 从0开始学python-day14-pandas1
  • PoissonRecon学习笔记
  • Spring--4
  • C++与现代开发实践第三节:多线程与并发编程
  • 充电器插拔引起电量跳变怎么办?
  • FPGA开发verilog语法基础1
  • springboot033小徐影城管理系统(论文+源码)_kaic
  • 【Jenkins】解决在Jenkins Agent节点容器内无法访问物理机的docker和docker compose的问题
  • 使用 Python 解析火狐浏览器的 SQLite3 数据库
  • C++,STL 047(24.10.24)
  • Mysql-count(1)、count(*)和count(列名)的区别?
  • Kafka Tool(Offset Explorer)在windows下配置访问kerberos认证Kafka
  • 【Ubuntu20.04 Visual Studio Code安装】【VSCODE】
  • 相对定位和绝对定位,使得图片在中间 ( html css )
  • Codeforces Round 981(Div. 3)
  • 【开源免费】基于SpringBoot+Vue.JS校园美食分享平台 (JAVA毕业设计)
  • [Python学习日记-53] Python 中的正则表达式模块 —— re
  • docker国内镜像仓库地址
  • Linux Shell 实现一键部署mariadb11.6
  • Hugging Face HUGS 加快了基于开放模型的AI应用的开发
  • 相机外参与相机位姿深度理解
  • 115页PPT华为管理变革:制度创新与文化塑造的核心实践