org.apache.iceberg.BaseTableScan#planTasks ScanTask的构建从这里开始。
基本的逻辑如下
- 查看需要扫描的文件
- 判断是否需要将一个文件扫描任务拆分为多个更小的扫描任务,提高大文件扫描的效率。
- 因为经过上面一步一个大任务可能被拆分为多个小任务了,所以这里还需要提供一个将其合并的方法。
public CloseableIterable<CombinedScanTask> planTasks() {
CloseableIterable<FileScanTask> fileScanTasks = planFiles();
CloseableIterable<FileScanTask> splitFiles =
TableScanUtil.splitFiles(fileScanTasks, targetSplitSize());
return TableScanUtil.planTasks(
splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}这里只有数据文件的扫描计划的构建,在实际当中其实需要依赖元数据的信息,再往上一层可以看到是存在元数据的读取的
public VectorizedTableScanIterable(TableScan scan, int batchSize, boolean reuseContainers) {
this.reader = new ArrowReader(scan, batchSize, reuseContainers);
// start planning tasks in the background
this.tasks = scan.planTasks();
}元数据的读取这里暂且按下不表,指示简单的文件读取而已,只不过存在很多可以用于优化扫描任务的信息,在下面的扫描任务构建过程当中可以慢慢发掘。
planFiles数据文件
返回一个文件扫描任务列表,planFiles返回的每个任务对应一个单独的文件。
public CloseableIterable<T> planFiles() {
// 1. 获取快照,如果指定了快照则使用指定的,没有指定则使用当前最新的
Snapshot snapshot = snapshot();
if (snapshot == null) {
LOG.info("Scanning empty table {}", table());
return CloseableIterable.empty();
}
// 2. 使用查询的字段做投影,并转为字段id
List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
// 保存和字段id的顺序的字段名,用来展示
List<String> projectedFieldNames = projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());
Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();
// 3. 返回一个可迭代的任务列表,所以核心的逻辑在doPlanFiles当中
return CloseableIterable.whenComplete(
doPlanFiles(),
() -> {
planningDuration.stop();
Map<String, String> metadata = Maps.newHashMap(context().options());
metadata.putAll(EnvironmentContext.get());
ScanReport scanReport =
ImmutableScanReport.builder()
.schemaId(schema().schemaId())
.projectedFieldIds(projectedFieldIds)
.projectedFieldNames(projectedFieldNames)
.tableName(table().name())
.snapshotId(snapshot.snapshotId())
.filter(
ExpressionUtil.sanitize(
schema().asStruct(), filter(), context().caseSensitive()))
.scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
.metadata(metadata)
.build();
context().metricsReporter().report(scanReport);
});
}doPlanFiles()有很多实现,我们这里关注的是表数据的读取实现,所以进入org.apache.iceberg.DataTableScan#doPlanFiles实现当中。
public CloseableIterable<FileScanTask> doPlanFiles() {
// 1. 获取当前查询使用的快照
Snapshot snapshot = snapshot();
// 2. FileIO,即用来访问存储的抽象
FileIO io = table().io();
// 3. 虽然分为了两步,但是实际上dataManifests和deleteManifests是在一次查询中完成的
// 3.1 从快照当中获取数据manifest列表
List<ManifestFile> dataManifests = snapshot.dataManifests(io);
// 3.2 从快照当中获取删除数据文件manifest列表
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
scanMetrics().totalDataManifests().increment((long) dataManifests.size());
scanMetrics().totalDeleteManifests().increment((long) deleteManifests.size());
// 根据manifest list信息以及上下文传递的过滤信息构建ManifestGroup。可以这样理解,ManifestGroup就是一系列查询过滤条件的结构体
ManifestGroup manifestGroup =
new ManifestGroup(io, dataManifests, deleteManifests)
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());
// 忽略残差文件,残差文件是某些操作或者异常导致的未被正常记录或处理的文件
if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}
// 是否使用线程池,多线程构建文件扫描计划
if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || deleteManifests.size() > 1)) {
manifestGroup = manifestGroup.planWith(planExecutor());
}
// 基于ManifestGroup构建文件扫描计划
return manifestGroup.planFiles();
}下面查看ManifestGroup的实现
// 调用plan函数,并传递了一个回调函数
public CloseableIterable<FileScanTask> planFiles() {
return plan(ManifestGroup::createFileScanTasks);
}
public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> createTasksFunc) {
LoadingCache<Integer, ResidualEvaluator> residualCache =
Caffeine.newBuilder()
.build(
specId -> {
PartitionSpec spec = specsById.get(specId);
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
return ResidualEvaluator.of(spec, filter, caseSensitive);
});
DeleteFileIndex deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build();
boolean dropStats = ManifestReader.dropStats(columns);
// 存在删除文件,添加统计列
if (!deleteFiles.isEmpty()) {
select(ManifestReader.withStatsColumns(columns));
}
// 分区上下文的缓存对象
LoadingCache<Integer, TaskContext> taskContextCache =
Caffeine.newBuilder()
.build(
specId -> {
PartitionSpec spec = specsById.get(specId);
ResidualEvaluator residuals = residualCache.get(specId);
return new TaskContext(
spec, deleteFiles, residuals, dropStats, columnsToKeepStats, scanMetrics);
});
// 根据manifest的分区信息,获取到分区上下文,然后创建数据文件扫描任务
Iterable<CloseableIterable<T>> tasks =
entries(
(manifest, entries) -> {
int specId = manifest.partitionSpecId();
TaskContext taskContext = taskContextCache.get(specId);
return createTasksFunc.apply(entries, taskContext);
});
// 多线程并发的设置
if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
} else {
return CloseableIterable.concat(tasks);
}
}那么接下来就是进入entries方法了,在这里先过滤掉不需要的manifest文件,然后再根据原信息和manifest记录的数据文件元信息过滤掉不需要的数据文件
private <T> Iterable<CloseableIterable<T>> entries(
BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>>
entryFn) {
// 分区匹配器的缓存
LoadingCache<Integer, ManifestEvaluator> evalCache =
specsById == null
? null
: Caffeine.newBuilder()
.build(
specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(
partitionFilter,
Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec,
caseSensitive);
});
// 数据文件的过滤计算器
Evaluator evaluator;
if (fileFilter != null && fileFilter != Expressions.alwaysTrue()) {
evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);
} else {
evaluator = null;
}
// ManifestGroup中manifest文件的迭代器
CloseableIterable<ManifestFile> closeableDataManifests =
CloseableIterable.withNoopClose(dataManifests);
// 返回会使用过滤条件过滤的manifest迭代器
CloseableIterable<ManifestFile> matchingManifests =
evalCache == null
? closeableDataManifests
: CloseableIterable.filter(
scanMetrics.skippedDataManifests(),
closeableDataManifests,
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
// 设置了忽略删除文件,那么就过滤掉没有任务added 或exist文件的manifest
if (ignoreDeleted) {
// only scan manifests that have entries other than deletes
// remove any manifests that don't have any existing or added files. if either the added or // existing files count is missing, the manifest must be scanned. matchingManifests =
CloseableIterable.filter(
scanMetrics.skippedDataManifests(),
matchingManifests,
manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
}
// 过滤掉没有added或者deleted文件的manifest
if (ignoreExisting) {
// only scan manifests that have entries other than existing
// remove any manifests that don't have any deleted or added files. if either the added or // deleted files count is missing, the manifest must be scanned. matchingManifests =
CloseableIterable.filter(
scanMetrics.skippedDataManifests(),
matchingManifests,
manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles());
}
// 包装一次,迭代的过程中增加计数器的值
matchingManifests =
CloseableIterable.count(scanMetrics.scannedDataManifests(), matchingManifests);
// 返回一个transform迭代器,用于将machingManifest使用设置函数进行转换
return Iterables.transform(
matchingManifests,
manifest ->
new CloseableIterable<T>() {
private CloseableIterable<T> iterable;
@Override
public CloseableIterator<T> iterator() {
// 构造manifest数据文件的读取器,并设置好过滤条件
ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
.select(columns)
.scanMetrics(scanMetrics);
CloseableIterable<ManifestEntry<DataFile>> entries;
if (ignoreDeleted) {
entries = reader.liveEntries();
} else {
entries = reader.entries();
}
// 过滤
if (ignoreExisting) {
entries =
CloseableIterable.filter(
scanMetrics.skippedDataFiles(),
entries,
entry -> entry.status() != ManifestEntry.Status.EXISTING);
}
// 过滤
if (evaluator != null) {
entries =
CloseableIterable.filter(
scanMetrics.skippedDataFiles(),
entries,
entry -> evaluator.eval((GenericDataFile) entry.file()));
}
// 再过滤
entries =
CloseableIterable.filter(
scanMetrics.skippedDataFiles(), entries, manifestEntryPredicate);
// 回调传入的函数
iterable = entryFn.apply(manifest, entries);
return iterable.iterator();
}
@Override
public void close() throws IOException {
if (iterable != null) {
iterable.close();
}
}
});
}现在回到最上层传入的entryFn,也就是ManifestGroup::createFileScanTasks函数。也就是将过滤好的要扫描的数据文件转为文件扫描任务,也就是扫描的数据文件、关联的删除文件、schema、分区信息等内容封装成任务。
private static CloseableIterable<FileScanTask> createFileScanTasks(
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext ctx) {
return CloseableIterable.transform(
entries,
entry -> {
DataFile dataFile =
ContentFileUtil.copy(entry.file(), ctx.shouldKeepStats(), ctx.columnsToKeepStats());
// 这个文件有关的删除文件
DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);
return new BaseFileScanTask(
dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());
});
}总结下来,就是获取快照,然后读取快照中的ManifestList,构建出ManifestGroup。然后使用扫描上下文以及快照当中的元数据信息先过滤manifest,然后再额外根据manifest记录的数据文件元数据进一步过滤掉不需要的元数据文件。最后将所有需要扫描的数据文件创建为一个文件扫描任务。
这里我们比较关心的是过滤逻辑的实现,分为manifest文件过滤和manifest下的数据文件过滤两部分,一一探讨一下。
manifest文件过滤
在metadata当中记录了表的元信息,这些元信息中包含历史版本的,而用户传递的上下文中指定了在后续查询当中会生效的元信息,例如schema版本,spec版本等等,除此之外,在上下文中还包含了查询的一些额外信息,例如查询的字段列,其实都可以看作是过滤条件。
来看这个过滤的逻辑具体是怎么实现的,具体的代码在org.apache.iceberg.DataTableScan#doPlanFiles当中,前面已经介绍过了。这里就要展示详细介绍使用哪些条件,如何过滤。
- 首先是读取上下文指定快照信息或者最新的快照信息。也就是snap-xxx文件.
Snapshot snapshot = snapshot();- 读取manifest列表信息,FileIO是存储层的抽象,这里不需要关注。基于snapshot中的信息可以获取到manifest列表,注意,这里读取到的是snapshot中记录的manifest文件的元数据
FileIO io = table().io();
List<ManifestFile> dataManifests = snapshot.dataManifests(io);
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
ManifestGroup manifestGroup =
new ManifestGroup(io, dataManifests, deleteManifests)
.caseSensitive(isCaseSensitive()) // 是否大小写敏感
.select(scanColumns()) // 要扫描的manifest的列
.filterData(filter()) // 过滤条件
.specsById(table().specs()) // 表的全部版本的分区信息
.scanMetrics(scanMetrics())
.ignoreDeleted() // 是否忽略删除的文件
.columnsToKeepStats(columnsToKeepStats());
if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}
if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || deleteManifests.size() > 1)) {
manifestGroup = manifestGroup.planWith(planExecutor());
}