mapreduce
mapreduce官网,mapreduce是什么,原理,编程,說明手册
mapreduce是什么?
MapReduce是一种用于處理和分析大規模資料集的编程模型和计算框架。它最初由Google提出,并在Apache Hadoop專案中得到广泛应用。MapReduce的核心思想是將计算任務分成两个阶段:Map阶段和Reduce阶段。在Map阶段,输入資料被拆分成多个独立的資料块,并由多个Mapper并行處理。每个Mapper將输入資料轉换成键值對的形式,生成中间結果。在Reduce阶段,中间結果被合并和處理,生成最終的输出結果。
mapreduce官网: https://hadoop.apache.org/
MapReduce主要功能
MapReduce框架提供了自动處理任務并行化、資料劃分、跨節点通信和故障恢复等功能。它可以在大規模集群上運行,利用多台计算机的计算能力和存儲空间进行高效的分布式计算。
MapReduce對于處理大規模資料集、并行计算和分布式存儲非常有效。它已經成為處理大資料的重要工具之一,被广泛应用于資料分析、搜索引擎、日志處理等领域。
Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大規模資料集的并行運算,其中Map(映射)和Reduce(歸約)。
MapReduce既是一个编程模型,也是一个计算组件,處理的过程分為两个阶段,Map阶段:负责把任務分解為多个小任務,Reduce负责把多个小任務的處理結果进行汇总。其中Map阶段主要输入是一對Key-Value,經过map计算后输出一對Key-Value值;然后將相同Key合并,形成Key-Value集合;再將这个Key-Value集合轉入Reduce阶段,經过计算输出最終Key-Value結果集。
MapReduce可以實现基于上千台伺服器并發工作,提供很强大的資料處理能力,如果其中單台服務挂掉,计算任務会自动轉義到另外節点执行,保證高容错性;但是MapReduce不适应于實时计算与流式计算,计算的資料是静態的。
MapReduce官方手册
綜述
Hadoop MapReduce是一个軟體框架。它能够很容易的創建以一种可靠,容错的方式在商用机器上的大集群上并行的處理大量的資料。
一个MapReduce job通常將输入的資料集拆分成独立的块。Map任務以完全并行的方式處理这些块。框架對map的输出进行排序,进而作為输入提供给reduce任務。通常来說,job的输入和输出都保存在一个文件系統中。框架负责调度任務,监控任務并重新执行失败了的任務。
通常来說,计算節点和存儲節点是相同的,也就是說,MapReduce框架和HDFS運行在相同的節点集上。这样的配置能够保證框架在已經存在資料的節点上有效的调度任務,进而在不同集群间獲得一个非常高的总帶宽。
MapReduce框架由一个單一的主ResourceManager,每个集群節点上的一个从NodeManager以及每个应用上一个MRAppMaster组成。
应用至少会指定输入/输出位置以及透過實现合适的接口和抽象类来提供map和reduce功能。这些,以及其他job参數,组成job配置(configuration)。
然后,Hadoop job客户端提交job(jar/可执行的文件等等)以及配置ResourceManger。ResoureManger 负责给从節点分發軟體/配置,调度和监督任務,反馈状態和診斷信息给job客户端。
虽然Hadoop框架是由java實现的,但是MapReduce应用并不需要使用java编寫。
输入和输出
MapReduce框架完全以<键,值>形式操作,也就是說,框架將输入给job的資料視為<键,值>對,并且產生一个<键,值>對集作為job的输出。
键和值类必须透過框架序列化,因此需要實现Writable接口。除此之外,key类必须實现WritableComparable接口以辅助框架的排序。
一个MapReducejob的输入输出类型如下所示:
(input) <k1, v1>-> map -> <k2,v2> -> combine -> <k2,v2> -> reduce -> <k3,v3> (output)
實例:WordCount v1.0
在详細介绍細節前,让我们看一个MapReduce 应用,这對于帮助我们认識它是如何工作的非常有用。
WordCount是一个简單应用,它计算一个输入集中每个單词的出现次數。
这个例子适用于本地單机、偽分布和全分布hadoop安装
源代碼:
-
import java.io.IOException;
-
import java.util.StringTokenizer;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
public class WordCount {
-
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
-
}
-
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
Job job = Job.getInstance(conf, “word count”);
-
job.setJarByClass(WordCount.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
運行:
export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
编译WordCount.java且創建一个jar:
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
假設:
/user/joe/wordcount/input –HDFS中的输入目錄
/user/joe/wordcount/output –HDFS中的输出目錄
作為输入的样例文本文件:
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
運行应用:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000`
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用能够透過使用選项 –files来在该任務当前工作目錄下指定多个以逗号分開的路徑列表。-libjars選项允許应用给map和reduce的类路徑增加jar。選项 –archives 允許传递以逗号分開的備份作為参數。
運行帶有-libjars,-files和-archives的wordcount例子:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
其中,myarchive.zip会解压放在已“myarchive.zip”的名称的路徑下。
用户能够使用#号给透過-files和-archives传递的文件和備份指定一个不同的名称。
比如:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
解析(walk-through)
WordCount应用是非常简單的。
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
Mapper透過map方法實现,在指定的textInputFormat的辅助下,一次處理一行。然后,透過StringTokenizer,將行按空格拆分成字符,且输出类似<<word>,1>的键值對。
對于给定的例子,第一个map的输出:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个map输出:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
我们已經了解到了為一个给定job而衍生的很多map.至于如何以一种細粒度的方式控制它们,將会在手册的后面介绍。
job.setCombinerClass(IntSumReducer.class);
WordCount 同样指定了一个合并器。因此,在基于key值排序后,每一个map的输出都会传递给本地的合并器以實现本地聚合。
第一个map的输出:
< Bye, 1>
< Hello, 1>
< World, 2>`
第二个map的输出:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>`
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
Reducer透過reduce方法實现,目的是對值进行求和,也就是是计算每个键出现的次數(比如,在这个例子中就是單词)
因此,job的输出為:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>`
main 方法指定job的不同方面,比如输入/输出路徑(透過命令行传递),键/值类型,输入/输出格式等等。
在手册的后續我们將会学習更多關于Job,InputFormat,OutoutFormat和其他接口和类。
MapReduce-用户接口
这部分介绍MapReduce框架用户方面的一些細節。这能够帮助用户更細致的實施、配置和调節它们的job.但是,每个类/接口的javadoc依然是最详細可用的文档;这个仅仅是作為一个手册。
让我们首先来看看Mapper和Reducer接口。应用通常透過提供map和reduce方法来實现它们。
然后我们將会討論其他核心接口,包括Job,Partitioner,InputFormat,OutputFormat和其他等。
最后,我们会討論框架的一些用户特性,比如DistributedCache,IsolationRunner等。
应用通常實现Mapper和Reducer接口来提供map和reduce方法。这些组成了job的核心。
Mapper
Mapper 將输入的键/值對映射成一个中间键值對集。
映射是將输入記錄轉换成中间記錄的單个行為。轉换后的中间記錄并不需要和输入記錄类型相同。一个给定的输入對可能会映射成0个或更多的输出對。
Hadoop MapReduce框架给透過InputFormat產生的每个InputSplit衍生一个map任務。
总的来說,Mapper實现透過Job.setMapperClass(Class)方法传递给Job。进而,框架為InputSplit中的每个键/值對调用map(WritableComparable,Writable,Context)。应用能够重载cleanup(Context)方法来操作任何需要的清理。
输出(键值)對的类型不需要和输入對的类型相同。一个给定的输入對可能会映射成0个或更多的输出對。透過调用context.write(WritableComparable,Writable)来收集输出對。
应用能够使用Counter来報告它们的統计。
所有關联一个给定输出键的中间值随后会被框架分组,进而传递给Reducer来决定最后的输出。用户能够透過Job.setGroupingComparatorClass(Class)指定一个Comparator来控制分组。
Mapper输出会被排序进而分發给Reducer。分块的數量和job中reduce任務一样。用户能够透過實现一个订制的Partitioner来控制那些键(也就是記錄)分發给哪些Reducer。
用户能够選择透過Job.setCombinerClass(Class)指定一个combiner来對中间結果进行本地聚合,以帮助減少从Mapper传递给Reducer的資料的數量。
中间以及排序后的输出总是以一种简單的(键长,值长,值)格式存儲。应用能够控制是否,以及如何對输出进行压缩。且透過Configuration使用CompressionCodec。
有多少map?
Map的數量通常决定于输入的总尺寸,也就是输入文件的块的总數。
Map并行性的合适層級一般設置在每个節点大概10-100个maps,尽管對于很多不是很耗CPU資源的map任務可以設置高达300个map.任務启动需要花点时间,因此map花几分鐘后再执行的话是最好的。
因此,如果有10TB的输入資料,且block(块)的尺寸為128MB的话,你將獲得配備82000个map.除非你透過Configuration.set(MRJobConfig.NUM_MAPS, int)来將它設置的更高。
Reducer
Reducer 減少共享一个键的中间值,得到一个更小的值集。
用户透過Job.setNumReduceTasks(int)来設置job的reduce的數量。
总的来說,實施的Reducer透過Job.setReducerClass(Class)传递给Job。然后,框架给分组后的输入中的每一<key,(list of values)>對调用reduce(WritableCoparable,Iterable<Writable>,Context)方法。应用能够重载cleanup(Context)方法来操作任何所需的清理。
Reducer有3个主要阶段:清洗、排序和简化。
清洗
Reducer的输入是mapper排序后的输出。在这一阶段,框架透過HTTP从所有的mapper中抽取相關的分区。
排序
在这一阶段,框架按键對Reducer的输入进行分组(因為不同的mapper可能输出相同的键)。
清洗和排序阶段是同时發生的;map输出抽取后,就会进行合并。
二次排序
如果對中间值进行分组的等量規则需要和reduction前的键值分组的規则不同的话,可以透過Job.setSortComparatorClass(Class)方法指定一个Comparator。因為Job.setGroupingComparatorClass(Class)能够用来控制中间键如何分组,因此能够用来一起模擬對值进行二次排序。
简化
在这阶段,對分完组的输入的每一<key,(list of values)>對调用reduce(WritableComparable,Iterable<Writable>,Context)
Reduce任務的输出通常透過Context.write(WritableComparable,Writable)寫入到文件系統中。
应用能够使用Counter報告它的統计。
Reducer的输出是没經过排序。
多少Reduce?
如果使用的是0.95,map一完成,所有的reduce就会立馬發布并启动传输map的输出。如果使用1.75,最快的節点会完成第一輪的reduce并且發起第二波reduce来做更多更好的负载均衡工作。
增加reduce会增大框架的负载,但是能够负载均衡,且降低失败的代价。
無Reducer
如果無需規約(reduction),設置reduce的任務數為0是合理的。
这种情况map任務的输出会透過输出路徑直接存儲到文件系統中,其中输出路徑透過FileOutputFormat.setOutputPath(Job,Path)設置。框架在將map的输出寫到文件系統中去之前不会對输出进行排序。
分区器
分区器拆分键空间。
分区器控制對中间map输出中的键进行分区。键(或者键的子集)通常用来透過哈希函數来衍生分区,分区的总數和job中的reduce任務相同。因此,这控制中间键会發送到哪些reduce任務来进行規約(reduction).
HashPartitioner是默认的分区器。
Counter
Counter辅助MapReduce应用報告它的統计結果。
Mapper和Reducer實现能够使用Counter来報告統计。
Hadoop MapReduce捆绑一个有用的mapper,reducer和分区器库。
Job 配置
Job是主要接口,提供给用户為Hadoop框架去刻画一个供执行的MapReducejob。框架尝試照Job描述的方式如實的执行job。但是:
l 一些参數可能会被管理者标識為final,因此不能修改。
l 有一些job参數可以直接設置(比如,Job.setNumReduceTasks(int)),而其他一些参數与框架的其他部分或者job配置精密交互,这是很难設置的(比如,Configuration.set(JobContext.NUN_MAPS,int))
Job通常用来指定Mapper,合并器(如果有)、分区器、Reducer、InputFormat、OutputFormat實现。FileInputFormat指定输入文件集(FileInputFormat.setInputPaths(Job,Path…)/FileInputFormat.addInputPath(Job,Path))以及(FileInputFormat.setInputPaths(Job,String…)/FileInputFormat.addInputPaths(Job,String)),同时指定输出文件应该寫入的地址(FileOutFormat.setOutputPath(Path)).
可選地,Job能够用来指定job更高級的一些方面,比如使用的比較器(Comparator),用来存入DistributedCache的文档,是否要對中间或job输出进行压缩以及如何压缩;是否job任務能够以一种投机的(speculative)方式执行(setMapSpeculativeExecution(boolean)/setReduceSpeculativeExcution(Boolean));以及每个人物的尝試次數(setMaxMapAttempts(int)/setMaxReduceAttempts(int))等等。
当然,用户能够使用Configuration.set(String,String)/Configuration.get(String)来設置和獲取应用所需的專用参數。但是,對于大規模(只讀)資料,需要使用DistributedCache.
任務的执行和環境
MRAppMaster 將Mapper/Reducer任務作為一个子进程在一个独立的jvm上执行。
子任務繼承父MRAppMaster的環境。用户能够透過mapreduce.{map|reduce}.java.opts给子-JVM指定额外選项.
記憶體管理
用户/管理者統一能够透過使用mapreduce.{map|reduce}.memory.mb指定發布的子任務,以及任何迭代發布的子进程的最大虚擬記憶體。
Job提交和监控
Job是用户-job与ResourceManager交互的主要接口。
Job提供提交job的基本所需,跟踪job进程,訪問模块-任務的報告和日志,獲取MapReduce集群的状態信息等等。
job的提交涉及:
1. 检查job特定的输入和输出。
2. 為job计算InputSplit值。
3. 如果需要,為job的DistributedCache提供必须的计算信息。
4. 复制job的jar和配置到文件系統上的MapReduce系統目錄。
5. 提交job到Resourcemanager且選择性的监控它的状態。
通常,用户使用Job去創建应用,描述job的不同方面,提交job,并监控它的过程。
Job控制
用户通常需要链接MapReducejob来完成那些透過單一MapReducejob 不能完成的复雜任務。这相對还是比較容易的,因為job的输出通常会进入到分布式文件系統,而输出能輪流作為下一个job的输入。
但是,这同样以為着保存job完整(成功/失败)的责任就完全由客户端来承担。在这种情况下,不同的job控制選择有:
Job.submit():將job提交到集群并立即返回。
Job.waitForCompetion(boolean):提交job到集群并等待完成。
Job输入
InputFormat為一个MapReduce job描述输入指定
MapReduce框架依賴job的InputFormat:
1. 验證job的输入特性。
2. 拆分输入的文本成邏輯InputSplit實体,每个實体之后分配个一个單独的Mapper.
3. 提供RecordReader實现用来从邏輯InputSplit搜集输入記錄给Mapper處理。
基于文件的InputFormat實现,通常是FileInputFormat的子类的默认行為是基于总尺寸按比特將输入拆分邏輯InputSplit實体。但是,输入文件的FileSystem块尺寸会当作输入分区的一个上限。分区尺寸的下限可以透過mapreduce.input.fileinputformat.split.minsize設置。
顯然,因為記錄邊界必须考慮,所以基于输入尺寸的邏輯分区對于很多的应用是不足够的。在这样的例子中,应用必须實现一个RecordReader。RecordReader负责考慮記錄邊界且為个体任務呈现邏輯InputSplit的一个面向記錄的視圖。
TextInputFormat 是默认InputFormat.
如果TextInputFormat是一个给定job的InputFormat,框架会探测擴展名為.gz的输入文件,并使用合适的CompressionCodec自动解压它们。但是,具有以上擴展名的压缩文件不能够分区,每一个压缩文件会被一个單一的mapper当作整体處理。
InputSplit
InputSplit 代表單个Mapper要處理的資料。
通常,InputSplit代表输入的一个面向比特視圖,Recorder负责處理和呈现一个面向記錄的視圖。
FileSplit是默认的InputSplit。透過設置mapreduce.map.input.file為进行邏輯分区的输入文件。
RecordReader
RecordReader从一个InputSplit讀取<key,value>對。
通常,RecordReader轉换InputSplit提供的输入的面向比特視圖,并且呈现给Mapper一个面向記錄的視圖进行處理。总之,RecordReader的责任就是處理記錄邊界和以键值的形式呈现任務。
Job输出
OutputFormat為一个MapReduce job描述一个特定的输出。
MapReduce框架依賴job的outputFormat:
1. 验證job的输出特性;比如,检查输出路徑不存在。
2. 提供RecordWriter實现用来寫job的输出文件。输出文件保存在FileSystem中。
TextOutputFormat是默认的OutputFormat.
OutputCommitter
OutputCommitter為MapReduce job描述任務的提交。
MapReduce 框架依賴job的OutputCommitter:
1. 初始化阶段启动job.比如,在job的初始化阶段為job建立一个臨时的输出路徑。当job處于PREP状態且在初始化任務后,job是透過一个單独的任務启动的。一旦開启任務完成,job就会进入到運行状態。
2. 当job完成的时候清除job。比如,当job完成后移除臨时输出路徑。Job清楚是在job結束时由一个独立的任務完成的。当清除工作完成后,job会声明為SUCCEDED/FAILED/KILLED。
3. 启动任務臨时输出。任務启动在任務初始化时發生,是相同任務的一部分。
4. 检查是否一个任務需要一个提交。这样能够避免不需要提交的任務提交
5. 任務输出的提交。一旦任務完成,如果需要任務会提交它的输出。
6. 放弃任務提交。如果任務失败后者被杀,输出就会被清除。如果不能清除,会發布一个具有相同尝試id的單独任務来执行清除。
FileOutputCommitter 是默认OutCommitter.Job启动/清理任務占用map或者reduce容器,不管哪个在節点管理上都可行。JobCleanup任務、TaskCleanup 任務和JobSetup任務按从高到低,具有最高的優先級。
Task Side-EffectFiles
在一些应用,组合任務需要建立和寫邊文件(side-files)。邊文件和job输出文件(job-output)文件有区别。
RecordWriter
RedordWriter將输出<key,value>對寫入输出文件。
RecordWriter實现將job输出寫入到文件系統(FileSystem)
其他有用的特性
提交job到队列
用户提交job到队列(Queues).队列,作為job的容器,允許系統提供特定功能。比如,队列使用ACLs来控制哪些用户能够將job提交给他们。队列主要被Hadoop调度器使用。
Hadoop配置一个單一强制性队列,称之為“默认”。队列的名字透過mapreduce.job.queuename——Hadoop位置配置的属性来定義。一些job调度器,比如容器调度器(Capacity Scheduler),支持多队列。
定義队列的job需要透過mapreduce.job.queuename属性或者透過Configuration.set(MRJobConfig.QUEUE_NAME,String)API来提交。設置queue名称是可選的。如果一个job没有設置一个相關的队列名称,那就回設置為‘默认’队列。
Counters
Counters代表全局计算器,透過MapReduce框架或者应用定義。每一个计算器可以是任何枚舉类型。特定Enum的计算器会捆绑到计算机组中——Counters.Group
应用能够定義特定Counters,并且透過map和reduce方法的Counters.incrCounter(Enum,long)或Counters.incrCounter(String,String,long)更新。进而框架会汇集这些计算器。
DistributedCache
DistributedCache有效率的分發应用特定、大且只讀的文件。
DistributedCache是MapReduce框架提供的一个工具,用来快取应用需要快取的文件(文本,压缩文件,jar等待)。
应用在Job中透過urls(hdfs://)指定要快取的文件。DistributedCache假定透過hdfs://指定的文件已經存在在文件系統中(FileSystem)。
在job的任意任務在節点上执行时,框架都会將所需的文件复制到该隶属節点。这个过程的效率源于每个job只会复制文件一次,并且有能力快取那些在隶属節点上不解压的压缩文件。
DistributedCache跟踪快取文件的修改时间戳( modification timestamps)。顯然,当job执行的时候,快取文件不会被应用或者外部修改。
DistributedCache能够用来分發简單、只讀資料/文本文件或者更复雜类型,比如压缩文件后jar文件。压缩文件(zip,tar,tgz和tar.gz文件)是在隶属節点不解压的。文件有执行許可設定。
文件/压缩文件能够透過設置mapreduce.job.cache.{files|archives}来分發.如果超过一个文件/压缩文件分發,它们必须以逗号分開路徑来添加。同样可以透過API——Job.addCacheFile(URI)/Job.addCacheArchive(URI)和[Job.setCacheFiles[URI]] (../../api/org/apache/hadoop/mapreduce/Job.html)/ [Job.setCacheArchives(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html) where URI is of the form hdfs://host:port/absolute-path\#link-name.在流中,文件能够透過命令行選项-cacheFile/-cacheArchive来分發。
DistributedCache能够作為初步的軟體分發机制用在map或(和)reduce任務重。DistributedCache能够用来分發jar库和原始库。Job.addArchiveToClassPath或Job.addFileToClassPath(Path) api用来快取文件/jar,且將它们添加到child-jvm的classpath中。这同样可以透過配置属性mapreduce.job.classpath.{files|archives}来實现。相同的,符号連接任務的工作路徑的快取文件能够用来分發源库和加载它们。
私有和公共分發快取文件
分發快取文件可以是私有的或者公共的。这决定它们怎么在从節点间共享。
l “私有”分發快取文件快取在一个本地路徑上,它對那些他们的job需要这些文件的用户是隐私的。这些文件只有在指定的用户的所有job和任務间共享而不能被从節点上的其他用户的job訪問。一个分發快取文件透過在它保存的文件系統——通常是HDFS上實现隐私性設定。如果文件没有可讀的訪問,或者装载文件的路徑没有可执行的接入来查詢,那么文件就会變成私有。
l “公有”分發快取文件快取在一个全局目錄中。對于所有的用户,这些文件都是可見的。这些文件可以被从節点上的所有用户的任務(task)和job共享。一个分發快取文件能够在保存它们的文件系統中實现設置為公共。如果文件可讀可訪問,且装载文件的路徑可执行可訪問来實现查詢,它们就是公有的。换句话說,如果用户想要使一个文件對所有用户公開可用,文件許可必须要設定為全局可讀且装载文件的路徑的目錄許可必须是全局可执行的。
调試(Debugging)
為了调試,MapReduce框架提供了一个工具来運行用户提供的脚本。比如說,当一个MapReduce任務失败时,一个用户能够運行一个调試脚本来處理任務日志。给定的脚本訪問任務的标准输出(stdout)、标准错誤输出(stderr)、系統日志(syslog)和job配置(jobconf)。调試脚本的标准输出和标准错誤输出的输出会展示在控制台的診斷区,同样也会作為jobUI的一部分。
下面会討論如果在一个job中提交debug脚本。脚本文件需要分發给提交给框架。
如何分發脚本文件
用户需要使用DistributedCache来分發和符号链接脚本文件。
如何提交脚本
快速提交调試脚本的一种方式是為调試map任務和reduce任務設置属性mapreduce.map.debug.srcipt和mapreduce.redue.debug.script的值。这些属性同样能够使用APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT,String)和Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT,String)。在流模式下,还可以透過命令行選项-mapdebug和-reducedebug設定。
脚本的参數是:任務的标准输出,标准错誤输出,系統日志和job配置文件。在MapReduce任務失败的節点上执行的调試命令如下:
$script$stdout $stderr $syslog $jobconf
資料压缩
Hadoop MapReduce 提供给应用開發者一个指定中间map输出和job输出——即reduce输出压缩的工具。同时捆绑了zlib压缩算法的CompressionCodec實现。同样支持gzip,bzip2,snappy和lz4文件格式。
中间输出
应用透過Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS,Boolean)控制中间map输出的压缩,且透過Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODES,Class)api能使用CompressionCodec
Job输出
使用FileOutputFormat.setCompressOutput(Job,boolean)控制job输出的压缩;透過FileoutputFormat.setOutputComressorClass(Job,Class)api指定使用CompressionCodec。
忽略错誤記錄
Hadoop提供選择当在處理map输入的时候某些坏的输入記錄能够忽略。应用透過SkipBadRecords类能够控制这特性。
当map任務在某些输入广泛崩溃时能用到这一特性。这通常發生在map函數存在bug时。,通常用户必须修复这些bug。但是这并不总是可能。比方說,bug可能出现在源碼不可用的第三方库中。在这些情况下,即使多次尝試,任務也不能完全成功,最終job失败。如果具有上面的特性,只会損失坏記錄周邊的一小部分資料,这在一些应用中是可以接受的(比如說,在大量資料中执行統计分析时)
默认这一特性是關闭的。如果要開启它,需要SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducemaxSkipGroups(Configuration,long)
如果启动了这一特性框架在几次map失败后就会进入“忽略模式”。更多細節,查看SkipBadRecords.setAttemptsToStartSkipping(Configuration,int)。在“忽略模式”下,map任務保持一个要處理的記錄的范围。為了實现这一点,框架需要依賴處理記錄计算器。查看SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS和SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。框架透過该计數器能够知道有多少記錄處理成功,因此,也就知道導致任務崩溃的記錄的范围。在进一步尝試时,記錄的这个范围会忽略。
忽略的記錄的數量取决于应用增加處理記錄计數器的频率。推荐在每个記錄處理后就增加该计數器。
為了增加人物尝試次數,可以使用 Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int)
實例:WordCount v2.0
这是一个更完整的WordCount_——采用了目前討論的MapReduce框架的很多特性。
这个例子需要启动并運行HDFS,特别是對于DistributedCache相關的特性。因此,这个例子只能工作在偽分布或全分布的Hadoop安装版本中。
源碼:
-
import java.io.BufferedReader;
-
import java.io.FileReader;
-
import java.io.IOException;
-
import java.net.URI;
-
import java.util.ArrayList;
-
import java.util.HashSet;
-
import java.util.List;
-
import java.util.Set;
-
import java.util.StringTokenizer;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.Counter;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
import org.apache.hadoop.util.StringUtils;
-
-
public class WordCount2 {
-
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
static enum CountersEnum { INPUT_WORDS }
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
private boolean caseSensitive;
-
private Set<String> patternsToSkip = new HashSet<String>();
-
-
private Configuration conf;
-
private BufferedReader fis;
-
-
-
public void setup(Context context) throws IOException,
-
InterruptedException {
-
conf = context.getConfiguration();
-
caseSensitive = conf.getBoolean(“wordcount.case.sensitive”, true);
-
if (conf.getBoolean(“wordcount.skip.patterns”, true)) {
-
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
-
for (URI patternsURI : patternsURIs) {
-
Path patternsPath = new Path(patternsURI.getPath());
-
String patternsFileName = patternsPath.getName().toString();
-
parseSkipFile(patternsFileName);
-
}
-
}
-
}
-
-
private void parseSkipFile(String fileName) {
-
try {
-
fis = new BufferedReader(new FileReader(fileName));
-
String pattern = null;
-
while ((pattern = fis.readLine()) != null) {
-
patternsToSkip.add(pattern);
-
}
-
} catch (IOException ioe) {
-
System.err.println(“Caught exception while parsing the cached file ‘”
-
+ StringUtils.stringifyException(ioe));
-
}
-
}
-
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
String line = (caseSensitive) ?
-
value.toString() : value.toString().toLowerCase();
-
for (String pattern : patternsToSkip) {
-
line = line.replaceAll(pattern, “”);
-
}
-
StringTokenizer itr = new StringTokenizer(line);
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
Counter counter = context.getCounter(CountersEnum.class.getName(),
-
CountersEnum.INPUT_WORDS.toString());
-
counter.increment(1);
-
}
-
}
-
}
-
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
-
String[] remainingArgs = optionParser.getRemainingArgs();
-
if (!(remainingArgs.length != 2 | | remainingArgs.length != 4)) {
-
System.err.println(“Usage: wordcount <in> <out> [-skip skipPatternFile]”);
-
System.exit(2);
-
}
-
Job job = Job.getInstance(conf, “word count”);
-
job.setJarByClass(WordCount2.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
-
List<String> otherArgs = new ArrayList<String>();
-
for (int i=0; i < remainingArgs.length; ++i) {
-
if (“-skip”.equals(remainingArgs[i])) {
-
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
-
job.getConfiguration().setBoolean(“wordcount.skip.patterns”, true);
-
} else {
-
otherArgs.add(remainingArgs[i]);
-
}
-
}
-
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
-
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
運行實例
输入的文本文件样例:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
運行这个应用:
bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
注意这里的输入和第一版已經不一样。我们同样可以看到这影响到了输出。
现在,让我们透過DistributedCache,插入一个模式文件来列出所有要忽視的字符模式。
$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
再执行一次。这次附加上更多選项。
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
如所願,输出如下:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
去掉大小寫敏感再運行一次:
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
horld 2
原文:http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core
数据评估
本站ai工具百寶箱提供的mapreduce都来源于网络,不保证外部链接的准确性和完整性,同时,对于该外部链接的指向,不由ai工具百寶箱实际控制,在2025年5月16日 下午11:44收录时,该网页上的内容,都属于合规合法,后期网页的内容如出现违规,可以直接联系网站管理员进行删除,ai工具百寶箱不承担任何责任。