Looking at HBase Through MergeSort, Part 2
This continues the previous post, Looking at HBase Through MergeSort, Part 1: http://blog.csdn.net/mrtitan/article/details/8457080
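Scan is the most central algorithm in HBase. Everything from direct reads such as get and scan to heavyweight operations such as compact, split and flush is implemented with one kind of scanner or another, so you could say that every read in a region is a scan. For how scan is implemented, see: http://blog.csdn.net/mrtitan/article/details/8284569
Scan is itself built on the merge sort idea. Below we simulate how HBase's scanners read, as a stream and in order, from several files that are each sorted on their own.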
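To make the idea concrete before the file-based classes, here is a minimal sketch of a streaming k-way merge over in-memory sorted int arrays. The class name KWayMerge and the use of arrays in place of files are assumptions for illustration, not HBase code. A min-heap holds one cursor per input; each call to next() pops the cursor with the smallest current value, emits that value, advances the cursor and pushes it back if it still has data.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical sketch: lazily merges several individually sorted int arrays.
final class KWayMerge implements Iterator<Integer> {
    // One cursor per input: the array and the index of its current element.
    private static final class Cursor {
        final int[] data;
        int pos;
        Cursor(int[] data) { this.data = data; }
        int current() { return data[pos]; }
    }

    // Min-heap ordered by each cursor's current element.
    private final PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.current(), b.current()));

    KWayMerge(List<int[]> sortedInputs) {
        for (int[] input : sortedInputs) {
            if (input.length > 0) {
                heap.add(new Cursor(input)); // empty inputs never enter the heap
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !heap.isEmpty();
    }

    @Override
    public Integer next() {
        if (heap.isEmpty()) {
            throw new NoSuchElementException();
        }
        Cursor smallest = heap.poll(); // remove before advancing so the heap never holds a stale key
        int value = smallest.current();
        smallest.pos++;
        if (smallest.pos < smallest.data.length) {
            heap.add(smallest); // re-insert, ordered by its new current element
        }
        return value;
    }

    // Convenience helper: drains the merge into a list.
    static List<Integer> mergeAll(List<int[]> sortedInputs) {
        List<Integer> out = new ArrayList<>();
        new KWayMerge(sortedInputs).forEachRemaining(out::add);
        return out;
    }
}
```

For example, `KWayMerge.mergeAll(Arrays.asList(new int[]{1, 4, 9}, new int[]{2, 3, 10}))` should give `[1, 2, 3, 4, 9, 10]`. Only one element per input sits in the heap at any time, so memory grows with the number of inputs rather than their total size.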
The SortedFile class
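```java
import java.io.IOException;

// A file of ints stored in ascending order. Reader and Writer are the classes
// defined below, not java.io.Reader / java.io.Writer.
public class SortedFile {
    private final String filePath;

    public SortedFile(String path) {
        this.filePath = path;
    }

    public Reader getReader() throws IOException {
        return new Reader(this.filePath);
    }

    public Writer getWriter() throws IOException {
        return new Writer(this.filePath);
    }
}
```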
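SortedFile's Writer class. Its initFileRandom method generates FILE_SIZE / INT_SIZE random numbers (32 with the sizes below), sorts them, and writes them to the file in order: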
```java
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class Writer {
    private static final int FILE_SIZE = 128; // bytes per file
    private static final int INT_SIZE = 4;    // bytes per int

    private final RandomAccessFile writeFile;
    private final FileChannel writeChannel;
    private final MappedByteBuffer writeMbb;

    public Writer(String filePath) throws IOException {
        writeFile = new RandomAccessFile(filePath, "rw");
        writeChannel = writeFile.getChannel();
        // Mapping FILE_SIZE bytes in READ_WRITE mode grows the file to that size if needed.
        writeMbb = writeChannel.map(FileChannel.MapMode.READ_WRITE, 0, FILE_SIZE);
    }

    // Fills the file with FILE_SIZE / INT_SIZE random ints in ascending order.
    public void initFileRandom() {
        Random random = new Random();
        int writeTimes = FILE_SIZE / INT_SIZE;
        List<Integer> valueList = new ArrayList<Integer>(writeTimes);
        for (int i = 0; i < writeTimes; i++) {
            valueList.add(random.nextInt(100000)); // already in [0, 100000)
        }
        Collections.sort(valueList);
        for (Integer value : valueList) {
            writeMbb.putInt(value);
        }
    }

    public void close() throws IOException {
        writeMbb.force(); // flush the mapped pages to disk
        // Explicit unmapping would need internal JDK classes (sun.misc.Cleaner / DirectBuffer),
        // which are inaccessible from Java 9 on; the mapping is released when the buffer is GC'd.
        closeResource(writeChannel);
        closeResource(writeFile);
    }

    private void closeResource(Closeable c) throws IOException {
        if (c != null) {
            c.close();
        }
    }
}
```
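Writer depends on two documented properties of NIO: a MappedByteBuffer is big-endian by default, so putInt stores each int as 4 bytes with the high byte first, and mapping a region in READ_WRITE mode grows the file to cover that region. The sketch below (hypothetical class SortedIntFileDemo, sizing the file to the data instead of a fixed 128 bytes) writes sorted ints the same way and reads them back with plain DataInputStream.readInt, which is also big-endian. It is only meant to show the on-disk format, not to replace Writer.

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical demo of the sorted-int file format used by Writer.
final class SortedIntFileDemo {
    // Sorts the values and writes them through a READ_WRITE mapping, 4 bytes each.
    static void write(Path file, int[] values) throws IOException {
        int[] sorted = values.clone();
        Arrays.sort(sorted);
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping past the current end of file grows the file to the mapped size.
            MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4L * sorted.length);
            for (int v : sorted) {
                mbb.putInt(v); // big-endian by default
            }
            mbb.force(); // flush the mapped pages to the file
        }
    }

    // Reads the file back without NIO; DataInputStream.readInt is big-endian too.
    static int[] read(Path file) throws IOException {
        int count = (int) (Files.size(file) / 4);
        int[] out = new int[count];
        try (DataInputStream in = new DataInputStream(new FileInputStream(file.toFile()))) {
            for (int i = 0; i < count; i++) {
                out[i] = in.readInt();
            }
        }
        return out;
    }
}
```

Because both sides agree on byte order, files produced by Writer can be inspected with any big-endian int reader, which is handy when checking that each file really is sorted.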
SortedFile's Reader class, which provides a Scanner that reads the numbers in the file one at a time.
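```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class Reader {
    private final RandomAccessFile readFile;
    private final String filePath;

    public Reader(String filePath) throws IOException {
        // Read-only is enough: the Scanner never writes to the file.
        readFile = new RandomAccessFile(filePath, "r");
        this.filePath = filePath;
    }

    public Scanner getScanner() throws IOException {
        return new Scanner(readFile);
    }

    class Scanner {
        private static final int FILE_SIZE = 128;
        private static final int INT_SIZE = 4;

        private final MappedByteBuffer readMbb;
        private int readIndex; // byte offset of the current element

        public Scanner(RandomAccessFile readFile) throws IOException {
            FileChannel readChannel = readFile.getChannel();
            readMbb = readChannel.map(FileChannel.MapMode.READ_ONLY, 0, FILE_SIZE);
            readIndex = 0;
        }

        public String getFilePath() {
            return filePath;
        }

        // Returns the current element without consuming it.
        public Integer peek() {
            int value = readMbb.getInt(); // relative read advances the position...
            readMbb.position(readIndex);  // ...so move it back to the current element
            return value;
        }

        // True if there is another element after the current one.
        public boolean hasNext() {
            return readIndex + INT_SIZE < FILE_SIZE;
        }

        // Steps past the current element and returns it;
        // returns null without moving when the current element is the last one.
        public Integer next() {
            if (hasNext()) {
                int value = readMbb.getInt(); // reads at readIndex and advances the position
                readIndex += INT_SIZE;
                return value;
            }
            return null;
        }
    }
}
```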
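Scanner.peek() reads with the relative getInt(), which moves the buffer position, and then resets the position by hand. ByteBuffer also has an absolute getInt(int index) that reads at a given byte offset and leaves the position untouched, so a peek/next cursor can be written without any resetting. The class IntCursor below is a hypothetical sketch over an in-memory ByteBuffer, not part of the original code; note that its hasCurrent() means "there is a current element", unlike Scanner.hasNext(), which means "there is an element after the current one".

```java
import java.nio.ByteBuffer;

// Hypothetical cursor over a buffer of 4-byte ints, illustrating peek/next with absolute reads.
final class IntCursor {
    private static final int INT_SIZE = 4;
    private final ByteBuffer buffer;
    private int readIndex; // byte offset of the current element

    IntCursor(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    // True while a whole int is left at readIndex.
    boolean hasCurrent() {
        return readIndex + INT_SIZE <= buffer.limit();
    }

    // Absolute get: reads at readIndex and does not change the buffer's position.
    int peek() {
        return buffer.getInt(readIndex);
    }

    // Returns the current element and moves to the next one.
    int next() {
        int value = peek();
        readIndex += INT_SIZE;
        return value;
    }
}
```

Since nothing mutates the buffer position, several comparisons in a row (as the PriorityQueue performs) cannot disturb the cursor.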
The MergeHandler class is the core of the program: it merges multiple scanners into a single sorted sequence.
```java
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;

public class MergeHandler {
    // Min-heap of scanners, ordered by each scanner's current value.
    private final Queue<Reader.Scanner> valueHeap;
    private final IntegerScannerComparator comparator;

    public MergeHandler(SortedFile[] sortedFiles) throws IOException {
        comparator = new IntegerScannerComparator();
        // PriorityQueue rejects an initial capacity below 1.
        valueHeap = new PriorityQueue<Reader.Scanner>(Math.max(1, sortedFiles.length), comparator);
        for (SortedFile sortedFile : sortedFiles) {
            Reader.Scanner scanner = sortedFile.getReader().getScanner();
            if (scanner.hasNext()) {
                valueHeap.add(scanner);
            }
        }
    }

    public ResultScanner getResultScanner() {
        return new ResultScanner();
    }

    class ResultScanner implements Iterable<Integer> {
        @Override
        public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
                @Override
                public boolean hasNext() {
                    return !valueHeap.isEmpty();
                }

                @Override
                public Integer next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    // Take the scanner with the smallest current value out of the heap.
                    Reader.Scanner current = valueHeap.poll();
                    int currentValue = current.peek();
                    // Advance it and put it back only if it still has data; re-adding
                    // after advancing lets the heap place it by its new current value.
                    if (current.hasNext()) {
                        current.next();
                        valueHeap.add(current);
                    }
                    return currentValue;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("remove");
                }
            };
        }
    }
}
```
The comparator for Scanner. With it, the PriorityQueue can order scanners by their current values:
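```java
import java.util.Comparator;

// Orders scanners by their current (peeked) value, so the PriorityQueue
// always hands out the scanner holding the smallest unread number.
public class IntegerScannerComparator implements Comparator<Reader.Scanner> {
    @Override
    public int compare(Reader.Scanner o1, Reader.Scanner o2) {
        return o1.peek().compareTo(o2.peek());
    }
}
```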
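The comparator above looks only at the peeked values. When two files hold the same number, PriorityQueue breaks the tie arbitrarily, so equal values may come out in a different file order from run to run. If a fixed order matters, one option is a secondary key. The sketch below uses a hypothetical Head class (a source id plus its current value) as a stand-in for Reader.Scanner, and builds the comparator with Comparator.comparingInt / thenComparingInt, which also avoids boxing the ints.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for a scanner: which source it reads, and its current value.
final class Head {
    final int sourceId;
    final int value;
    Head(int sourceId, int value) { this.sourceId = sourceId; this.value = value; }
}

final class HeadOrder {
    // Smaller value first; on equal values, smaller sourceId first, so ties pop in a fixed order.
    static final Comparator<Head> BY_VALUE_THEN_SOURCE =
            Comparator.comparingInt((Head h) -> h.value).thenComparingInt(h -> h.sourceId);

    static PriorityQueue<Head> newHeap() {
        return new PriorityQueue<>(BY_VALUE_THEN_SOURCE);
    }
}
```

With this ordering, heads (source 2, value 7), (source 0, value 9) and (source 1, value 7) are polled as source 1, then 2, then 0.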
Test class:
```java
public class MergeSortTest {
    public static void main(String[] args) throws Exception {
        int fileNum = 10;
        SortedFile[] files = new SortedFile[fileNum];
        // Create 10 files of 32 sorted random ints each; the directory D:/queue/ must already exist.
        for (int i = 0; i < fileNum; i++) {
            SortedFile sortedFile = new SortedFile("D:/queue/" + i + ".txt");
            files[i] = sortedFile;
            Writer writer = sortedFile.getWriter();
            writer.initFileRandom();
            writer.close();
        }
        // Merge all files into one ascending stream and print it.
        MergeHandler mergeHandler = new MergeHandler(files);
        MergeHandler.ResultScanner resultScanner = mergeHandler.getResultScanner();
        int num = 0;
        for (int value : resultScanner) {
            num++;
            System.out.println("value :" + value + " num :" + num);
        }
    }
}
```
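After the test program runs, resultScanner returns the data from all the files in ascending order (320 values: 10 files of 32 ints each).
Summary: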
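1. The same approach handles a classic interview question, sorting one billion numbers: sort chunks small enough to fit in memory into separate files, then merge those files as above.
2. Merge sort only works if each sub-sequence is already sorted on its own.
3. The key to the implementation is using a PriorityQueue to find the smallest current value across all scanners.
4. This scanner idea is also borrowed widely in Lucene and in distributed computing, and is well worth studying.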
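The "sort one billion numbers" answer is external merge sort in two phases. The sketch below simulates both phases in memory under stated assumptions: the hypothetical class ExternalSortSketch uses arrays ("runs") in place of the SortedFile files, and chunkSize stands in for how many numbers fit in memory at once. Phase 1 sorts each chunk independently; phase 2 merges the runs with a heap that holds one entry {value, runIndex, positionInRun} per run, which is the same job MergeHandler does with scanners.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory simulation of external merge sort; each run stands in for a sorted file.
final class ExternalSortSketch {
    static int[] sort(int[] input, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        // Phase 1: cut the input into chunks that "fit in memory" and sort each one.
        List<int[]> runs = new ArrayList<>();
        for (int start = 0; start < input.length; start += chunkSize) {
            int[] run = Arrays.copyOfRange(input, start, Math.min(start + chunkSize, input.length));
            Arrays.sort(run);
            runs.add(run);
        }
        // Phase 2: k-way merge; each heap entry is {value, runIndex, positionInRun}.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            heap.add(new int[]{runs.get(r)[0], r, 0}); // runs are never empty here
        }
        int[] out = new int[input.length];
        int k = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out[k++] = top[0];
            int run = top[1];
            int next = top[2] + 1;
            if (next < runs.get(run).length) {
                heap.add(new int[]{runs.get(run)[next], run, next});
            }
        }
        return out;
    }
}
```

In a real external sort, phase 1 would write each sorted run to its own file and phase 2 would stream from those files, so memory holds one chunk during phase 1 and one heap entry per file during phase 2.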