MapReduce详解
MapReduce作业涉及的实体
MapReduce作业执行涉及4个独立的实体:
- 客户端(client):编写MapReduce程序,配置作业,提交作业,这就是程序员完成的工作;
- JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;
- TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个,就和HDFS里namenode一样存在单点故障)
- HDFS:保存作业的数据、配置信息等等,最后的结果也是保存在HDFS上面
Map函数和Reduce函数的输入输出
函数 | 输入 | 输出 | 说明 |
---|---|---|---|
Map | <k1,v1> (如: <行号,”a b c”>) | List(<k2,v2>) (如:<“a”,1> <“b”,1> <“c”,1>) | 1.将小数据集进一步解析成一批<key,value>对,输入Map函数中进行处理; 2.每一个输入的<k1,v1>会输出一批<k2,v2>,<k2,v2>是计算的中间结果。 |
Reduce | <k2,List(v2)> (如:<“a”,<1,1,1>>) | <k3,v3>(如: <“a”,3>) | 输入的中间结果<k2,List(v2)>中的List(v2)表示是一批属于同一个k2的value |
MapReduce工作流程概述
MapReduced的核心思想是“分而治之”:把一个大的数据集拆分成多个小数据块在多台机器上并行处理。
一个大的MapReduce作业,首先会被拆分成许多个Map任务在多台机器上并行执行,每个Map任务通常运行在数据存储的节点上,这样计算和数据就可以放在一起运行,不需要额外的数据传输开销。当Map任务结束后,会生成以<key,value>形式表示的许多中间结果。然后,这些中间结果会被分发到多个Reduce任务在多台机器上并行执行,具有相同key的<key,value>会被发送到同一个Reduce任务那里,Reduce任务会对中间结果进行汇总计算得到最后结果,并输出到分布式文件系统中。
要点:
- 不同的Map任务之间不会进行通信
- 不同的Reduce任务之间也不会发生任何信息交换
- 用户不能显式地从一台机器向另一台机器发送消息
- 所有的数据交换都是通过MapReduce框架自身去实现的
MapReduce各个执行阶段
- MapReduce框架使用InputFormat模块做Map前的预处理,比如验证输人的格式是否符合输人定义;然后,将输入文件切分为逻辑上的多个Iputplit,InpuSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InpuSplit并没有对文件进行实际切割,只是记录了要处理的数据的位置和长度。
- 因为InputSplit是逻辑切分而非物理切分,所以还需要通过RecordReader(RR)根据InputSplit中的信息来处理InputSplit中的具体记录,加载数据并转换为适合Map任务读取的键值对,输入给Map任务。
- Map任务会根据用户自定义的映射规则,输出一系列的<key,value>作为中间结果。
- 为了让Reduce可以并行处理Map的结果,需要对Map的输出进行一定的分区(Partition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key,value-list> 形式的中间结果,再交给对应的Reduce 进行处理,这个过程称为Shuffle。从无序的<key,value>到有序的<key,value-list>,这个过程用Shuffle(洗牌)来称呼是非常形象的。
- Reduce以一系列<key,value-list>中间结果作为输入,执行用户定义的逻辑,输出结果给OutputFormat模块。
- OutputFormat模块会验证输出目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就输出Reduce的结果到分布式文件系统。
Shuffle过程详解
Shuffle过程概述
所谓Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。
在Map端的Shuffle过程
Map的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序(Sort)和合并(Combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并(Merge)成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自已处理的数据。
输入数据和执行Map任务
输入数据一般保存在HDFS文件块中,格式是任意的。Map任务接收<key,value>作为输入后,按一定的映射规则转换成一批<key,value>进行输出。
写入缓存
- 每个Map任务分配一个缓存,在缓存中积累一定数量的Map输出结果之后再一次性批量写入磁盘(一次寻址,连续写入)
- MapReduce默认100MB缓存
- 写入缓存前,key与value值都被序列化成字节数组
溢写(分区、排序和合并)
Spill
当Map结果占满整个缓存后,会启动溢写(Spill)操作,把缓存中的内容一次性写入磁盘,并清空缓存。
溢写过程由另一个单独的后台线程完成,但为了保证Map结果写入缓存不受溢写的影响,必须让缓存中一直有可用的空间,一般设置溢写比例为0.8。
Partition
溢写之前数据会被分区,把<key,value>键值对交给不同的Reduce任务,默认采用Hash函数对key进行哈希后再用Reduce任务的数量进行取模(hash(key) mod R),把Map输出结果均匀地分配给R个Reduce任务去并行处理。
Sort
对于每个分区内的所有键值对,根据key进行内存排序,排序是MapReduce的默认操作。
Combine
排序后的合并是一个可选的操作,为了减少需要溢写到磁盘的数据量,如果事先没有定义Combiner函数就不用合并。
所谓合并,是指将那些具有相同key的<key,value>的value加起来。
Map端的这种合并操作和Reduce功能相似,但由于发生在Map端,只能称之为“合并”。一般用在累加、最大值等场景。
文件归并
每次溢写操作都会在磁盘生成一个新的溢写文件,在Map任务全部结束之前系统会进行归并,得到一个大的溢写文件。
所谓归并,是指对于具有相同key的键值对归并成一个新的键值对。
合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
进行文件归并时,如果磁盘中已经生成的溢写文件数量大于预定值(默认是3),可以再次启动Combiner,对数据进行合并操作,从而减少写入磁盘的数据量。
Map端Shuffle的完成
上述4个步骤之后生成一个大文件存在磁盘中,这个大文件的数据是被分区的,不同的分区会被发送到不同的Reduce任务进行并行处理。JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据。
在Reduce端的Shuffle过程
Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行归并(Merge)后交给Reduce处理。
“领取”数据
Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取(Fetch)数据。Reduce任务会使用多个线程同时从多个Map机器领回数据。
归并数据
Reduce领取的数据(来自不同Map机器)先放入缓存,若缓存被占满,启动溢写过程。具有相同key的键值对会被归并,若有定义Combiner则归并后的数据还会再合并。
当所有Map端数据领回后,多个溢写文件归并成一个或多个大文件,归并时对文件中的键值对排序。
当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce。
把磁盘上多个溢写文件归并成一个大文件可能需要执行多轮归并操作,每轮可以归并的文件数量由参数决定(默认是10)。
把数据输入给Reduce任务
磁盘中经过多轮归并后得到的若干个大文件不会继续归并,而是直接输入给Reduce任务,可以减少磁盘读写开销。
由此,整个Shuffle过程完成。接下来Reduce任务会执行Reduce函数中定义的各种映射,输出最终结果,并保存到HDFS里。
WordCount实例分析
对于WordCount程序任务,整个MapReduce过程实际的执行顺序如下:
- 执行WordCount的用户程序(采用MapReduce编写),会被系统分发部署到集群中的多台机器上,其中一个机器作为Master,负责协调调度作业的执行,其余机器作为Worker,可以执行Map或Reduce任务。
- 系统分配一部分Worker执行Map任务,一部分Worker执行Reduce任务;MapReduce将输入文件切分成M个分片,Master将M个分片分给处于空闲状态的N个Worker来处理。
- 执行Map任务的Worker读取输入数据,执行Map操作,生成一系列<key,value>形式的中间结果,并将中间结果保存在内存的缓冲区中。
- 缓冲区中的中间结果会被定期刷写到本地磁盘上,并被划分为R个分区,这R个分区会被分发给R个执行Reduce任务的Worker进行处理;Master会记录这R个分区在磁盘上的存储位置,并通知R个执行Reduce任务的Worker来“领取”属于自已处理的那些分区的数据。
- 执行Reduce任务的Worker收到Master的通知后,就到相应的Map机器上“领回”属于自己处理的分区。需要注意的是,正如之前在Shufle过程闸述的那样,可能会有多个Map机器通知某个Reduce机器来领取数据,因此一个执行Reduce任务的Worker可能会从多个Map机器上领取数据。当位于所有Map机器上的、属于自己处理的数据都已经领取回来以后,这个执行Reduce任务的Worker会对领取到的键值对进行排序(如果内存中放不下需要用到外部排序),使得具有相同key的键值对聚集在一起,然后就可以开始执行具体的Reduce操作了。
- 执行Reduce任务的Worker遍历中间数据,对每一个唯一key执行Reduce函数,结果写入到输出文件中;执行完毕后,唤醒用户程序,返回结果。