
RCFile Output in MapReduce: Implementing RCFileOutputFormat and Its Application

2012-09-04 

Custom implementation: RCFileOutputFormat.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * RCFileOutputFormat: writes BytesRefArrayWritable rows into Hive's RCFile format.
 */
public class RCFileOutputFormat extends
        FileOutputFormat<WritableComparable<LongWritable>, BytesRefArrayWritable> {

    /**
     * Set the number of columns into the given configuration.
     *
     * @param conf      configuration instance on which to set the column number
     * @param columnNum column number for RCFile's Writer
     */
    public static void setColumnNumber(Configuration conf, int columnNum) {
        assert columnNum > 0;
        conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
    }

    /**
     * Returns the number of columns set in the conf for writers.
     *
     * @param conf
     * @return number of columns for RCFile's writer
     */
    public static int getColumnNumber(Configuration conf) {
        return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
    }

    @Override
    public RecordWriter<WritableComparable<LongWritable>, BytesRefArrayWritable> getRecordWriter(
            TaskAttemptContext task) throws IOException, InterruptedException {
        Configuration conf = task.getConfiguration();
        // Force compressed output (old-style property name).
        conf.setBoolean("mapred.output.compress", true);

        Path outputPath = FileOutputFormat.getOutputPath(task);
        FileSystem fs = outputPath.getFileSystem(conf);
        if (!fs.exists(outputPath)) {
            fs.mkdirs(outputPath);
        }

        Path file = getDefaultWorkFile(task, "");

        CompressionCodec codec = null;
        if (getCompressOutput(task)) {
            Class<?> codecClass = getOutputCompressorClass(task, DefaultCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        }

        final RCFile.Writer out = new RCFile.Writer(fs, conf, file, null, codec);

        return new RecordWriter<WritableComparable<LongWritable>, BytesRefArrayWritable>() {
            @Override
            public void write(WritableComparable<LongWritable> key,
                    BytesRefArrayWritable value) throws IOException {
                // RCFile stores only the row's column values; the key is ignored.
                out.append(value);
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException,
                    InterruptedException {
                out.close();
            }
        };
    }
}
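The implementation above never calls setColumnNumber itself; the RCFile.Writer it creates reads the column count from RCFile.COLUMN_NUMBER_CONF_STR, so the job driver has to set it before submission. The usage snippet below shows the two key job settings; the following is a minimal driver sketch that fills in the rest. The class name RcFileWriteDriver, the mapper RcFileWriteMapper (sketched after the usage snippet), the three-column layout, and the Hadoop 2.x-style Job.getInstance call are assumptions for illustration, not from the original article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: wires the custom RCFileOutputFormat into a map-only job.
public class RcFileWriteDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // RCFile.Writer needs the column count; three columns are assumed here.
        RCFileOutputFormat.setColumnNumber(conf, 3);

        Job job = Job.getInstance(conf, "rcfile write");
        job.setJarByClass(RcFileWriteDriver.class);
        job.setMapperClass(RcFileWriteMapper.class); // hypothetical mapper, sketched below
        job.setNumReduceTasks(0);                    // map-only: rows go straight to the RecordWriter

        job.setOutputFormatClass(RCFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesRefArrayWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}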
Usage:
job.setOutputFormatClass(RCFileOutputFormat.class);
job.setOutputValueClass(BytesRefArrayWritable.class);

BytesRefArrayWritable values = new BytesRefArrayWritable(COLUMNS);
values.set(0, new BytesRefWritable(fuid.getBytes()));
values.set(1, new BytesRefWritable(this.sid.getBytes()));
values.set(2, new BytesRefWritable(this.times.getBytes()));
context.write(new Text(this.uid), values);
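The value-building fragment above reads like an excerpt from a Mapper; the sketch below places it in a complete, compilable class. The class name RcFileWriteMapper, the tab-separated (uid, fuid, sid, times) input layout, and the COLUMNS constant are assumptions; only the BytesRefArrayWritable/BytesRefWritable calls and the context.write mirror the original fragment.

import java.io.IOException;

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: parses tab-separated lines (uid, fuid, sid, times) and
// emits one three-column BytesRefArrayWritable row per line.
public class RcFileWriteMapper
        extends Mapper<LongWritable, Text, Text, BytesRefArrayWritable> {

    // Must match the value passed to RCFileOutputFormat.setColumnNumber().
    private static final int COLUMNS = 3;

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t");
        if (fields.length < 4) {
            return; // skip malformed lines
        }
        String uid = fields[0];
        String fuid = fields[1];
        String sid = fields[2];
        String times = fields[3];

        // One BytesRefWritable per column, in column order.
        BytesRefArrayWritable values = new BytesRefArrayWritable(COLUMNS);
        values.set(0, new BytesRefWritable(fuid.getBytes()));
        values.set(1, new BytesRefWritable(sid.getBytes()));
        values.set(2, new BytesRefWritable(times.getBytes()));

        // The key is ignored by RCFileOutputFormat's RecordWriter; only the values are appended.
        context.write(new Text(uid), values);
    }
}

Because the RecordWriter never touches the key, any WritableComparable key (Text here) works in practice, even though the output format's generic signature declares WritableComparable<LongWritable>.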
