首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

从MergeSort瞧HBase 二

2013-01-18 
从MergeSort看HBase 二接着前一篇 从MergeSort看HBase 一:http://blog.csdn.net/mrtitan/article/details/

从MergeSort看HBase 二

接着前一篇 从MergeSort看HBase 一:http://blog.csdn.net/mrtitan/article/details/8457080

 

在hbase中scan是最核心的算法,从直接的读操作get,scan到一些hbase的大操作compact、split、flush都是使用各种各样的scanner实现的,可以说region的一切读操作都是scan。scan的实现可以参考:http://blog.csdn.net/mrtitan/article/details/8284569

而其实scan也使用了mergesort的思想,下面我们模拟hbase中scanner实现从多个各自有序的文件中以流的方式顺序读出。

 

SortedFile类

public class SortedFile{    private String filePath;    public SortedFile(String path){        this.filePath = path;    }public Reader getReader() throws IOException {return new Reader(this.filePath);}public Writer getWriter() throws IOException {return new Writer(this.filePath);}}

SortedFile的writer类,通过writer的initRandom方法会产生大量随即数,排序后依次存入文件:

public class Writer {    private RandomAccessFile writeFile;    private FileChannel writeChannel;    private MappedByteBuffer writeMbb;    private final int FILE_SIZE = 128;    private final int INT_SIZE = 4;    public Writer(String filePath) throws IOException {        writeFile = new RandomAccessFile(filePath, "rw");        writeChannel = writeFile.getChannel();        writeMbb = writeChannel.map(FileChannel.MapMode.READ_WRITE,0,FILE_SIZE);    }public void initFileRandom(){Random random = new Random();        int write_time = FILE_SIZE / INT_SIZE;        List<Integer> valueList = new ArrayList<Integer>(write_time);        for(int i=0;i<write_time;i++){            int randomValue = Math.abs(random.nextInt(100000));            valueList.add(randomValue);        }        Collections.sort(valueList);        for (Integer value : valueList) {            writeMbb.putInt(value);        }}    public void close() throws IOException {        if (writeMbb != null) {            writeMbb.force();            unmap(writeMbb);        }        closeResource(writeChannel);        closeResource(writeFile);    }    private void unmap(MappedByteBuffer buffer)    {        if (buffer == null) return;        sun.misc.Cleaner cleaner = ((DirectBuffer) buffer).cleaner();        if (cleaner != null) {            cleaner.clean();        }    }    private void closeResource(Closeable c) throws IOException {        if (c != null) {            c.close();        }    }}

SortedFile的Reader类,提供scanner依次读出文件中的数字。

public class Reader {    private RandomAccessFile readFile;    private String filePath;public Reader(String filePath) throws IOException {        readFile = new RandomAccessFile(filePath, "rw");        this.filePath = filePath;}    public Scanner getScanner() throws IOException {        return new Scanner(readFile);    }    class Scanner {        private FileChannel readChannel;        private MappedByteBuffer readMbb;        private final int FILE_SIZE = 128;        private final int INT_SIZE = 4;        private int readIndex ;        private int valueNow;        public Scanner(RandomAccessFile readFile) throws IOException {            readChannel = readFile.getChannel();            readMbb = readChannel.map(FileChannel.MapMode.READ_WRITE,0,FILE_SIZE);            readIndex = 0;        }        public String getFilePath(){            return filePath;        }        public Integer peek(){            valueNow = readMbb.getInt();            readMbb.position(readIndex);            return valueNow;        }        public boolean hasNext(){            return readIndex + INT_SIZE < FILE_SIZE;        }        public Integer next(){            if (hasNext()){                valueNow = readMbb.getInt();                readIndex += INT_SIZE;            }            return null;        }    }}

MergeHandler类,核心类。将多个scanner整合为一个有序序列。

public class MergeHandler {    Queue<Reader.Scanner> valueHeap;    IntegerScannerComparator comparator;    Reader.Scanner current;    public MergeHandler(SortedFile[] sortedFiles) throws IOException {        comparator = new IntegerScannerComparator();        valueHeap = new PriorityQueue<Reader.Scanner>(sortedFiles.length, comparator);        for (SortedFile sortedFile : sortedFiles) {            Reader.Scanner scanner = sortedFile.getReader().getScanner();            if (scanner.hasNext()) {                valueHeap.add(scanner);            }        }    }    public ResultScanner getResultScanner() {        return new ResultScanner();    }    class ResultScanner implements Iterable<Integer> {        @Override        public Iterator<Integer> iterator() {            return new Iterator<Integer>() {                @Override                public boolean hasNext() {                    return !valueHeap.isEmpty();                }                @Override                public Integer next() {                    if (hasNext()) {                        current = valueHeap.poll();                        //System.out.println(current.getFilePath());                        int currentValue = current.peek();                        if (current.hasNext()) {                            current.next();                            valueHeap.add(current);                        }                        return currentValue;                    }                    return null;                }                @Override                public void remove() {                    //To change body of implemented methods use File | Settings | File Templates.                }            };        }    }}

Scanner的comparator,通过加载此类后prorityQueue就能分辨scanner的大小关系

public class IntegerScannerComparator implements Comparator<Reader.Scanner> {    @Override    public int compare(Reader.Scanner o1, Reader.Scanner o2) {        return o1.peek().compareTo(o2.peek());    }}

测试类:

public class MergeSortTest {    public static SortedFile[] itemList = new SortedFile[10];    /**     * @param args     * @throws Exception     */    public static void main(String[] args) throws Exception {        // TODO Auto-generated method stub        int filenum = 10;        SortedFile[] files = new SortedFile[filenum];        for (int i = 0; i < filenum; i++) {            SortedFile sortedFile = new SortedFile("D:/queue/" + i + ".txt");            files[i] = sortedFile;            Writer writer = sortedFile.getWriter();            writer.initFileRandom();            writer.close();        }        MergeHandler mergeHandler = new MergeHandler(files);        MergeHandler.ResultScanner resultScanner = mergeHandler.getResultScanner();        int num = 0;        for (int value : resultScanner) {            num++;            System.out.println("value :" + value + " num :" + num);        }    }}

运行测试程序后,resultScanner会将所有文件中的数据排序。

 

总结:

1.此程序可以很好的解决一个经典的面试题:排序10亿个数值。

2.mergesort的使用前提是每个小序列本身必须有序。

3.实现的关键在于使用prorityqueue来获得所有scanner当前的最小值

4.这种scanner的思想也在lucent等分布式计算领域中大量借鉴,值得学习~~






 

 

 

 

热点排行