在MapReduce的计数器是用于收集关于 MapReduce 工作的统计信息的机制。这个信息在MapReduce的作业处理的问题的诊断是很有用的。 计数器类似于将在 map 或 reduce 在代码日志信息中。
通常情况下,这些计数器在一个程序(map 或 reduce)中定义,当一个特定事件或条件(特定于该计数器)发生执行期间递增。计数器是一个很好的应用来从输入数据集跟踪有效和无效的记录。
有两种类型的计数器:
1. Hadoop 内置计数器: 有一些内置计数器存在每个作业中。下面是内置计数器组:
- MapReduce任务计数器 - 收集任务的具体信息(例如,输入记录的数量)在它的执行期间。
- 文件系统计数器 - 收集信息像由一个任务读取或写入的字节数
- FileInputFormat计数器 - 收集通过FileInputFormat读取的字节数的信息
- FileOutputFormat计数器 - 收集的字节数量的信息通过 FileOutputFormat 写入
- Job 计数器- 这些计数器使用 JobTracker。它们收集统计数据包括如,任务发起了作业的数量。
2. 用户定义的计数器
除了内置的计数器,用户可以定义自己的计数器,通过使用编程语言提供了类似的功能。 例如,在 Java 的枚举用于定义用户定义的计数器。
一个MapClass例子使用计数器计算缺失和无效值的数量:
|
publicstaticclassMapClass
extendsMapReduceBase
implementsMapper<LongWritable, Text, Text, Text>
{
staticenumSalesCounters { MISSING, INVALID };
publicvoidmap ( LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throwsIOException
{
//Input string is split using ',' and stored in 'fields' array
String fields[] = value.toString().split(",", -20);
//Value at 4th index is country. It is stored in 'country' variable
String country = fields[4];
//Value at 8th index is sales data. It is stored in 'sales' variable
String sales = fields[8];
if(country.length() == 0) {
reporter.incrCounter(SalesCounters.MISSING, 1);
} elseif(sales.startsWith("\"")) {
reporter.incrCounter(SalesCounters.INVALID, 1);
} else{
output.collect(newText(country), newText(sales + ",1"));
}
}
}
|
上面的代码片段显示在 Map Reduce 实现计数器的示例。
在这里,SalesCounters是用“枚举”定义的计数器。它被用来计算 MISSING 和 INVALID 的输入记录。
在代码段中,如果 “country” 字段的长度为零那么它的值丢失,因此相应的计数器 SalesCounters.MISSING 递增。
接下来,如果 “sales” 字段开头是符号 '' ,则记录被视为无效。这通过递增计数器 SalesCounters.INVALID 来表示。
MapReduce 连接
连接两个大的数据集可以使用 MapReduce Join 来实现。然而,这个过程需要编写大量的代码来执行实际的连接操作。
连接两个数据集开始是通过比较每个数据集的大小。如果因为相比其他数据集一个数据集小,那么小数据集被分布到集群中的每个数据节点。一旦分散,无论是 Mapper 或 Reducer 使用更小的数据集进行查找匹配的大型数据集的记录,然后结合这些记录,形成输出记录。
这取决于在实际连接进行的地方,这个连接分为:
1. 映射端连接 - 当该联接是由映射器执行的,它称为映射端链接。在这种类型中,联结前的数据由映射函数实际来消耗的处理。它是强制性的,输入到每个映射是在分区中的形式,并且是按排序顺序。另外,必须有一个相等数目的分区,它必须由连接键进行排序。
2. Reduce端连接- 当连接是通过减速器进行的,称为reduce端连接。没有必要在此连接有数据集中在以结构化形式(或分区)。
在这里,映射端的处理发出连接这两个表的关键字和对应的元组。作为该处理的效果,所有的元组相同连接键都落在相同的 reducer,然后使用相同的连接键连接记录。
整体处理流程示于下图。