
HBase Compaction (Part 1)

2013-11-29 

II. Detailed Walkthrough

1.Put CompactionRequest into the compaction queue

1.1.HRegionServer

1.run()

1.1.preRegistrationInitialization()

Do pre-registration initializations; zookeeper, lease threads, etc.

1.1.1.initializeThreads()

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // Background thread to check for compactions; needed if region
    // has not gotten updates in a while. Make it run at a lesser frequency.
    int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
      ".multiplier", 1000);
    this.compactionChecker = new CompactionChecker(this,
      this.threadWakeFrequency * multiplier, this);

hbase.server.thread.wakefrequency.multiplier = 1000

hbase.server.thread.wakefrequency = 10 * 1000
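With the two defaults above, the period the CompactionChecker actually runs at works out to 10 s × 1000 = 10,000 s, i.e. 2 hrs 46 mins 40 sec, which is exactly the "Runs every 2hrs, 46mins, 40sec" line shown in section III. A quick worked example (plain Java arithmetic, not HBase code):

    long threadWakeFrequencyMs = 10 * 1000;   // hbase.server.thread.wakefrequency
    int  multiplier            = 1000;        // hbase.server.thread.wakefrequency.multiplier
    long checkerPeriodMs       = threadWakeFrequencyMs * multiplier;  // 10,000,000 ms
    // 10,000,000 ms = 10,000 s = 2 hrs 46 mins 40 sec between compaction checks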


1.2.CompactionChecker

Inner class that runs on a long period checking if regions need compaction.

1.CompactionChecker()

      sleepTime = 10000 s
      LOG.info("Runs every " + StringUtils.formatTime(sleepTime));

hbase.regionserver.compactionChecker.majorCompactPriority = Integer.MAX_VALUE

2.chore()

CompactSplitThread.requestCompaction()
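Roughly, each time the chore fires it walks every store of every online region and, when a store needs compacting (or a periodic major compaction is due, with the priority governed by hbase.regionserver.compactionChecker.majorCompactPriority above), hands it to CompactSplitThread.requestCompaction(). The following is a minimal, self-contained model of that loop, with placeholder interfaces standing in for the real HBase types; it is a sketch of the behaviour, not the 0.94 source:

    import java.util.List;

    // Simplified model of CompactionChecker.chore(): scan all online regions and
    // queue a compaction request for any store that needs one.
    final class CompactionCheckerSketch {
      interface StoreModel  { boolean needsCompaction(); boolean isMajorCompactionDue(); }
      interface RegionModel { List<StoreModel> getStores(); }
      interface RequestSink { void requestCompaction(RegionModel r, StoreModel s, String why); }

      static void chore(List<RegionModel> onlineRegions, RequestSink compactSplitThread) {
        for (RegionModel region : onlineRegions) {
          for (StoreModel store : region.getStores()) {
            if (store.needsCompaction()) {
              // enough store files have piled up -> enqueue a (minor) compaction request
              compactSplitThread.requestCompaction(region, store, "compactionChecker requests compaction");
            } else if (store.isMajorCompactionDue()) {
              // time-based major compaction; its priority comes from
              // hbase.regionserver.compactionChecker.majorCompactPriority
              compactSplitThread.requestCompaction(region, store, "compactionChecker requests major compaction");
            }
          }
        }
      }
    }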


1.3.CompactSplitThread

1.requestCompaction()

1.1.

    CompactionRequest cr = s.requestCompaction(priority, request);
    ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
        ? largeCompactions : smallCompactions;
    pool.execute(cr);
    if (LOG.isDebugEnabled()) {
      String type = (pool == smallCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + cr
          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
          + "; " + this);
    }

1.2.Store.requestCompaction()

1.3.Store.throttleCompaction()


1.4.Store

1.requestCompaction()

1.1.

        // candidates = all storefiles not already in compaction queue
        CompactSelection filesToCompact;
        filesToCompact = compactSelection(candidates, priority);

1.2.compactSelection()

Algorithm to choose which files to compact

References:

    compaction configuration parameters

    the algorithm HBase uses to select store files for compaction
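For a minor compaction the default selection is ratio-based: files are examined oldest to newest, and an old file is excluded while it is larger than the configured ratio times the combined size of the files newer than it. Below is a simplified, self-contained model of that rule, assuming 0.94-era defaults (hbase.hstore.compaction.min = 3, hbase.hstore.compaction.max = 10, hbase.hstore.compaction.ratio = 1.2); the real compactSelection() additionally honours hbase.hstore.compaction.min.size, an off-peak ratio and a sliding sum window, so treat this only as an illustration:

    import java.util.Collections;
    import java.util.List;

    // Simplified model of the ratio-based minor-compaction selection.
    final class CompactSelectionSketch {
      // fileSizes is ordered oldest -> newest, as the store keeps its files.
      static List<Long> select(List<Long> fileSizes, int minFiles, int maxFiles, double ratio) {
        int n = fileSizes.size();
        long[] sumFrom = new long[n + 1];               // sumFrom[i] = total size of files i .. n-1
        for (int i = n - 1; i >= 0; i--) {
          sumFrom[i] = sumFrom[i + 1] + fileSizes.get(i);
        }
        int start = 0;
        // Skip leading (old, large) files that dwarf everything newer than them.
        while (n - start >= minFiles
            && fileSizes.get(start) > (long) (sumFrom[start + 1] * ratio)) {
          start++;
        }
        int end = Math.min(n, start + maxFiles);
        List<Long> picked = fileSizes.subList(start, end);
        return picked.size() >= minFiles ? picked : Collections.emptyList();
      }
    }

For example, with files of 300 MB, 30 MB, 25 MB and 20 MB (oldest first) and ratio 1.2, the 300 MB file is skipped (300 > 1.2 × 75) and only the three small files are rewritten; this is how frequent small flushes get merged without repeatedly rewriting the big old files.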

        // major compaction if all StoreFiles are included
        boolean isMajor =
            (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
        if (isMajor) {
          // since we're enqueuing a major, update the compaction wait interval
          this.forceMajor = false;
        }

1.3.isMajor

2.throttleCompaction()

  boolean throttleCompaction(long compactionSize) {
    long throttlePoint = conf.getLong(
        "hbase.regionserver.thread.compaction.throttle",
        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;
  }
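Assuming the defaults of minFilesToCompact = 3 (hbase.hstore.compaction.min) and a 128 MB memstore flush size, the computed throttle point is 2 × 3 × 128 MB, roughly 768 MB: a request whose total file size exceeds that is handed to the largeCompactions pool, anything smaller goes to smallCompactions (see the pool selection in CompactSplitThread.requestCompaction() above). Setting hbase.regionserver.thread.compaction.throttle explicitly overrides the computed default.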

2.Get CompactionRequest from the compaction queue

2.1.CompactionRequest

1.run()

1.1.

        long start = EnvironmentEdgeManager.currentTimeMillis();
        boolean completed = r.compact(this);
        long now = EnvironmentEdgeManager.currentTimeMillis();
        LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
      LOG.debug("CompactSplitThread status: " + server.getCompactSplitThread());

1.2.HRegion.compact()


2.2.HRegion

1.compact()

Called by compaction thread and after region is opened to compact the HStores if necessary.
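Before delegating to the store, compact() bails out if the region is closing or closed, and otherwise holds the region read lock for the duration, so normal reads, writes and flushes keep running while the store rewrites its files. A minimal sketch of that shape, with placeholder types standing in for the real ones (not the 0.94 source):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Simplified model of HRegion.compact(CompactionRequest).
    final class RegionCompactSketch {
      interface StoreModel { void compact() throws Exception; }   // stands in for Store.compact(cr)

      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile boolean closing, closed;

      boolean compact(StoreModel store) throws Exception {
        if (closing || closed) {
          return false;                    // region is going away; drop the request
        }
        lock.readLock().lock();            // region close takes the write lock, so it must wait
        try {
          store.compact();                 // Store.compact() does the actual work (2.3 below)
          return true;
        } finally {
          lock.readLock().unlock();
        }
      }
    }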

1.1.

        LOG.info("Starting compaction on " + cr.getStore() + " in region "            + this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));

1.2.Store.compact()


2.3.Store

1.compact()

Compact the StoreFiles. This method may take some time, so the calling thread must be able to block for long periods.

During this time, the Store can work as usual, getting values from StoreFiles and writing new StoreFiles from the memstore.

Existing StoreFiles are not destroyed until the new compacted StoreFile is completely written-out to disk.
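That last point is the key concurrency property: the replacement file is written completely (into the region's .tmp directory, see 1.1 below) and only then moved into the store and the inputs removed, so readers always see a full set of files. A local-filesystem illustration of that ordering only, not HBase code (the real completeCompaction() also swaps the in-memory storefile list and archives the old files):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    // Illustration of the publish-then-delete ordering described above.
    final class CompactionSwapSketch {
      static void swapIn(Path tmpCompactedFile, Path storeDir, List<Path> oldFiles) throws Exception {
        Path dest = storeDir.resolve(tmpCompactedFile.getFileName());
        Files.move(tmpCompactedFile, dest);        // 1) publish the fully written compacted file
        for (Path old : oldFiles) {                // 2) only now drop the inputs; until here
          Files.deleteIfExists(old);               //    readers still use the old store files
        }
      }
    }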

1.1.

    // Max-sequenceID is the last key in the files we're compacting
    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);

    // Ready to go. Have list of files to compact.
    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
        + this + " of "
        + this.region.getRegionInfo().getRegionNameAsString()
        + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));

    StoreFile.Writer writer = this.compactor.compact(cr, maxId);

1.2.StoreFile.getMaxSequenceIdInList()

1.3.Compactor.compact()

1.4.completeCompaction()

1.5.

    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "        + filesToCompact.size() + " file(s) in " + this + " of "        + this.region.getRegionInfo().getRegionNameAsString()        + " into " +        (sf == null ? "none" : sf.getPath().getName()) +        ", size=" + (sf == null ? "none" :          StringUtils.humanReadableInt(sf.getReader().length()))        + "; total size for store is "        + StringUtils.humanReadableInt(storeSize));


2.4.Compactor

1.compact()

Do a minor/major compaction on an explicit set of storefiles from a Store.
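Structurally the compactor is a merge-rewrite: it opens a scanner over the selected StoreFiles, streams the merged KeyValues into one new writer created in the region's .tmp directory (see 1.2 below), and closes it. A minimal model of that loop with placeholder types; the real code also drops deleted/expired cells, tracks progress, and handles major-compaction specifics:

    import java.util.Iterator;

    // Simplified model of the Compactor.compact() write loop.
    final class CompactorSketch {
      interface MergedScanner extends Iterator<byte[]> {}             // stands in for a scanner over all inputs
      interface TmpWriter { void append(byte[] cell); void close(); } // stands in for StoreFile.Writer in .tmp

      static void compact(MergedScanner scanner, TmpWriter writer) {
        try {
          while (scanner.hasNext()) {
            writer.append(scanner.next());     // surviving cells go into the single output file
          }
        } finally {
          writer.close();                      // finalize metadata (e.g. Bloom filters) on close
        }
      }
    }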

1.1.

    for (StoreFile file : filesToCompact) {
      StoreFile.Reader r = file.getReader();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
          (majorCompaction ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }

    StoreFile.Writer writer = null;
    writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
    writer.append(kv);
    writer.close();

1.2.Store.createWriterInTmp()

1.3.StoreFile.append()

1.4.StoreFile.close()

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      StoreFile.LOG.info((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily"
          + " was added to HFile (" + getPath() + ") ");


2.5.Store

1.createWriterInTmp()

1.1.

    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        fs, blocksize)
            .withOutputDir(region.getTmpDir())
            .withDataBlockEncoder(dataBlockEncoder)
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withChecksumType(checksumType)
            .withBytesPerChecksum(bytesPerChecksum)
            .withCompression(compression)
            .build();

1.1.1.StoreFile.build()

      return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
          conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
          bytesPerChecksum);

1.1.1.1.Writer

Creates an HFile.Writer that also writes helpful meta data.

1.1.1.1.1.

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withBlockSize(blocksize)
          .withCompression(compress)
          .withDataBlockEncoder(dataBlockEncoder)
          .withComparator(comparator.getRawComparator())
          .withChecksumType(checksumType)
          .withBytesPerChecksum(bytesPerChecksum)
          .create();

1.1.1.1.1.1.HFile.create()

createWriter()

1.1.1.1.1.1.1.HFileWriterV2.createWriter()

    finishInit(conf);
    LOG.debug("Initialized with " + cacheConf);

1.1.1.1.2.

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

        LOG.info("Bloom filter type for " + path + ": " + this.bloomType + ", "
            + generalBloomFilterWriter.getClass().getSimpleName());


2.6.StoreFile

1.append()

    public void append(final KeyValue kv) throws IOException {
      appendGeneralBloomfilter(kv);
      appendDeleteFamilyBloomFilter(kv);
      writer.append(kv);
      trackTimestamps(kv);
    }

1.1.appendGeneralBloomfilter()

1.1.1.CompoundBloomFilterWriter.add()

1.1.1.1.enqueueReadyChunk()

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    if (LOG.isDebugEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.debug("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }


2.7.Store

1.completeCompaction()

1.1.validateStoreFile()

    StoreFile storeFile = null;
      storeFile = new StoreFile(this.fs, path, this.conf,
          this.cacheConf, this.family.getBloomFilterType(),
          NoOpDataBlockEncoder.INSTANCE);
      passSchemaMetricsTo(storeFile);
      storeFile.createReader();

1.2.

      // Move the file into the right spot
      Path origPath = compactedFile.getPath();
      Path destPath = new Path(homedir, origPath.getName());
      LOG.info("Renaming compacted file at " + origPath + " to " + destPath);

1.3.

    StoreFile result = null;
      result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
          this.family.getBloomFilterType(), this.dataBlockEncoder);
      passSchemaMetricsTo(result);
      result.createReader();

1.3.1.StoreFile.createReader()

1.3.1.1.open()

Opens reader on this store file.

      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);

1.3.1.1.1.StoreFile.loadBloomfilter()

              LOG.info("Loaded " + bloomFilterType.toString() + " ("                  + generalBloomFilter.getClass().getSimpleName()                  + ") metadata for " + reader.getName());


III. Sample Logs

1.HRegionServer

2013-11-04 19:20:07,196 INFO org.apache.hadoop.hbase.regionserver.HRegionServer: Runs every 2hrs, 46mins, 40sec
2013-11-03 21:21:12,492 DEBUG org.apache.hadoop.hbase.regionserver.CompactSplitThread: Small Compaction requested: regionName=gprs_traffic_billing_detail,25875,1377500471367.3790b731b02a0bdbf682553b4cab835f., storeName=g, fileCount=4, fileSize=188.3m (23.2m, 29.6m, 29.9m, 105.5m), priority=3, time=17819361919784975; Because: regionserver60020.compactionChecker requests compaction; compaction_queue=(19:0), split_queue=0
2013-11-03 21:21:12,493 DEBUG org.apache.hadoop.hbase.regionserver.CompactSplitThread: Large Compaction requested: regionName=gn_traffic_detail,29375,1379837155328.c8388b789007b3f3f7655e5bf92e48ab., storeName=g, fileCount=3, fileSize=362.9m (52.0m, 284.8m, 26.1m), priority=3, time=17819361921288456; Because: regionserver60020.compactionChecker requests compaction; compaction_queue=(20:0), split_queue=0
2013-06-05 15:45:23,627 INFO org.apache.hadoop.hbase.regionserver.HRegion: Starting compaction on urlresult in region gnurlresult,05882352940_00000000,1370415394696.77a9acd00f11140531a0123cf8054c89.
2013-06-05 15:45:23,629 INFO org.apache.hadoop.hbase.regionserver.Store: Starting compaction of 5 file(s) in urlresult of gnurlresult,05882352940_00000000,1370415394696.77a9acd00f11140531a0123cf8054c89. into tmpdir=hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/.tmp, seqid=0, totalSize=1.4g
2013-06-05 15:45:23,629 DEBUG org.apache.hadoop.hbase.regionserver.Store: Compacting hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/23959a0af18147518a50680f05974d92, keycount=52048, bloomtype=NONE, size=315.6m
2013-06-05 15:45:23,629 DEBUG org.apache.hadoop.hbase.regionserver.Store: Compacting hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/09d761dd46cc472d8f0a2acdb7ae915d, keycount=51168, bloomtype=NONE, size=314.8m
2013-06-05 15:45:23,630 DEBUG org.apache.hadoop.hbase.regionserver.Store: Compacting hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/34fce86597174c85940d686c0509b0da, keycount=51607, bloomtype=NONE, size=315.6m
2013-06-05 15:45:23,630 DEBUG org.apache.hadoop.hbase.regionserver.Store: Compacting hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/eaad1df984be4cc18470a2600703d1bd, keycount=55978, bloomtype=NONE, size=317.2m
2013-06-05 15:45:23,630 DEBUG org.apache.hadoop.hbase.regionserver.Store: Compacting hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/9e26e0e923b044e5bec71320303a54c7, keycount=28363, bloomtype=NONE, size=162.2m
2013-06-05 15:45:23,775 DEBUG org.apache.hadoop.hbase.io.hfile.HFileWriterV2: Initialized with CacheConfig:enabled [cacheDataOnRead=true] [cacheDataOnWrite=false] [cacheIndexesOnWrite=false] [cacheBloomsOnWrite=false] [cacheEvictOnClose=false] [cacheCompressed=false]
2013-06-05 15:45:23,776 INFO org.apache.hadoop.hbase.regionserver.StoreFile: Bloom filter type for hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/.tmp/f0d5c1d4bbec48ecbb1c397d2d0d8ff5: ROW, CompoundBloomFilterWriter
2013-06-05 15:46:41,564 DEBUG org.apache.hadoop.hbase.util.CompoundBloomFilterWriter: Compacted Bloom chunk #2 from [109306 max keys, 131072 bytes] to [27326 max keys, 32768 bytes]
2013-06-05 15:46:41,583 INFO org.apache.hadoop.hbase.regionserver.StoreFile: Bloom added to HFile (hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/.tmp/f0d5c1d4bbec48ecbb1c397d2d0d8ff5): org.apache.hadoop.hbase.util.CompoundBloomFilterWriter@78ad0395
2013-06-05 15:46:41,597 INFO org.apache.hadoop.hbase.regionserver.StoreFile$Reader: Loaded ROW CompoundBloomFilter metadata for f0d5c1d4bbec48ecbb1c397d2d0d8ff5
2013-06-05 15:46:41,597 INFO org.apache.hadoop.hbase.regionserver.Store: Renaming compacted file at hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/.tmp/f0d5c1d4bbec48ecbb1c397d2d0d8ff5 to hdfs://tbe010154145117:9000/hbase/gnurlresult/77a9acd00f11140531a0123cf8054c89/urlresult/f0d5c1d4bbec48ecbb1c397d2d0d8ff5
2013-06-05 15:46:41,616 INFO org.apache.hadoop.hbase.regionserver.StoreFile$Reader: Loaded ROW CompoundBloomFilter metadata for f0d5c1d4bbec48ecbb1c397d2d0d8ff5
2013-06-05 15:46:41,715 INFO org.apache.hadoop.hbase.regionserver.Store: Completed major compaction of 5 file(s) in urlresult of gnurlresult,05882352940_00000000,1370415394696.77a9acd00f11140531a0123cf8054c89. into f0d5c1d4bbec48ecbb1c397d2d0d8ff5, size=1.4g; total size for store is 1.4g
2013-06-05 15:46:41,716 INFO org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest: completed compaction: regionName=gnurlresult,05882352940_00000000,1370415394696.77a9acd00f11140531a0123cf8054c89., storeName=urlresult, fileCount=5, fileSize=1.4g, priority=2, time=4767593656471521; duration=1mins, 18sec
2013-06-05 15:46:41,716 DEBUG org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest: CompactSplitThread status: compaction_queue=(1:0), split_queue=0

The duration=1mins, 18sec above is the time this CompactionRequest spent executing the compaction.

By looking at how long compactions take, we can judge whether compaction is a performance problem; unusually long durations (or a steadily growing compaction_queue in the status line) are worth investigating.
