mapreduce高阶内容(三) 自定义inputFormat

首页 » 大数据 » 正文

1、需求

现在有一些数据,将数据原封不动地输出到两个不同的文件夹当中。

2、分析

程序的关键点是要在一个mapreduce程序中根据数据输出结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现

3、实现

实现要点:

  1. 在mapreduce中访问外部资源
  2. 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()

第一步:自定义一个outputformat

package cn.laojiajun.myformat.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyOutputFormat extends FileOutputFormat {

    FSDataOutputStream path1_out;
    FSDataOutputStream path2_out;

    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //从这个方法里面可以获取一个configuration
        Configuration configuration = taskAttemptContext.getConfiguration();

        try {
            //获取文件系统对象
            FileSystem fs = FileSystem.get(configuration);

            //输出文件1路径
            Path path1 = new Path("hdfs://192.168.88.3:8020/src_output1/1.txt");
            //输出文件2路径
            Path path2 = new Path("hdfs://192.168.88.3:8020/src_output2/2.txt");

            this.path1_out = fs.create(path1);
            this.path2_out = fs.create(path2);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return new MyRecordWriter(path1_out,path2_out);
    }


}

第二步:自定义MyRecordWriter

package cn.laojiajun.myformat.demo3;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter {

    private FSDataOutputStream path1;
    private FSDataOutputStream path2;

    public MyRecordWriter(){}

    public MyRecordWriter(FSDataOutputStream path1,FSDataOutputStream path2) {
        this.path1 = path1;
        this.path2 = path2;
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        /**
         * 这个write方法就是往外写出去数据,我们可以根据这个key,来判断文件究竟往哪个目录下面写
         *
         * 这里可以写业务逻辑判断
         * 例如
         * 现在有一些订单的评论数据,需求,将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去
         *
         * 这里什么都不做,直接原封不动输出数据
         */
        path1.write(text.toString().getBytes());
        path1.write("/r/n".getBytes());
        path2.write(text.toString().getBytes());
        path2.write("/r/n".getBytes());
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(path1);
        IOUtils.closeStream(path2);
    }
}

第三步:自定义mapper 和 reducer 逻辑

package cn.laojiajun.myformat.demo1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyMapper extends Mapper {
    private Text filenameKey;

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(filenameKey, value);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit)split).getPath();
        filenameKey = new Text(path.toString());
    }
}
package cn.laojiajun.myformat.demo3;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyOutputReduce extends Reducer {

    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        // 输出key3 value3 类型 直接原封不动输出数据
        context.write(key,NullWritable.get());
    }
}

第四步:主运行程序

package cn.laojiajun.myformat.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyOutputMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //注意,这里一定要设置!否则文件系统会找不到地址!教学视频和网站大多数资料是没有写上的!
        conf.set("fs.defaultFS","hdfs://192.168.88.3:8020");
        int run = ToolRunner.run(conf,new MyOutputMain(),args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();

        Job job = Job.getInstance(conf,"ownOutputFormat");
        job.setJarByClass(MyOutputMain.class);

        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.88.3:8020/test_dir"));
        job.setInputFormatClass(TextInputFormat.class);


        job.setMapperClass(MyOutputMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(MyOutputReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        //下面这个路径随意填,因为已经自定义了MyOutputFormat文件输出路径了
        MyOutputFormat.setOutputPath(job,new Path("hdfs://192.168.88.3:8020/src_output1"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;
    }
}

运行结果截图:

mapreduce高阶内容(三) 自定义inputFormat

mapreduce高阶内容(三) 自定义inputFormat

 

 

未经允许不得转载:作者:1643-劳同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《mapreduce高阶内容(三) 自定义inputFormat》 发布于2021-09-12

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录