Hadoop Writable深度复制及读取任意<key,value>序列文件
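上次留了一个问题：如何实现Writable的深度复制？（之所以需要深度复制，是因为SequenceFile.Reader的next(key, value)每次都把记录读入同一对key/value对象；如果直接把它们放进Map，所有条目都会指向最后读到的那条记录。）上网查了一下，还真有这样一个类，叫做WritableDeepCopier，可以在 http://mvnrepository.com/artifact/org.apache.crunch/crunch/0.5.0-incubating 下载。下载并导入之后就可以在程序中调用了，但是该怎么调用呢？网上找了很多资料，都没有例子，只好自己摸索，结果花了不少时间还是不行，调用时总是出错。于是去看了源码，发现它的deepCopy方法可以直接借鉴，代码如下：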
package mahout.fansy.utils.read;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.DataInput;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.net.URI;import java.util.HashMap;import java.util.Map;import org.apache.crunch.CrunchRuntimeException;import org.apache.crunch.types.writable.WritableDeepCopier;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.ReflectionUtils;public class ReadArbiKV { /** * 读取任意<key,value>序列文件 */public static Configuration conf=new Configuration();public static WritableDeepCopier<Writable> wdc;static String fPath="";static String trainPath="";static{conf.set("mapred.job.tracker", "ubuntu:9001");fPath="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/labelindex"; // 数据文件}public static void main(String[] args) throws IOException {readFromFile(fPath);//readFromFile(trainPath);}/** * 读取序列文件 * @param fPath * @return * @throws IOException */public static Map<Writable,Writable> readFromFile(String fPath) throws IOException{FileSystem fs = FileSystem.get(URI.create(fPath), conf); Path path = new Path(fPath); Map<Writable,Writable> map=new HashMap<Writable,Writable>(); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); @SuppressWarnings("unchecked")Class<Writable> writableClassK=(Class<Writable>) reader.getKeyClass(); @SuppressWarnings("unchecked")Class<Writable> writableClassV=(Class<Writable>) reader.getValueClass(); while (reader.next(key, value)) { // Writable k=; // 如何实现Writable的深度复制? 
Writable k=deepCopy(key, writableClassK); // Writable 的深度复制 Writable v=deepCopy(value,writableClassV); map.put(k, v); // System.out.println(key.toString()+", "+value.toString()); // System.exit(-1);// 只打印第一条记录 } } finally { IOUtils.closeStream(reader); } return map;}/** * Writable 的深度复制 * 引自WritableDeepCopier * @param fPath * @return * @throws IOException */public static Writable deepCopy(Writable source,Class<Writable> writableClass) { ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(byteOutStream); Writable copiedValue = null; try { source.write(dataOut); dataOut.flush(); ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); DataInput dataInput = new DataInputStream(byteInStream); copiedValue = writableClass.newInstance(); copiedValue.readFields(dataInput); } catch (Exception e) { throw new CrunchRuntimeException("Error while deep copying " + source, e); } return copiedValue; }}
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990