- 将block块数据进行逻辑切片计算,每个切片(split)对应一个map任务
- 切片是为了将block数量和map任务数量解耦
- map读取切片数据,默认按行读取,作为键值对浇给map方法,其中key是当前读取的行在文件中的字节偏移量,value就是读取的当前行的内容
- map开始,执行Map任务的自定义实现逻辑
- map将输出的kv首先写到环形缓冲区,在写之前计算分区号(默认按照key的hash值对reducer的个数取模)
- 环形缓冲区默认100MB,预制80%. 如果写入的kv达到了80%则发生溢写,溢写的时候要先对键值对按照分区号进行分区,相同分区按照key的字典顺序排序,溢写到磁盘。如果溢写的文件数量达到了三个,则发生map段归并操作,此时如果指定了combiner, 则按照combiner合并数据
- 当一个map任务完成之后,所有的reducertask向其发送http get请求,下载它们所属分区数据。此过程成为shuffle, 洗牌。
- 当所有的map任务运行结束,开始执行reduce任务
- 在reduce开始之前,根据设定的归并因子,进行多轮的归并操作,非最后一轮的归并的结果被存入到磁盘上,最后一轮归并的结果直接传递给reduce, reduce迭代计算
- reduce计算结束后将结果写到HDFS文件中,每一个reducer task任务都会在作业输出路径下产生一个结果文件part-r-00000. 同时执行成功时会产生一个空的_SUCCESS文件,该文件是一个标识文件.
作业提交流程
- 客户端向ResourceManager获取Job的ID
- 客户端检查作业输入输出(如果输入路径不存在则抛出异常;如果输出路径存在同样抛出异常)计算切片,解析配置信息
- 客户端将jar包、配置信息、切片信息上传到HDFS
- 客户端向ResourceManager发送提交作业的请求
- ResourceManager调度一个NodeManager, 在NodeManager上的一个容器中运行MRAppMaster, 一个作业对应一个MRAppMaster
- MRAppMaster首先获取HDFS中的作业信息,计算㜗当前作业需要map的数量,reduce数量
- MrAppMaster向ResourceManager为map任务申请容器,MRAppMaster跟NodeManager通信启动容器,运行map任务,容器中的YARNChild会首先本地化conf、切片信息以及jar包
- 当map任务完成达到5%的时候,MRAppMaster向ResourceManager为reduce任务申请容器
- 当MapReduce中最后一个任务运行结束,MRAppMaster向客户端发送作业完成信息。MapReduce的中间数据销毁,容器销毁,计算结果存储到历史服务器。
统计单词数量MR任务实例
Map任务
package org.hadoop.learn.mp.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 第一个参数KEYIN: 读取文件的偏移量
* 第二个参数VALUEIN: 代表了这一行的文本内容,输入的value类型
* 第三个参数KEYOUT: 输出的key的value类型
* 第四个擦拿书VALUEOUT 输出的value类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable inKey, Text inValue, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// 获取当前行文本内容
String line = inValue.toString();
// 按照空行进行拆分
String[] words = line.split(" ");
for (String word : words) {
if (word.isEmpty()) {
continue;
}
context.write(new Text(word), new LongWritable(1));
}
}
}
Reduce任务
package org.hadoop.learn.mp.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// 定义当前单词出现的总次数
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
Main类,启动任务
package org.hadoop.learn.mp.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
public class WordCountMain {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
if (args == null || args.length == 0 || args.length != 2) {
System.out.println("请输入输入/输出路径");
return;
}
System.setProperty("HADOOP_HOME", "H:\\xianglujun\\hadoop-2.6.5");
System.setProperty("hadoop.home.dir", "H:\\xianglujun\\hadoop-2.6.5");
System.setProperty("HADOOP_USER_NAME", "root");
Configuration configuration = new Configuration();
// 设置本地运行
configuration.set("mapreduce.framework.name", "local");
JobConf jobConf = new JobConf(configuration);
// 设置作业的输入输出路径
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
Job job = Job.getInstance(jobConf);
job.setJarByClass(WordCountMain.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setJobName("wordcount");
// 设置输出key的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置reducer的相关
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 提交作业并等待作业结束
boolean b = job.waitForCompletion(true);
System.out.println("任务是否执行完成: " + b);
}
}
问题解决
1. (null) entry in command string: null chmod 0700
这个问题主要出现在windows上开发的时候会出现,这个主要是因为windows中bin版本不支持windows导致的,可以有以下处理步骤
- GitHub – cdarlint/winutils: winutils.exe hadoop.dll and hdfs.dll binaries for hadoop windows下载对应hadoop版本的文件到本地,并替换windows上的bin文件目录
- 在windows中配置HADOOP_HOME的环境变量
- 重启idea,然后在执行程序,这个问题就可以解决了
