Hadoop MapReduce
MapReduce它可以编写应用程序来处理海量数据,并行,大集群的普通硬件,以可靠的方式的框架。
MapReduce是什么?
MapReduce是一种处理技术和程序模型基于Java的分布式计算。 MapReduce算法包含了两项重要任务,即Map 和 Reduce。Map采用了一组数据,并将其转换成另一组数据,其中,各个元件被分解成元组(键/值对)。其次,减少任务,这需要从Map 作为输入并组合那些数据元组成的一组小的元组输出。作为MapReduce暗示的名称的序列在Map作业之后执行reduce任务。
MapReduce主要优点是,它很容易大规模数据处理在多个计算节点。下面MapReduce模型中,数据处理的原语被称为映射器和减速器。分解数据处理应用到映射器和减速器有时是普通的。但是编写MapReduce形式的应用,扩展应用程序运行在几百,几千,甚至几万机集群中的仅仅是一个配置的更改。这个简单的可扩展性是吸引了众多程序员使用MapReduce模型。
算法
-
通常MapReduce范例是基于向发送计算机数据的位置!
-
MapReduce计划分三个阶段执行,即映射阶段,shuffle阶段,并减少阶段。
-
映射阶段:映射或映射器的工作是处理输入数据。一般输入数据是在文件或目录的形式,并且被存储在Hadoop的文件系统(HDFS)。输入文件被传递到由线映射器功能线路。映射器处理该数据,并创建数据的若干小块。
-
减少阶段:这个阶段是:Shuffle阶段和Reduce阶段的组合。减速器的工作是处理该来自映射器中的数据。处理之后,它产生一组新的输出,这将被存储在HDFS。
-
-
在一个MapReduce工作,Hadoop的发送Map和Reduce任务到集群的相应服务器。
-
框架管理数据传递例如发出任务的所有节点之间的集群周围的详细信息,验证任务完成,和复制数据。
-
大部分的计算发生在与在本地磁盘上,可以减少网络通信量数据的节点。
-
给定的任务完成后,将群集收集并减少了数据,以形成一个合适的结果,并且将其发送回Hadoop服务器。
输入和输出(Java透视图)
MapReduce框架上的<key, value>对操作,也就是框架视图的输入工作作为一组<key, value>对,并产生一组<key, value>对作为作业的输出可以在不同的类型。
键和值类在框架连载的方式,因此,需要实现接口。此外,键类必须实现可写,可比的接口,以方便框架排序。MapReduce工作的输入和输出类型:(输入)<k1, v1> ->映射 - ><k2, v2>-> reduce - ><k3, v3>(输出)。
输入 | 输出 | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
术语
-
PayLoad - 应用程序实现映射和减少功能,形成工作的核心。
-
Mapper - 映射器的输入键/值对映射到一组中间键/值对。
-
NamedNode - 节点管理Hadoop分布式文件系统(HDFS)。
-
DataNode - 节点数据呈现在任何处理发生之前。
-
MasterNode - 节点所在JobTracker运行并接受来自客户端作业请求。
-
SlaveNode - 节点所在Map和Reduce程序运行。
-
JobTracker - 调度作业并跟踪作业分配给任务跟踪器。
-
Task Tracker - 跟踪任务和报告状态的JobTracker。
-
Job -程序在整个数据集映射器和减速的执行。
-
Task - 一个映射程序的执行或对数据的一个片段的减速器。
-
Task Attempt - 一种尝试的特定实例在SlaveNode执行任务。
示例场景
下面给出是关于一个组织的电消耗量的数据。它包含了每月的用电量,各年的平均。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果上述数据作为输入,我们需要编写应用程序来处理它而产生的结果,如发现最大使用量,最低使用年份,依此类推。这是一个轻松取胜用于记录有限数目的编程器。他们将编写简单地逻辑,以产生所需的输出,并且将数据传递到写入的应用程序。
但是,代表一个特定状态下所有的大规模产业的电力消耗数据。
当我们编写应用程序来处理这样的大量数据,
- 他们需要大量的时间来执行。
- 将会有一个很大的网络流量,当我们将数据从源到网络服务器等。
为了解决这些问题,使用MapReduce框架。
输入数据
上述数据被保存为 sample.txt 并作为输入。输入文件看起来如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例程序
下面给出的是使用MapReduce框架的样本数据的程序。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
保存上述程序作为ProcessUnits.java。编译和执行的程序如下的说明。
编译和执行进程单位程序
让我们假设是在Hadoop的用户(如/home/hadoop)的主目录。
按照下面给出编译和执行上面程序的步骤。
第1步
下面的命令是创建一个目录来存储编译的Java类。
$ mkdir units
第2步
下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce程序。访问以下链接 http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载JAR。假设下载的文件夹是 /home/hadoop/.
第3步
下面的命令用于编译ProcessUnits.java程序并创建一个jar程序。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
第4步
下面的命令用来创建一个输入目录在HDFS中。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第5步
下面的命令用于复制命名sample.txt在HDFS输入目录中输入文件。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
第6步
下面的命令用来验证在输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
第7步
下面的命令用于通过从输入目录以输入文件来运行Eleunit_max应用。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到执行文件。在执行后,如下图所示,输出将包含输入分割的数目,映射任务数,减速器任务的数量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
第8步
下面的命令用来验证在输出文件夹所得文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
第9步
下面的命令是用来查看输出Part-00000文件。该文件由HDFS产生。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是由MapReduce的程序所产生的输出。
1981 34 1984 40 1985 45
第10步
以下命令用于从HDFS输出文件夹复制到本地文件系统进行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要命令
所有的Hadoop命令是由$HADOOP_HOME/bin/hadoop命令调用。不带任何参数运行Hadoop脚本打印所有命令的描述。
Usage : hadoop [--config confdir] COMMAND
下表列出了可用的选项及其说明。
操作 | 描述 |
---|---|
namenode -format | 格式化DFS文件系统。 |
secondarynamenode | 运行DFS二次名称节点。 |
namenode | 运行DFS名称节点。 |
datanode | 运行DFS的Datanode。 |
dfsadmin | 运行DFS管理客户端。 |
mradmin | 运行映射,减少管理客户端。 |
fsck | 运行DFS文件系统检查工具。 |
fs | 运行一个通用的文件系统的用户客户端。 |
balancer | 运行集群平衡工具。 |
oiv | 适用于离线FsImage查看器的fsimage。 |
fetchdt | 从NameNode获取团令牌。 |
jobtracker | 运行MapReduce工作跟踪节点。 |
pipes | 运行管道的工作。 |
tasktracker | 运行MapReduce任务跟踪节点。 |
historyserver | 运行作业历史记录服务器作为一个独立的守护进程。 |
job | 操纵MapReduce工作。 |
queue | 获取有关作业队列信息。 |
version | 打印版本。 |
jar <jar> | 运行一个jar文件。 |
distcp <srcurl> <desturl> | 复制文件或目录的递归。 |
distcp2 <srcurl> <desturl> | DistCp第2版。 |
archive -archiveName NAME -p | 创建一个Hadoop的归档。 |
<parent path> <src>* <dest> | |
classpath | 打印需要得到Hadoop jar和所需要的库的类路径。 |
daemonlog | 为每个守护进程获取/设置日志级别 |
如何与MapReduce工作互动
Usage: hadoop job [GENERIC_OPTIONS]
以下是在一个Hadoop的作业可用通用选项。
GENERIC_OPTIONS | 描述 |
---|---|
-submit <job-file> | 提交作业。 |
status <job-id> | 打印映射,并减少完成的百分比以及所有的工作的计数器。 |
counter <job-id> <group-name> <countername> | 打印的计数器值。 |
-kill <job-id> | 终止任务。 |
-events <job-id> <fromevent-#> <#-of-events> | 打印接收到JobTracker为给定范围内的事件的详细信息。 |
-history [all] <jobOutputDir> - history < jobOutputDir> | 打印作业的详细信息,未能终止提示详细信息。有关作业的更多详细信息,如每个任务取得成功的任务,任务可以尝试通过指定[all]选项中查看。 |
-list[all] | 显示所有作业。-list 只显示尚未完成的作业。 |
-kill-task <task-id> | 终止任务。终止任务不计入失败的尝试。 |
-fail-task <task-id> | 失败的任务。失败的任务都算对失败的尝试。 |
set-priority <job-id> <priority> | 更改作业的优先级。允许优先级值:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
要查看作业的状态
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
要查看作业历史在output-dir
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
终止任务
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004