HBase-Compaction-HFile-可见性和并发安全性分析
HBase Compaction HFile 可见性和并发安全性分析
HStore
HStore
本身不直接记录一个“目录”的概念,但它管理着与特定列族相关的所有 StoreFiles。这些 StoreFiles 存储在 HDFS 上的特定目录中,通常位于 Region 目录下的列族目录中。HStore
通过 StoreFileTracker
(如 StoreFileTrackerFactory.create
) 来跟踪和管理这些文件。StoreFileTracker
负责记录和更新 StoreFiles 的元数据,包括它们在文件系统中的位置。
- 在
HStore
的构造函数中,会初始化StoreFileManager
,它负责管理 StoreFiles 的生命周期,包括添加、删除和获取 StoreFiles。StoreFileManager
会与StoreFileTracker
交互,以确保文件列表是最新的。 HStore
还通过archiveLock
和相关方法(如closeAndArchiveCompactedFiles
和removeCompactedfiles
)来管理已压缩文件的归档,确保这些文件被正确地移动到归档目录并从活动文件列表中移除。
Compaction 如何与 HStore 交互?
- Compaction 触发:
HStore
通过requestCompaction
方法来触发 compaction。该方法会选择合适的 StoreFiles 进行 compaction,并将这些文件添加到filesCompacting
列表中,以防止它们被其他操作干扰。 - Compaction 执行:实际的 compaction 逻辑由
CompactionPipeline
和Compactor
实现。HStore
通过compact
方法来协调整个 compaction 过程。该方法会创建一个临时文件来存储 compaction 结果,然后将输入文件移动到归档目录,并将临时文件移动到 Store 的目录中。 - Compaction 完成:在 compaction 完成后,
HStore
会调用replaceStoreFiles
方法来更新 StoreFiles 列表,将旧的输入文件替换为新的输出文件。同时,它会更新相关的统计信息和指标,如compactedCellsCount
和compactedOutputFileSize
。 - Compaction 失败处理:如果 compaction 过程中发生错误,
HStore
会尝试回滚操作,确保数据一致性。例如,在compact
方法中,如果写入 WAL 记录失败,它会删除临时文件并恢复原始文件。 - Compaction 监控:
HStore
提供了getCompactionProgress
方法来获取当前 compaction 的进度,并通过finishCompactionRequest
和cancelRequestedCompaction
方法来管理 compaction 请求的完成和取消。
总结来说,HStore
通过 StoreFileManager
和 StoreFileTracker
来管理 StoreFiles 的目录和元数据,并通过一系列方法(如 requestCompaction
, compact
, replaceStoreFiles
等)来协调和执行 compaction 过程。
HStore 如何利用 storeEngine
的锁机制保证可见性和安全性
1. 读操作方面
在 HStore
类中,读操作(如 getScanners
方法)通过获取 storeEngine
的读锁来保证可见性和安全性:
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
throws IOException {
Collection<HStoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
this.storeEngine.readLock();
try {
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
memStoreScanners = this.memstore.getScanners(readPt);
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under
// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484
// for more details.
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.storeEngine.readUnlock();
}
try {
// First the store file scanners
// TODO this used to get the store files in descending order,
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;
} catch (Throwable t) {
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}
在读操作中,storeEngine.readLock()
被调用以获取读锁,这确保了在读取 StoreFile 列表时不会发生并发修改。读锁允许多个读操作同时进行,但会阻止写操作。在读取完成后,通过 storeEngine.readUnlock()
释放读锁。
2. 写操作方面
在 HStore
类中,写操作(如 replaceStoreFiles
方法)通过获取 storeEngine
的写锁来保证可见性和安全性:
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
boolean writeCompactionMarker) throws IOException {
storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
if (writeCompactionMarker) {
writeCompactionWalRecord(compactedFiles, result);
}
}, () -> {
synchronized (filesCompacting) {
filesCompacting.removeAll(compactedFiles);
}
});
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
// sizes later so it's not super-critical if we miss these.
RegionServerServices rsServices = region.getRegionServerServices();
if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
updateSpaceQuotaAfterFileReplacement(
rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
compactedFiles, result);
}
}
replaceStoreFiles
方法调用了 storeEngine.replaceStoreFiles
,其实现如下:
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
throws IOException {
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
StoreUtils.toStoreFileInfo(newFiles));
walMarkerWriter.run();
writeLock();
try {
storeFileManager.addCompactionResults(compactedFiles, newFiles);
actionUnderLock.run();
} finally {
writeUnlock();
}
}
在写操作中,storeEngine.writeLock()
被调用以获取写锁,这确保了在修改 StoreFile 列表时不会有其他读或写操作同时进行。写锁是独占的,它会阻止所有其他读和写操作。在修改完成后,通过 storeEngine.writeUnlock()
释放写锁。
StoreEngine 增加文件也是使用写锁
/**
* Add the store files to store file manager, and also record it in the store file tracker.
* <p/>
* The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
* the lock protection. Usually this is for clear the memstore snapshot.
*/
public void addStoreFiles(Collection<HStoreFile> storeFiles,
IOExceptionRunnable actionAfterAdding) throws IOException {
storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
writeLock();
try {
storeFileManager.insertNewFiles(storeFiles);
actionAfterAdding.run();
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
// the lock.
writeUnlock();
}
}
总结
HStore
通过 storeEngine
的读写锁机制来保证并发安全和可见性:
- 读操作:使用读锁 (
readLock
),允许多个读操作并发执行,但阻止写操作。 - 写操作:使用写锁 (
writeLock
),确保写操作的独占性,阻止所有其他读和写操作。
这种机制确保了在并发环境下,StoreFile 列表的读取和修改操作是安全的,并且能够保证数据的一致性和可见性。
Compaction 执行层级
1. HStore 级别触发
- 核心机制:Compaction 主要在 HStore 级别进行触发和管理
- 存储关联:每个列族(Column Family)对应一个 HStore 实例
- 触发判断:通过
HStore.needsCompaction()
方法决定是否需要执行 compaction
2. Region 级别协调
多 Store 管理:单个 HRegion 包含多个 HStore,由 Region 层级协调各 Store 的 compaction
启动检查:在 HRegionServer 中,region 打开时自动检查 compaction 需求:
if (!r.isReadOnly()) { for (HStore s : r.stores.values()) { if (s.hasReferences() || s.needsCompaction()) { this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); } } }
3. Region Server 级别调度
- 定期检测:
CompactionChecker
定期扫描所有 region 的 stores,触发必要 compaction - 任务执行:通过
CompactSplit
线程池异步执行 compaction 任务
Compaction 协调机制
核心检测逻辑
// CompactionChecker 定期检查实现
private static class CompactionChecker extends ScheduledChore {
private final HRegionServer instance;
@Override
protected void chore() {
for (HRegion hr : this.instance.onlineRegions.values()) {
if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
continue;
}
for (HStore s : hr.stores.values()) {
if (s.needsCompaction()) { // 检查 compaction 必要性
this.instance.compactSplitThread.requestSystemCompaction(
hr, s, "Periodic compaction" // 提交系统级 compaction 请求
);
}
}
}
}
}
可能存在的问题及解决方案
问题类型 | 具体表现 | 解决方案 |
---|---|---|
并发控制 | Compaction 与 region 关闭操作冲突 | 使用 writestate.compacting 计数器同步状态;region 关闭时等待所有 compaction 完成 |
资源竞争 | 多 compaction 任务争抢系统资源 | 通过 CompactSplit 线程池限制并发数;支持优先级调度确保关键任务优先执行 |
跨 Region 协调 | 各 Region 独立管理导致资源分配不均 | 设计上保持 Region 独立性(简化实现),但可能引发局部资源竞争 |
总结
HBase Compaction 采用 多层级协同设计:
- 触发检测:Store 级别实时监测 compaction 需求
- 执行调度:Region Server 级别统一协调任务分配与执行
- 资源管理:通过线程池并发控制和优先级机制优化资源利用率
该设计通过分层解耦实现了 灵活性 与 可靠性 的平衡,避免了单点故障风险,是经过验证的高效架构方案。