目录

HBase-Compaction-HFile-可见性和并发安全性分析

HBase Compaction HFile 可见性和并发安全性分析

HStore

HStore 本身不直接记录一个“目录”的概念,但它管理着与特定列族相关的所有 StoreFiles。这些 StoreFiles 存储在 HDFS 上的特定目录中,通常位于 Region 目录下的列族目录中。HStore 通过 StoreFileTracker (如 StoreFileTrackerFactory.create) 来跟踪和管理这些文件。StoreFileTracker 负责记录和更新 StoreFiles 的元数据,包括它们在文件系统中的位置。

  • 在 HStore 的构造函数中,会初始化 StoreFileManager,它负责管理 StoreFiles 的生命周期,包括添加、删除和获取 StoreFiles。StoreFileManager 会与 StoreFileTracker 交互,以确保文件列表是最新的。
  • HStore 还通过 archiveLock 和相关方法(如 closeAndArchiveCompactedFiles 和 removeCompactedfiles)来管理已压缩文件的归档,确保这些文件被正确地移动到归档目录并从活动文件列表中移除。

Compaction 如何与 HStore 交互?

  • Compaction 触发HStore 通过 requestCompaction 方法来触发 compaction。该方法会选择合适的 StoreFiles 进行 compaction,并将这些文件添加到 filesCompacting 列表中,以防止它们被其他操作干扰。
  • Compaction 执行:实际的 compaction 逻辑由 CompactionPipeline 和 Compactor 实现。HStore 通过 compact 方法来协调整个 compaction 过程。该方法会创建一个临时文件来存储 compaction 结果,然后将输入文件移动到归档目录,并将临时文件移动到 Store 的目录中。
  • Compaction 完成:在 compaction 完成后,HStore 会调用 replaceStoreFiles 方法来更新 StoreFiles 列表,将旧的输入文件替换为新的输出文件。同时,它会更新相关的统计信息和指标,如 compactedCellsCount 和 compactedOutputFileSize
  • Compaction 失败处理:如果 compaction 过程中发生错误,HStore 会尝试回滚操作,确保数据一致性。例如,在 compact 方法中,如果写入 WAL 记录失败,它会删除临时文件并恢复原始文件。
  • Compaction 监控HStore 提供了 getCompactionProgress 方法来获取当前 compaction 的进度,并通过 finishCompactionRequest 和 cancelRequestedCompaction 方法来管理 compaction 请求的完成和取消。

总结来说,HStore 通过 StoreFileManager 和 StoreFileTracker 来管理 StoreFiles 的目录和元数据,并通过一系列方法(如 requestCompactioncompactreplaceStoreFiles 等)来协调和执行 compaction 过程。

HStore 如何利用 storeEngine 的锁机制保证可见性和安全性

1. 读操作方面

在 HStore 类中,读操作(如 getScanners 方法)通过获取 storeEngine 的读锁来保证可见性和安全性:


public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
  boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
  byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
  throws IOException {
  Collection<HStoreFile> storeFilesToScan;
  List<KeyValueScanner> memStoreScanners;
  this.storeEngine.readLock();
  try {
    storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
      includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
    memStoreScanners = this.memstore.getScanners(readPt);
    // NOTE: here we must increase the refCount for storeFiles because we would open the
    // storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,
    // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
    // storeFiles after a concurrent compaction.Because HStore.requestCompaction is under
    // storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484
    // for more details.
    HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
  } finally {
    this.storeEngine.readUnlock();
  }
  try {
    // First the store file scanners

    // TODO this used to get the store files in descending order,
    // but now we get them in ascending order, which I think is
    // actually more correct, since memstore get put at the end.
    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
      storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
    List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
    scanners.addAll(sfScanners);
    // Then the memstore scanners
    scanners.addAll(memStoreScanners);
    return scanners;
  } catch (Throwable t) {
    clearAndClose(memStoreScanners);
    throw t instanceof IOException ? (IOException) t : new IOException(t);
  } finally {
    HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
  }
}

在读操作中,storeEngine.readLock() 被调用以获取读锁,这确保了在读取 StoreFile 列表时不会发生并发修改。读锁允许多个读操作同时进行,但会阻止写操作。在读取完成后,通过 storeEngine.readUnlock() 释放读锁。

2. 写操作方面

在 HStore 类中,写操作(如 replaceStoreFiles 方法)通过获取 storeEngine 的写锁来保证可见性和安全性:


@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
    allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
  boolean writeCompactionMarker) throws IOException {
  storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
    if (writeCompactionMarker) {
      writeCompactionWalRecord(compactedFiles, result);
    }
  }, () -> {
    synchronized (filesCompacting) {
      filesCompacting.removeAll(compactedFiles);
    }
  });
  // These may be null when the RS is shutting down. The space quota Chores will fix the Region
  // sizes later so it's not super-critical if we miss these.
  RegionServerServices rsServices = region.getRegionServerServices();
  if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
    updateSpaceQuotaAfterFileReplacement(
      rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
      compactedFiles, result);
  }
}

replaceStoreFiles 方法调用了 storeEngine.replaceStoreFiles,其实现如下:


public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
  Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
  throws IOException {
  storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
    StoreUtils.toStoreFileInfo(newFiles));
  walMarkerWriter.run();
  writeLock();
  try {
    storeFileManager.addCompactionResults(compactedFiles, newFiles);
    actionUnderLock.run();
  } finally {
    writeUnlock();
  }
}

在写操作中,storeEngine.writeLock() 被调用以获取写锁,这确保了在修改 StoreFile 列表时不会有其他读或写操作同时进行。写锁是独占的,它会阻止所有其他读和写操作。在修改完成后,通过 storeEngine.writeUnlock() 释放写锁。

StoreEngine 增加文件也是使用写锁


  /**
   * Add the store files to store file manager, and also record it in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
   * the lock protection. Usually this is for clear the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      writeUnlock();
    }
  }

总结

HStore 通过 storeEngine 的读写锁机制来保证并发安全和可见性:

  1. 读操作:使用读锁 (readLock),允许多个读操作并发执行,但阻止写操作。
  2. 写操作:使用写锁 (writeLock),确保写操作的独占性,阻止所有其他读和写操作。

这种机制确保了在并发环境下,StoreFile 列表的读取和修改操作是安全的,并且能够保证数据的一致性和可见性。


​Compaction 执行层级​

​1. HStore 级别触发​
  • ​核心机制​​:Compaction 主要在 HStore 级别进行触发和管理
  • ​存储关联​​:每个列族(Column Family)对应一个 HStore 实例
  • ​触发判断​​:通过 HStore.needsCompaction()方法决定是否需要执行 compaction
​2. Region 级别协调​
  • ​多 Store 管理​​:单个 HRegion 包含多个 HStore,由 Region 层级协调各 Store 的 compaction

  • ​启动检查​​:在 HRegionServer 中,region 打开时自动检查 compaction 需求:

    
    if (!r.isReadOnly()) {
      for (HStore s : r.stores.values()) {
        if (s.hasReferences() || s.needsCompaction()) {
          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
        }
      }
    }
​3. Region Server 级别调度​
  • ​定期检测​​:CompactionChecker定期扫描所有 region 的 stores,触发必要 compaction
  • ​任务执行​​:通过 CompactSplit线程池异步执行 compaction 任务

​Compaction 协调机制​

​核心检测逻辑​


// CompactionChecker 定期检查实现
private static class CompactionChecker extends ScheduledChore {
  private final HRegionServer instance;
  
  @Override
  protected void chore() {
    for (HRegion hr : this.instance.onlineRegions.values()) {
      if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
        continue;
      }

      for (HStore s : hr.stores.values()) {
        if (s.needsCompaction()) {  // 检查 compaction 必要性
          this.instance.compactSplitThread.requestSystemCompaction(
            hr, s, "Periodic compaction"  // 提交系统级 compaction 请求
          );
        }
      }
    }
  }
}

​可能存在的问题及解决方案​

​问题类型​​具体表现​​解决方案​
​并发控制​Compaction 与 region 关闭操作冲突使用 writestate.compacting计数器同步状态;region 关闭时等待所有 compaction 完成
​资源竞争​多 compaction 任务争抢系统资源通过 CompactSplit线程池限制并发数;支持优先级调度确保关键任务优先执行
​跨 Region 协调​各 Region 独立管理导致资源分配不均设计上保持 Region 独立性(简化实现),但可能引发局部资源竞争

​总结​

HBase Compaction 采用 ​​多层级协同设计​​:

  1. ​触发检测​​:Store 级别实时监测 compaction 需求
  2. ​执行调度​​:Region Server 级别统一协调任务分配与执行
  3. ​资源管理​​:通过线程池并发控制和优先级机制优化资源利用率

该设计通过分层解耦实现了 ​​灵活性​​ 与 ​​可靠性​​ 的平衡,避免了单点故障风险,是经过验证的高效架构方案。