mapReduce

Posted by chinaljr on August 1, 2016

Programming Model

  • map
    • 将输入变成一堆 KV 对
    • 然后按照 Key 分组
    • pass them to the reduce function
  • Reduce
    • 把这些合并
    • 每个reduce function 产生0 or 1 个 结果
  • 想法简单,具体系统的实现需要根据环境的不同
  word1 word2 wordn
file1 x11 x12 x1n
file2 x21 x22 x2n
filem xm1 xm2 xmn
SUM sum1 sum2 sumn
  • map : 一行一行填表
  • reduce :合并每一行,生成最后一行

Implementation

  • master 保存每个 map-task 和 reduce-task 的状态
  • master 是 map-tasks 和 reduce-tasks 的桥梁

Fault Tolerance

  • Worker Failure
    • Master 不断询问 ,出错然后标记
    • 当前worker完成的内容需要重新算
      • map task 重算,因为outpur 位于 问题节点
      • reduce task 不用,因为output位于全局存储位置
  • Master Failure
    • 周期的 checkpoints
  • Semantics in the presense of Failure
    • 原子性

Locality

布局配置,影响网络资源的使用

Task Granularity

  • M 个 Map task
  • R 个 Reduce task
  • M 和 R 都远大于 worker 的数目
  • O(M+R) 的调度
  • O(M * R)的状态

Backup Tasks

  • straggler :运行的瓶颈,拖后腿
  • 速度过慢,然后新开一个新的backup-task , 两个其中之一结束就算结束

Refinements 细化

mit 6.824 lab1

PART I: MAP/REDUCE input and output

  • 实现domap()
    • 读取 mapID 的文件,对文件进行 mapF(用户定义) 操作
    • 按照key 分类存储在 nReduce 个文件中
  • 实现doreduce()
    • 根据自己的 ReduceID 读取 nMap个 中间文件根据key 进行reduceF的操作
    • 将最后结果排序输出,作为ReduceID 的最终结果

Part II: Single-worker word count

  • 实现mapF()
    • 统计每个文件里各个字符出现的次数
  • 实现reduceF()
    • 汇总每个词汇

Part III: Distributing MapReduce tasks

这部分就是实现调度算法,用到的GO函数都是在Hint中给的,但是遇到一个非常奇怪的bug,释放worker的chan如果放在进程计数之前,会导致bug,分析是可能在最后执行完最后一个任务之后,chan中内容被改变了(被填满了).很奇怪. 正确代码

var wgcnm sync.WaitGroup
wgcnm.Add(ntasks)
for i:= 0 ; i < ntasks ; i++{
	go func(i int){
		var file string
		if phase == mapPhase{
			file = mapFiles[i]
		} 
		var worker string
		//fmt.Println(file)
		var ok bool
		ok = false 
		for !ok{
			worker = <-registerChan
			ok = call(worker,"Worker.DoTask",&DoTaskArgs{jobName, file, phase, i, n_other},nil)

		}
		//fmt.Printf("\n %d ok\n", i)
		wgcnm.Done()
		registerChan <- worker

	}(i)
}
wgcnm.Wait()

BUG代码

var wgcnm sync.WaitGroup
wgcnm.Add(ntasks)
for i:= 0 ; i < ntasks ; i++{
	go func(i int){
		var file string
		if phase == mapPhase{
			file = mapFiles[i]
		} 
		var worker string
		//fmt.Println(file)
		var ok bool
		ok = false 
		for !ok{
			worker = <-registerChan
			ok = call(worker,"Worker.DoTask",&DoTaskArgs{jobName, file, phase, i, n_other},nil)

		}
		//fmt.Printf("\n %d ok\n", i)
		registerChan <- worker
		wgcnm.Done()

	}(i)
}
wgcnm.Wait()

Part IV: Handling worker failures

在上面那一部分中添加一个while循环即可