`
zy19982004
  • 浏览: 653834 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:249704
社区版块
存档分类
最新评论

Hadoop学习三十四:Hadoop-MapReduce Job本地运行流程

 
阅读更多

一. 版本环境

     以前工作的过程中,陆陆续续看过一些Hadoop1.0 MapReduce的源码,但没有形成体系。现在再次来看,顺便记录。此次学习版本的是Hadoop2.2.0 MapReduce。环境为直接在Win7下Local模式调试。MapReduce。

 

二. Job提交流程

     从Job waitForCompletion开始

1 Job submit

1.1 JobSubmitter submitJobInternal

1.1.1 JobSubmissionFiles.getStagingDir 初始化Job系统工作目录jobStagingArea。如D:\tmp\hadoop-root\mapred\staging\rootXXXXXXXXXX\.staging。

1.1.2 获得JobID。如job_localXXXXXXXXXX_0001。

1.1.3 copyAndConfigureFiles copy Job Jar到submitJobDir = new Path(jobStagingArea, jobId.toString())。

1.1.4 writeSplits,将input划分为split,并将split数据和split元数据写入系统工作目录,最后返回split的数目。input如下

1.1.4.1 调用TextInputFormat getSplits方法获得split,集群环境BlockSize为128M,所以145M的test-data.txt被划分为两个split。相关算法自己去看,提供两个数据BlockLocation[0,134217728,201slave,203slave,202slave, 134217728,18093772,201slave,203slave,202slave],InputSplit[hdfs://192.168.1.200:9000/user/root/input/test-data.txt:0+134217728, hdfs://192.168.1.200:9000/user/root/input/test-data.txt:134217728+18093772]

1.1.4.2 JobSplitWriter.createSplitFiles将split数据和split元数据写入系统工作目录。

1.1.5 writeConf,将配置文件写到系统工作目录。此时系统工作目录如下

1.1.6 LocalJobRunner submitJob

1.1.6.1 new Job

1.1.6.1.1 Job初始化

1.6.1.1.1.1 systemJobDir就是上面的submitJobDir,systemJobFile = submitJobDir\job.xml

1.1.6.1.1.2 将配置文件写入本地工作目录localJobDir\localJobFile。如D:\tmp\hadoop-root\mapred\local\localRunner\root\job_localXXXXXXXXXX_0001\job_localXXXXXXXXXX_0001.xml。此时,本地工作目录如下

1.1.6.1.2 Job run

 

三. Job run流程

 

     Job run方法很大,是整个Job执行的核心框架,自定义的Mapper和Reduce都会在这里被调起。我把这个方法单独拿出来说。

1 创建OutputCommitter

2 从系统工作目录split数据和元数据文件里获得split信息TaskSplitMetaInfo[]

3 根据TaskSplitMetaInfo[]创建List<MapTaskRunnable>,显然会有两个MapTaskRunnable

4 ExecutorService运行每个MapTaskRunnable

4.1创建MapTask并执行run

4.2 runNewMapper

4.2.1 反射创建自定义的Mapper mapper

4.2.2 反射创建InputFormat

4.2.3 从系统工作目录文件里获得此MapTask的split

4.2.4 反射创建RecordReader

4.2.5 反射创建RecordWriter output

4.2.6 创建MapContextImpl

4.2.7 mapper.run(mapperContext),可能涉及到数据的spill。

4.2.8 output.close(mapperContext),涉及到数据的sort spill combin merge。

5 等待每个MapTaskRunnable运行完。但两个MapTaskRunnable都运行完,如下图

6 将Mapper的结果mv & rename到Reduce的本地工作目录,此时

7 创建ReduceTask并执行run

7.1 merge & sort

7.2 runNewReducer

7.2.1 反射创建Reducer

7.2.2 反射创建RecordWriter,准备好临时目录流。参考类FileOutputFormat

/**
   * Get the default path and filename for the output format.
   * @param context the task context
   * @param extension an extension to add to the filename
   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
   * @throws IOException
   */
  public Path getDefaultWorkFile(TaskAttemptContext context,
                                 String extension) throws IOException{
    FileOutputCommitter committer = 
      (FileOutputCommitter) getOutputCommitter(context);
    return new Path(committer.getWorkPath(), getUniqueFile(context, 
      getOutputName(context), extension));
  }

 

7.2.3 调用自己的Reduce,将结果输出到临时目录下

   

8 OutputCommitter将Reduce的结果mv到output下

9 清理以下目录

9.1 系统工作目录systemJobFile.getParent()

9.2 本地工作目录localJobFile 

 

四. 大流程

     最后用一张图总结本文

 

       再补充一下:本地MapReduce执行时  ,有几个线程来运行MapTask
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);

maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.

ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
   实际上就是Math.max(Math.min(maxMapThreads, this.numMapTasks), 1)

 

     不断学习不断补充

     当map和reduce同时存在时,map的结果先flush到硬盘上,reduce时以此为输入,计算完后由FileOutputFormat直接写到临时目录里,最后OutputCommitter将此结果mv到output下;

     如果只有map没有reduce阶段即 job.setNumReduceTasks(0)时又如何。这个时候map的计算结果将通过FileOutputFormat直接写到临时目录里,最后OutputCommitter将此结果mv到output下。

MapTask.runNewMapper

// get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

NewDirectOutputCollector and NewOutputCollector all extends from RecordWriter, different RecordWriter decision to different wirtelocation

 

 

 

1
2
分享到:
评论

相关推荐

    Hadoop-MapReduce-学习资料及文档.zip

    内容概要: mr执行笔记; mapreduce框架的规范; wc流程.xls; wordcount的伪代码; yarn提交job的源码流程; YARN中提交job的详细流程; 打开流的关键代码; 打开流的调用流程; 日志格式;

    hadoop 2.7.6 eclipse插件

    18/05/25 19:51:49 INFO mapreduce.Job: Job job_1527248744555_0001 running in uber mode : false 18/05/25 19:51:49 INFO mapreduce.Job: map 0% reduce 0% 18/05/25 19:52:20 INFO mapreduce.Job: map 100% ...

    MapReduce-Demo:一个演示MapReduce(Hadoop)程序,用于处理少量数据

    MapReduce-Demo 一个演示...使用罐子执行作业并提供粗化(输入和输出文件路径) COMMAND TO EXECUTE THE MAPREDUCE JOB: hadoop jar students-high-mark.jar hdfs:/new-students.txt hdfs:/student-out9.txt* 5。 验证

    Hadoop应用系列2--MapReduce原理浅析(上)

    NULL 博文链接:https://eastzhang.iteye.com/blog/1775734

    hadoop-2.6.0-hadoop.dll-winutils.exe

    windows上eclipse运行hadoop程序报NullPointerException错 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j ...

    CustomInputFormatCollection:Hadoop Mapreduce InputFormat 集合

    job.setInputFormatClass(SmallFileCombineTextInputFormat.class); 当前Maven提加如下依赖 讲无法从reposity中找到直接jar,需手动编译下载,后续加入到 sohu 公司内部repo二方库中 org.apache.hadoop hadoop-...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    有前三章的内容前言第一部分 基础篇第1章 阅读源代码前的准备1.1 准备源代码学习环境1.1.1 基础软件下载1.1.2 如何准备Windows环境1.1.3 如何准备Linux环境1.2 获取Hadoop源代码1.3 搭建Hadoop源代码阅读...

    Optimizing Hadoop for MapReduce(PACKT,2014)

    MapReduce is the distribution system that the Hadoop MapReduce engine uses to distribute work around a cluster by working parallel on smaller data sets. It is useful in a wide range of applications, ...

    hadoop-mapreduce

    #Hadoop-Mapreduce从Hadoop集群中的CDX文件中提取德国报纸和新闻站点的子集 怎么跑 使用以下代码编译和打包代码 mvn package 现在,目录target中应该有一个名为gen.sub-0.0.1-SNAPSHOT-job.jar的.jar,其中包含运行...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    java下hadoop开发使用jar包

    java 下,开发hadoop时,可能用到的jar包。可以尝试下载

    Hadoop开发者第四期

    Hadoop开发者第四期: 海量数据处理平台架构演变; 计算不均衡问题在Hive 中的解决办法; Join 算子在Hadoop 中的实现; 配置Hive 元数据DB 为PostgreSQL; ZooKeeper 权限管理机制; ZooKeeper 服务器工作原理和...

    MapReduce Job本地提交过程源码跟踪及分析

    MapReduce Job本地提交过程源码跟踪及分析

    第一个Mapreduce程序.pdf

    本文介绍了用Java编写并运行第一个mapreduce作业的步骤及遇到的问题和解决方案。

    HadoopHA集群部署、YARNHA测试Job教学课件.pptx

    运行MapReduce程序测试Job 任务一 准备MapReduce输入文件 在master主节点,使用 root 用户登录,然后切换为 hadoop用户 [root@master ~]# su – hadoop --从root用户切换为hadoop身份 [hadoop@master ~]$ 创建新文件...

    spark-2.2.0-bin-hadoop2.7.zip

    Apache Spark 是专为大规模数据处理而设计的快速通用...但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    spark-2.4.3-bin-hadoop2.7.tgz

    Apache Spark 是专为大规模数据处理而设计的快速通用...但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    编译过的hadoop-core-1.0.4.jar,可以在本地模拟MapReduce

    编译过的hadoop-core-1.0.4.jar,可以在本地模拟MapReduce 如果Eclipse workspace在d:,则我们可以把d:的某个目录,比如d:\input作为输入目录;d:\output作为输出目录。 MapReduce编程模型里面这样写就可以了: ...

    Hadoop从入门到上手企业开发

    049 MapReduce编程模型讲解及运行PI程序和JobWebUI监控Job运行 050 如何按照【八股文】方式编写MapReduce 051 复习MapReduce编写模型和【八股文】方式编写MapReduce 052 完成MyWordCount程序编写 053 打包运行...

Global site tag (gtag.js) - Google Analytics