MapReuce的笔记

下面是MapReduce的图例:

MapReduce

一个MapReduce的Map任务数量由输入文件大小/块的大小来决定。而Reduce的个数由用户来指定。

The number of partitions (R) and the partitioning function are specified by the user.

MapReduce任务的执行流程:

  1. MapReduce将输入文件分割为16MB到64MB的块(块的大小也是由用户来控制)。然后将任务复制到集群的各个节点去。
  2. 其中有一份程序是特殊的--Master。剩下的节点是由master来分配任务的。总共有M(等于split的个数)个map task和R个 reduce task(由用户来指定需要多少个reduce task)需要分配。master挑选集群当中空闲的机器来为其分配map task or reduce task
  3. 一个被分配了map task的worker读取对应的input split。它从输入数据中得到key/value然后传给map funciton

    It parses key/value pairs out of the input data and passes each pair to the user-defined Map function
    这里的意思是,map function并不是直接输入文件路径的。map function的输入也是key/value pairs。比如说,wordcount读取文件的输入key/value都是(offset,line)。Map function输出的key/value pairs暂时缓存到内存当中

  4. 每隔一段时间,buffered pairs被写入到本地硬盘当中,根据partition function这些pairs被切分为R份。这些buffer的位置都返回到了master当中,master负责将这些数据的位置发送给reduce workers.
    默认情况下,partition是用hash来做的。比如说,我们想让结果在两个partition当中。hash(key) mod 2。
    那么key就会被分为两半,一般是对2取余后=0,一个是对2取余后=1的。同样的如果是3,那么keyspace就被划为了三块。
  5. 当一个reduce worker被master通知得到这些块的位置后,它使用RPC(remote procedure call)来读取存在map wokers硬盘当中的buffered data。当recue worker将所有的数据都读入的时候,reduce worker将根据 intermediate keys来排序。那么相同key的数据就会在相同的group当中。论文中排序是必要的,因为不同的key会映射到相同的reduce task去。所以要排序,来决定何时运行新的reduce function().

    The sorting is needed because typically
    many different keys map to the same reduce task

why need sort? 为什么需要sort,看这个回答。简单的来说,只有将数据排序过后,我们才知道何时去将来创建一个新的reduce任务。
PS:需要说明的一点是,上面回答的评论区部分提到。如果我们有10个reducer,这并不意味着我们只调用reduce() function 10次。partition中可能存在其他keys,因为经过parition function后它们被归到了同一个partion中,因此会执行reduce() function 多次。注意reduce task表示执行reduce 任务的worker,reduce function表示代码中的reduce(),两者并不相同。

The sorting is needed because typically many different keys map to the same reduce task

这就是论文中所说的意思,reduce task是处理它自己所属的partition的。所以对Keys排序是必要的,因为我们对当前partition中的数据,如果遇到不同的key之后,那么说明需要去执行新的reduce() function

  1. redue worker 遍历整个已经排好序的数据然后为每一个它所遇到的unique key,然后将这个key以及其所对应的数据传给reduce function()。然后该reduce function的输出追加到属于该reduce task对应的partition之后。
    PS: 这里的潜台词就是说,一个partition中可能有各个不同的key的对应数据,每一个keys将其对应的value传给reduce function()
  2. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code

在执行完成后,mapreduce 的执行结果存在R个文件当中(每一个reduce task对应一个,file name可以用用户来指定)。一般情况下我们不需要将这些文件重新合并为单个文件,因为输出结果都是存在分布式文件系统当中的,比如说HDFS,其它一些应用需要HDFS的数据时候,又是以分块来读取的。所以没必要。

Shuffle and Sort

The process by which the system performs the sort — and transfers the map outputs to the reducers as inputs — is known as the shuffle -----------Hadoop definitive guide

shuffle and sort

MapReduce论文中没有提到在将map的输出存到硬盘中的时候是否进行了排序。但是可以保证的是,MapReduce和Hadoop将存入到硬盘的Map output是分parition的。 redcue task需要在集群当中需要为它自己的partition从集群当中收集过来。然后拼接。最后在重新排序。然后第一个reduce task处理第一个partition,注意每个map output的值都是以partition为单位的,所以第一个reduce task需要将散播在多个map output中的值拼接起来。

As the copies accumulate on disk, a background thread merges them into larger, sorted files ---- Hadoop definitive guilde

Moreover, the reduce task needs the map output for its particular partition from several map tasks across the cluster.---- Hadoop definitive guilde

第二个reduce task处理第二个partition。以此类推。经过partition function计算后,得到的值如果是相同的话才会放到相同的partition当中。

merge

图片的源地址
PS
这里有一点小问题,可能是我的理解有些问题。经过partition function可能得到的数据都在只有一个partition,那么在拼接的时候是否会出现问题?没有理解。

总结

本来只是想了解下MapReudce的具体过程,shuffle 和sort的工作是什么。仔细看了论文发现也是没有很细致的讲解。从更高层次来说,MapReduce的工作就如同论文中所讲的那样。
Map的工作:

Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function

Reduce的工作

The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values.

将map 产生的数据传到reduce,这个过程叫做shuffle。mapreduce在这部分做的最主要工作是:将map产生的输出数据以key来group,然后将这些数据发送给reduce。这部分就是shuffle。


这张图里面好像没涉及到partition的操作,不过对于shuffle的描述还是比较直观的。

Combiner:
论文里面对于combiner说的很好。以一个WordCount为例子,英语文本中the是非常多的,所以一个文本当中可能会输出很多<the,1>。所以在发送到reduce之间,可以现在运行map task的机器上先做部分合并(partial merge)。

The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions。

差不多是这样,以后再补充吧

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇