hadoop离线 MapReduce中的分区和排序

1391-王同学

发表文章数:51

首页 » 大数据 » 正文

1、MapReduce的分区与reduceTask的数量

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理,例如我们为了数据的统计,我们可以把一批类似的数据发 送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据,就可以实现类似数据的分区,统计等
说白了就是相同类型的数据,送到一起去处理,在reduce当中默认分区只有1个。
MapReduce当中的分区类图

需求:将以下数据进行分开处理
详细数据参见partition.csv 这个文本文件,其中第五个字段表示开奖结果数值,现在需求将15以上的结果以及15以下的结果进行分开成两个文件进行保存

hadoop离线 MapReduce中的分区和排序

注意:分区的案例,只能打成jar包发布到集群上面去运行,本地模式已经不能正常运行了

第一步:定义我们的mapper

我们这里的mapper程序不做任何逻辑,也不对key,与value做任何改变,只是接收我们的数据,然后往下发送

public class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}

第二步:定义我们的reducer逻辑

我们的reducer也不做任何处理,将我们的数据原封不动的输出即可

public class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

第三步:自定义partitioner

/**
 * 这里的输入类型与我们map阶段的输出类型相同
 */
public class MyPartitioner extends Partitioner<Text,NullWritable>{
    /**
     * 返回值表示我们的数据要去到哪个分区
     * 返回值只是一个分区的标记,标记所有相同的数据去到指定的分区
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String result = text.toString().split("/t")[5];
        System.out.println(result);
        if (Integer.parseInt(result) > 15){
            return 1;
        }else{
            return 0;
        }
    }
}

第四步:程序main函数入口

public class PartitionMain  extends Configured implements Tool {
    public static void main(String[] args) throws  Exception{
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), PartitionMain.class.getSimpleName());
        job.setJarByClass(PartitionMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.100:8020/partitioner"));
        TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.100:8020/outpartition"));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(MyReducer.class);
        /**
         * 设置我们的分区类,以及我们的reducetask的个数,注意reduceTask的个数一定要与我们的
         * 分区数保持一致
         */
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

2、MapReduce排序以及序列化

3、计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map 或reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
hadoop内置计数器列表

MapReduce任务计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图

所有的这些都是MapReduce的计数器的功能,既然MapReduce当中有计数器的功能,我们如何实现自己的计数器???

需求:以上面排序以及序列化为案例,统计map接收到的数据记录条数

第一种方式定义计数器,通过context上下文对象可以获取我们的计数器,进行记录
通过context上下文对象,在map端使用计数器进行统计

public class SortMapper extends Mapper<LongWritable,Text,PairWritable,IntWritable> {

    private PairWritable mapOutKey = new PairWritable();
    private IntWritable mapOutValue = new IntWritable();

    @Override
    public  void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//自定义我们的计数器,这里实现了统计map数据数据的条数
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        String lineValue = value.toString();
        String[] strs = lineValue.split("/t");

        //设置组合key和value ==> <(key,value),value>
        mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
        mapOutValue.set(Integer.valueOf(strs[1]));
        context.write(mapOutKey, mapOutValue);
    }
}

运行程序之后就可以看到我们自定义的计数器在map阶段读取了七条数据

hadoop离线 MapReduce中的分区和排序
第二种方式定义计数器
通过enum枚举类型来定义计数器
统计reduce端数据的输入的key有多少个,对应的value有多少个

public class SortReducer extends Reducer<PairWritable,IntWritable,Text,IntWritable> {

    private Text outPutKey = new Text();
    public static enum Counter{
        REDUCE_INPUT_RECORDS, REDUCE_INPUT_VAL_NUMS,
    }
    @Override
    public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        //迭代输出
        for(IntWritable value : values) {
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            outPutKey.set(key.getFirst());
            context.write(outPutKey, value);
        }
    }
}

4、规约(combiner)

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一。

combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件

  • combiner 组件的父类就是 Reducer
  • combiner 和 reducer 的区别在于运行的位置:
  • combiner 是在每一个 maptask 所在的节点运行
  • Reducer 是接收全局所有 Mapper 的输出结果;
  • combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量

具体实现步骤:
1、自定义一个 combiner 继承 Reducer,重写 reduce 方法
2、在 job 中设置: job.setCombinerClass(CustomCombiner.class)

combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来

未经允许不得转载:作者:1391-王同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《hadoop离线 MapReduce中的分区和排序》 发布于2020-12-13

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录