位置:首页 > 大数据在线教程 > Hadoop在线教程 > MapReduce计数器和连接

MapReduce计数器和连接


在MapReduce的计数器是用于收集关于 MapReduce 工作的统计信息的机制。这个信息在MapReduce的作业处理的问题的诊断是很有用的。 计数器类似于将在 map 或 reduce 在代码日志信息中。

通常情况下,这些计数器在一个程序(map 或 reduce)中定义,当一个特定事件或条件(特定于该计数器)发生执行期间递增。计数器是一个很好的应用来从输入数据集跟踪有效和无效的记录。

有两种类型的计数器:

1. Hadoop 内置计数器: 有一些内置计数器存在每个作业中。下面是内置计数器组:

  • MapReduce任务计数器 - 收集任务的具体信息(例如,输入记录的数量)在它的执行期间。
  • 文件系统计数器 - 收集信息像由一个任务读取或写入的字节数
  • FileInputFormat计数器 - 收集通过FileInputFormat读取的字节数的信息
  • FileOutputFormat计数器 - 收集的字节数量的信息通过 FileOutputFormat 写入
  • Job 计数器- 这些计数器使用 JobTracker。它们收集统计数据包括如,任务发起了作业的数量。

2. 用户定义的计数器

除了内置的计数器,用户可以定义自己的计数器,通过使用编程语言提供了类似的功能。 例如,在 Java 的枚举用于定义用户定义的计数器。

一个MapClass例子使用计数器计算缺失和无效值的数量:

 
publicstaticclassMapClass
            extendsMapReduceBase
            implementsMapper<LongWritable, Text, Text, Text>
{
    staticenumSalesCounters { MISSING, INVALID };
    publicvoidmap ( LongWritable key, Text value,
                 OutputCollector<Text, Text> output,
                 Reporter reporter) throwsIOException
    {
        
        //Input string is split using ',' and stored in 'fields' array
        String fields[] = value.toString().split(",", -20);
        //Value at 4th index is country. It is stored in 'country' variable
        String country = fields[4];
        
        //Value at 8th index is sales data. It is stored in 'sales' variable
        String sales = fields[8];
      
        if(country.length() == 0) {
            reporter.incrCounter(SalesCounters.MISSING, 1);
        } elseif(sales.startsWith("\"")) {
            reporter.incrCounter(SalesCounters.INVALID, 1);
        } else{
            output.collect(newText(country), newText(sales + ",1"));
        }
    }
}

上面的代码片段显示在 Map Reduce 实现计数器的示例。

在这里,SalesCounters是用“枚举”定义的计数器。它被用来计算 MISSING 和 INVALID 的输入记录。

在代码段中,如果 “country” 字段的长度为零那么它的值丢失,因此相应的计数器 SalesCounters.MISSING 递增。

接下来,如果 “sales” 字段开头是符号 '' ,则记录被视为无效。这通过递增计数器 SalesCounters.INVALID 来表示。

MapReduce 连接

连接两个大的数据集可以使用 MapReduce Join 来实现。然而,这个过程需要编写大量的代码来执行实际的连接操作。

连接两个数据集开始是通过比较每个数据集的大小。如果因为相比其他数据集一个数据集小,那么小数据集被分布到集群中的每个数据节点。一旦分散,无论是 Mapper 或 Reducer 使用更小的数据集进行查找匹配的大型数据集的记录,然后结合这些记录,形成输出记录。

这取决于在实际连接进行的地方,这个连接分为:

1. 映射端连接 - 当该联接是由映射器执行的,它称为映射端链接。在这种类型中,联结前的数据由映射函数实际来消耗的处理。它是强制性的,输入到每个映射是在分区中的形式,并且是按排序顺序。另外,必须有一个相等数目的分区,它必须由连接键进行排序。

2. Reduce端连接- 当连接是通过减速器进行的,称为reduce端连接。没有必要在此连接有数据集中在以结构化形式(或分区)。

在这里,映射端的处理发出连接这两个表的关键字和对应的元组。作为该处理的效果,所有的元组相同连接键都落在相同的 reducer,然后使用相同的连接键连接记录。

整体处理流程示于下图。