首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

mapreduce实例-Join连接 (reduce Side Join)

2013-09-07 
mapreduce实例-Join连接 (reduce Side Join):public class ReduceSideJoin extends Configured implements Tool ……

mapreduce实例-Join连接 (reduce Side Join)

public class ReduceSideJoin extends Configured implements Tool {    public static class UserJoinMapper extends Mapper<Object, Text, Text, Text> {        private Text outkey = new Text();        private Text outvalue = new Text();        @Override        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {            try {                String[] sp = value.toString().split(",");                String userid = sp[0];                outkey.set(userid);                outvalue.set("A" + value.toString());                context.write(outkey, outkey);            } catch (Exception e) {                context.getCounter("UserJoinMapper", "errorlog").increment(1);            }        }    }    public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> {        private Text outkey = new Text();        private Text outvalue = new Text();        @Override        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {            try {                String[] sp = value.toString().split(",");                String userid = sp[0];                outkey.set(userid);                outvalue.set("B" + value.toString());                context.write(outkey, outkey);            } catch (Exception e) {                context.getCounter("UserJoinMapper", "errorlog").increment(1);            }        }    }    public static class UserJoinReducer extends Reducer<Text, Text, Text, Text> {        private static final Text EMPTY_TEXT = new Text("");        private ArrayList<Text> listA = new ArrayList<Text>();        private ArrayList<Text> listB = new ArrayList<Text>();        private String joinType = null;        @Override        protected void setup(Context context) throws IOException, InterruptedException {            joinType = context.getConfiguration().get("join.type");        }        @Override        protected void reduce(Text key, Iterable<Text> values, Context 
context)                throws IOException, InterruptedException {            listA.clear();            listB.clear();            for (Text value : values) {                if (value.charAt(0) == 'A') {                    listA.add(new Text(value.toString().substring(1)));                } else if (value.charAt(0) == 'B') {                    listB.add(new Text(value.toString().substring(1)));                }            }            executeJoinLogic(context);        }        //根据连接类型做不同处理        private void executeJoinLogic(Context context) throws IOException, InterruptedException {            if (joinType.equalsIgnoreCase("inner")) {                if (listA.isEmpty() && listB.isEmpty()) {                    for (Text A : listA) {                        for (Text B : listB) {                            context.write(A, B);                        }                    }                }            } else if (joinType.equalsIgnoreCase("leftouter")) {                for (Text A : listA) {                    if (!listB.isEmpty()) {                        for (Text B : listB) {                            context.write(A, B);                        }                    } else {                        context.write(A, EMPTY_TEXT);                    }                }            } else if (joinType.equalsIgnoreCase("rightouter")) {                for (Text B : listB) {                    if (!listA.isEmpty()) {                        for (Text A : listA) {                            context.write(A, B);                        }                    } else {                        context.write(EMPTY_TEXT, B);                    }                }            } else if (joinType.equalsIgnoreCase("fullouter")) {                if (!listA.isEmpty()) {                     for (Text A : listA) {                        if (!listB.isEmpty()) {                            for (Text B : listB) {                                context.write(A, B);                            }         
               }else{                            context.write(A, EMPTY_TEXT);                        }                    }                } else {                    for (Text B : listB) {                       context.write(EMPTY_TEXT, B);                    }                }            } else if (joinType.equalsIgnoreCase("anti")) {                if (listA.isEmpty() ^ listB.isEmpty()) {                    for (Text A : listA) {                        context.write(A, EMPTY_TEXT);                    }                    for (Text B : listB) {                        context.write(EMPTY_TEXT, B);                    }                }            }        }    }    @Override    public int run(String[] args) throws Exception {        Configuration conf = getConf();        conf.set("join.type", args[2]);        Job job = new Job(conf, "ReduceSideJoin");        job.setJarByClass(ReduceSideJoin.class);                job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);                //设置不同map处理不同输入                MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserJoinMapper.class);        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CommentJoinMapper.class);                FileOutputFormat.setOutputPath(job, new Path(args[3]));        job.setOutputFormatClass(TextOutputFormat.class);        job.setReducerClass(UserJoinReducer.class);        return job.waitForCompletion(true) ? 0 : 1;    }    public static void main(String[] args) throws IOException, InterruptedException {        try {            if (args.length < 4) {                System.err.println("ERROR: Parameter format length ");                System.exit(0);            }            int ret = ToolRunner.run(new ReduceSideJoin(), args);            System.exit(ret);        } catch (Exception e) {            e.printStackTrace();        }    }}

外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接。

热点排行