首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

地图reduce 实现内连接,左连接,右连接,全连接,反连接

2013-11-23 
mapreduce 实现内连接,左连接,右连接,全连接,反连接测试数据more user.txt (用户id,用户名)1用户12用户23

mapreduce 实现内连接,左连接,右连接,全连接,反连接

测试数据more user.txt (用户id,用户名)1用户12用户23用户3more post.txt (用户id,帖子id,标题)11贴子112贴子223帖子344贴子455贴子556贴子657贴子7

?

?

查询结果内连接1用户111贴子11用户112贴子22用户223帖子3左外连接1用户111贴子11用户112贴子22用户223帖子33用户3   右外连接1用户111贴子11用户112贴子22用户223帖子3  44贴子4  55贴子5  56贴子6  57贴子7全外连接1用户111贴子11用户112贴子22用户223帖子33用户3    44贴子4  55贴子5  56贴子6  57贴子7反连接3用户3     44贴子4  55贴子5  56贴子6  57贴子7

?

代码如下:

package mapreduce.pattern.join;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.util.ArrayList;import java.util.List;import multiinput.post.PostJob;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** * mapreduce 实现内连接,左连接,右连接,全连接,反连接 * user.txt 用户表 * post.txt 帖子表 * 关联字段 userId * @author wxj * */public class UserAndPostJoinJob{static class UserAndPostWritable implements Writable{/** * 类型 U表示用户,P表示帖子 */private String type;private String data;public UserAndPostWritable(){}public UserAndPostWritable(String type, String data){super();this.type = type;this.data = data;}public String getType(){return type;}public void setType(String type){this.type = type;}public String getData(){return data;}public void setData(String data){this.data = data;}@Overridepublic void readFields(DataInput input) throws IOException{type = input.readUTF();data = input.readUTF();}@Overridepublic void write(DataOutput output) throws IOException{output.writeUTF(type);output.writeUTF(data);}}static class UserMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{String[] arr = value.toString().split("\t");Text userId = new Text(arr[0]);context.write(userId, new UserAndPostWritable("U",value.toString()));}}static class PostMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{String[] arr = value.toString().split("\t");Text userId = new Text(arr[0]);context.write(userId, new UserAndPostWritable("P",value.toString()));System.out.println(userId);}}static class PostReducer extends Reducer<Text, UserAndPostWritable, Text, Text> {private List<Text> users = new ArrayList<Text>();private List<Text> posts = new ArrayList<Text>();private String joinType; @Overrideprotected void setup(Context context) throws IOException,InterruptedException{super.setup(context);joinType = context.getConfiguration().get("joinType");//System.out.println("joinType: " + joinType);}protected void reduce(Text key, Iterable<UserAndPostWritable> iterable,Context context)throws IOException, InterruptedException {users.clear();posts.clear(); for(UserAndPostWritable data : iterable) { //System.out.println(data.getType() + "," + data.getData()); if(data.getType().equals("U")) { users.add(new Text(data.getData())); } else {posts.add(new Text(data.getData()));} }  if(joinType.equals("innerJoin"))//内连接 { if(users.size() > 0 && posts.size() > 0) { for(Text user : users) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } } } else if(joinType.equals("leftOuter"))//左外连接 { for(Text user : users) { if(posts.size() > 0) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } else { context.write(new Text(user),createEmptyPost());} } } else if(joinType.equals("rightOuter"))//右外连接 { for(Text post : posts) { if(users.size() > 0) { for(Text user : users) { context.write(new Text(user),new Text(post)); } } else {context.write(createEmptyUser(), post);} } } else if(joinType.equals("allOuter"))//全外连接 { if(users.size() > 0) { for(Text user : users) { if(posts.size() > 0) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } else{ context.write(new Text(user),createEmptyUser()); } } }else { for(Text post : posts) { if(users.size() > 0) { for(Text user : users) { context.write(new Text(user),new Text(post)); } } else{ context.write(createEmptyUser(), post); } }} } else if(joinType.equals("anti"))//反连接 { if(users.size() == 0 ^ posts.size() == 0) { for(Text user : users) { context.write(new Text(user),createEmptyPost()); } for(Text post : posts) { context.write(createEmptyUser(),new Text(post)); } } } }private Text createEmptyUser(){return new Text(" \t ");}private Text createEmptyPost(){return new Text(" \t \t ");}  }  public static void main(String[] args) {Configuration configuration = new Configuration();try{FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),configuration);Job job = new Job(configuration);job.setJarByClass(PostJob.class);//设置连接类型//innerJoin,leftOuter,rightOuter,allOuter,antijob.getConfiguration().set("joinType", "anti");//设置输出到part-r-00000时的分隔符job.getConfiguration().set("mapred.textoutputformat.separator", "\t");MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/user.txt"),TextInputFormat.class,UserMapper.class);MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/post.txt"), TextInputFormat.class, PostMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(UserAndPostWritable.class);job.setReducerClass(PostReducer.class);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(Text.class);Path outPath = new Path("hdfs://master:9000/output/userandpost");if(fs.exists(outPath)){fs.delete(outPath,true);}TextOutputFormat.setOutputPath(job, outPath);job.setOutputFormatClass(TextOutputFormat.class);job.waitForCompletion(true);} catch (Exception e){e.printStackTrace();} }}

??

?

?

热点排行