经过上一节的操作,我们已经体验过了MapReduceApplication的运行流程,下面就先讲一下与运行过程相关的概念。
MapReduce分布式运算 什么是MapReduce分布式运算?而对于相对复杂的MapReduce分布式运算程序来说,一个Map/Reduce显然是搞不定的,因此在实际运算过程中,处理相对复杂的任务,往往是多组Map/Reduce连接使用的。
MapReduce分布式运算优、缺点体现在哪些方面?在上一节【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)中,当我们我们将项目启动,最后是通过一个GET请求来触发我们的程序向集群提交MapReduceApplication程序,这里贴一下那么URL:
http://localhost:8080/WordCountController/wordCount?jobName=jack_roy_word_count&inputPath=/0000/00001
可以看到,这段URL中包含了两个参数,它们的含义及作用如下:
第一个参数jobName为我们的作业名称,该名称伴随作业的整个生命历程,出现在WebUi等地方; 第二个参数inputPath是我们的测试数据路径;它告诉了我们程序数据源的位置,然后我们的程序才能向yarn递交规划信息,yarn根据规划信息启动驱动程序(称之为Driver),Driver程序再启动MapTask来装载任务; MapTask 影响MapTask个数的因素?其中第三条挑出来讲一下, 之前在我们的作业工具类JobUtils中我们进行了作业相关信息的配置,这其中有关于split切片大小的设定,设置如下:
/* 4M */
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
/* 2M */
CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);
这些配置在工具类JobUtils中初始化以后,通过waitForCompletion(boolean verbose)方法提交给客户端:
/* 提交作业并等待完成 */
job.waitForCompletion(true);
其中,waitForCompletion(boolean verbose)方法内部调用了submit()方法:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
if (this.state == Job.JobState.DEFINE) {
/* 提交作业 */
this.submit();
}
/* verbose为true表明要打印运行进度,为false就不打印运行日志*/
if (verbose) {
this.monitorAndPrintJob();
} else {
int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());
while(!this.isComplete()) {
try {
Thread.sleep((long)completionPollIntervalMillis);
} catch (InterruptedException var4) {
}
}
}
/* 返回判断作业是否成功结果 */
return this.isSuccessful();
}
而submit()无非是帮我们初始化提交器,然后提交器提交任务,最后返回一下job信息:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
/* 判断作业状态 */
this.ensureState(Job.JobState.DEFINE);
/* mapred.mapper.new-api(旧) / mapred.reducer.new-api(新),因为兼容问题设置为新API */
this.setUseNewAPI();
/* 初始化一个cluster实例,用来和ResourceManager通信 */
this.connect();
/* 构建提交器,本地提交还是集群提交,我们选的是提交至yarn */
final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
/* 提交任务 */
return submitter.submitJobInternal(Job.this, Job.this.cluster);
}
});
this.state = Job.JobState.RUNNING;
LOG.info("The url to track the job: " + this.getTrackingURL());
}
以WordCount为例,说说Map的过程:
任务通过提交器提交以后,yarn根据规划信息告知Driver需要几台MapTask,且由Driver通知各个机器开启MapTask; 各个MapTask拿到任务,查找自己分得的切片在哪些机器上(通常来说MapTask与文件切片所在的DataNode共属一台机器,除非资源不足,则两者分离); MapTask找到切片之后开始一行一行地读取,每读一行调用一个map获得一个(拿WordCount来说,此时的key就是一行字符),再通过分词器IKSegmenter进行分词,切成一个个单词的,整个过程由for循环一行行读取实现; 被切成单个单词的会被收集器收集,写入一个个分区且排序的文件,文件的每一个分区对应一个ReduceTask。 ReduceTask以WordCount为例,说说Reduce的过程:
等各个MapTask均执行完成,会生成一个个分区且排序的文件之后,由ReduceTask前去这些分区且排序的文件分区中拿属于自身的数据,这个过程各个ReduceTask拿到的分区内容不会重复; ReduceTask拿到分区数据以后再调用迭代方法来计算词频,最后提交结果。 split分片规则前面交代=影响MapTask个数的因素时,我们提到其中很重要的一点是split分片规则,分片这个事情是在FileInputStream类的getSplit()方法中实现的,其细则主要有下:
需要得知文件所在目录(去哪里拿数据); 开始变量目录下的文件(规划切片); 遍历第一个文件X; 获知X文件的大小(fs.sizeOf(X)); 计算切片大小,先以一个固定长度为偏移量a切分,然后判断剩下下的数据是否满足偏移量(偏移量a*1.1),若满足,则继续切,然后再判断,否则不切,直接以剩下的数据为一个切片。 将切片信息写入到规划文件中(供yarn使用)。如果存在较为特殊的业务需求,普通的分片规则无法满足,或这些固有的规则对我们的任务性能影响过大时,可以通过重写Partitioner类设计自定义分割规则,以满足需求。
MapReduce工作原理经过以上的铺垫,我们可以开始以图解的方式扒一扒MapReduce的工作流程。
在HDFS /0000/00001目录下上有一批待处理的数据,数据的内容都是一些诸如“as soon as”之类的单词语句。
14) ResourceManager收到请求后,MrApp的请求再次初始化成一个个task,并放入队列中(图中画不下了,就不画这一步了),NodeManager4、NodeManager5领取自己的task任务,并开启容器装载ReduceTask ;
15) MapReduceApplication向各个ReduceTask发送相关参数(包含MapTask生成文件在各个NodeManager上的位置);
16) 各个ReduceTask去拿MapTask的生成结果,各个ReduceTask拿到的分区不重复,然后开始计算;
17) ReduceTask经过计算后,将结果存储到我们定义好的输出路径中。
这个章节从我的笔记里看记录的十分杂乱,可想而知当初学习的时候也是一头雾水,这篇博客的撰写也花费了近一天的时间,我踌躇了很久才从那些杂乱的记录中总结出上述的运行流程,过程很是艰辛,但这对我来说也是温故知新的过程,我也由此发现了MapReduce源码的魅力,所以采用Debug的方式跟了一下job的提交流程,这个过程被我记录下来了:
【Debug阅读Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化
【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第二节 jobSubmitter(提交器对象)的初始化
【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第三节 Job提交前的初始化
感兴趣的朋友可以在环境搭建好了之后,跟着讲解Debug一遍,加深理解,下一节我们试着讲述洗牌(Shuffle)过程发生的事,以及我们如何干预这个过程。
【大数据入门笔记系列】写在前面
【大数据入门笔记系列】第一节 大数据常用组件
【大数据入门笔记系列】第二节 Zookeeper简介
【大数据入门笔记系列】第三节 Hdfs读、写数据处理流程
【大数据入门笔记系列】第四节 NameNode元数据缓存机制
【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)
【大数据入门笔记系列】第六节 分布式计算框架MapReduce的工作流程