org.apache.iceberg.BaseTableScan#planTasks ScanTask的构建从这里开始。 基本的逻辑如下

  1. 查看需要扫描的文件
  2. 判断是否需要将一个文件扫描任务拆分为多个更小的扫描任务,提高大文件扫描的效率。
  3. 因为经过上面一步一个大任务可能被拆分为多个小任务了,所以这里还需要提供一个将其合并的方法。
public CloseableIterable<CombinedScanTask> planTasks() {  
  CloseableIterable<FileScanTask> fileScanTasks = planFiles();  
  CloseableIterable<FileScanTask> splitFiles =  
      TableScanUtil.splitFiles(fileScanTasks, targetSplitSize());  
  return TableScanUtil.planTasks(  
      splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());  
}

这里只有数据文件的扫描计划的构建,在实际当中其实需要依赖元数据的信息,再往上一层可以看到是存在元数据的读取的

public VectorizedTableScanIterable(TableScan scan, int batchSize, boolean reuseContainers) {  
  this.reader = new ArrowReader(scan, batchSize, reuseContainers);  
  // start planning tasks in the background  
  this.tasks = scan.planTasks();  
}

元数据的读取这里暂且按下不表,指示简单的文件读取而已,只不过存在很多可以用于优化扫描任务的信息,在下面的扫描任务构建过程当中可以慢慢发掘。

planFiles数据文件

返回一个文件扫描任务列表,planFiles返回的每个任务对应一个单独的文件。

public CloseableIterable<T> planFiles() {  
  // 1. 获取快照,如果指定了快照则使用指定的,没有指定则使用当前最新的
  Snapshot snapshot = snapshot();  
  
  if (snapshot == null) {  
    LOG.info("Scanning empty table {}", table());  
    return CloseableIterable.empty();  
  }  
  
  // 2. 使用查询的字段做投影,并转为字段id
  List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));  
  // 保存和字段id的顺序的字段名,用来展示
  List<String> projectedFieldNames = projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());  
  
  Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();  
  // 3. 返回一个可迭代的任务列表,所以核心的逻辑在doPlanFiles当中
  return CloseableIterable.whenComplete(  
      doPlanFiles(),  
      () -> {  
        planningDuration.stop();  
        Map<String, String> metadata = Maps.newHashMap(context().options());  
        metadata.putAll(EnvironmentContext.get());  
        ScanReport scanReport =  
            ImmutableScanReport.builder()  
                .schemaId(schema().schemaId())  
                .projectedFieldIds(projectedFieldIds)  
                .projectedFieldNames(projectedFieldNames)  
                .tableName(table().name())  
                .snapshotId(snapshot.snapshotId())  
                .filter(  
                    ExpressionUtil.sanitize(  
                        schema().asStruct(), filter(), context().caseSensitive()))
                        .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))  
                .metadata(metadata)  
                .build();  
        context().metricsReporter().report(scanReport);  
      });  
}

doPlanFiles()有很多实现,我们这里关注的是表数据的读取实现,所以进入org.apache.iceberg.DataTableScan#doPlanFiles实现当中。

public CloseableIterable<FileScanTask> doPlanFiles() {  
  // 1. 获取当前查询使用的快照
  Snapshot snapshot = snapshot();  
  // 2. FileIO,即用来访问存储的抽象
  FileIO io = table().io();  
  // 3. 虽然分为了两步,但是实际上dataManifests和deleteManifests是在一次查询中完成的
  // 3.1 从快照当中获取数据manifest列表
  List<ManifestFile> dataManifests = snapshot.dataManifests(io);  
  // 3.2 从快照当中获取删除数据文件manifest列表
  List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);  
  scanMetrics().totalDataManifests().increment((long) dataManifests.size());  
  scanMetrics().totalDeleteManifests().increment((long) deleteManifests.size());  
  // 根据manifest list信息以及上下文传递的过滤信息构建ManifestGroup。可以这样理解,ManifestGroup就是一系列查询过滤条件的结构体
  ManifestGroup manifestGroup =  
      new ManifestGroup(io, dataManifests, deleteManifests)  
          .caseSensitive(isCaseSensitive())  
          .select(scanColumns())  
          .filterData(filter())  
          .specsById(table().specs())  
          .scanMetrics(scanMetrics())  
          .ignoreDeleted()  
          .columnsToKeepStats(columnsToKeepStats());  
  // 忽略残差文件,残差文件是某些操作或者异常导致的未被正常记录或处理的文件
  if (shouldIgnoreResiduals()) {  
    manifestGroup = manifestGroup.ignoreResiduals();  
  }  
  // 是否使用线程池,多线程构建文件扫描计划
  if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || deleteManifests.size() > 1)) {  
    manifestGroup = manifestGroup.planWith(planExecutor());  
  }  
  // 基于ManifestGroup构建文件扫描计划
  return manifestGroup.planFiles();  
}

下面查看ManifestGroup的实现

// 调用plan函数,并传递了一个回调函数
public CloseableIterable<FileScanTask> planFiles() {  
  return plan(ManifestGroup::createFileScanTasks);  
}  
  
public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> createTasksFunc) {  
  LoadingCache<Integer, ResidualEvaluator> residualCache =  
      Caffeine.newBuilder()  
          .build(  
              specId -> {  
                PartitionSpec spec = specsById.get(specId);  
                Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;  
                return ResidualEvaluator.of(spec, filter, caseSensitive);  
              });  
  
  DeleteFileIndex deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build();  
  
  boolean dropStats = ManifestReader.dropStats(columns); 
  // 存在删除文件,添加统计列 
  if (!deleteFiles.isEmpty()) {  
    select(ManifestReader.withStatsColumns(columns));  
  }  
 
  // 分区上下文的缓存对象
  LoadingCache<Integer, TaskContext> taskContextCache =  
      Caffeine.newBuilder()  
          .build(  
              specId -> {  
                PartitionSpec spec = specsById.get(specId);  
                ResidualEvaluator residuals = residualCache.get(specId);  
                return new TaskContext(  
                    spec, deleteFiles, residuals, dropStats, columnsToKeepStats, scanMetrics);  
              });  
 
  // 根据manifest的分区信息,获取到分区上下文,然后创建数据文件扫描任务
  Iterable<CloseableIterable<T>> tasks =  
      entries(  
          (manifest, entries) -> {  
            int specId = manifest.partitionSpecId();  
            TaskContext taskContext = taskContextCache.get(specId);  
            return createTasksFunc.apply(entries, taskContext);  
          });  
  // 多线程并发的设置
  if (executorService != null) {  
    return new ParallelIterable<>(tasks, executorService);  
  } else {  
    return CloseableIterable.concat(tasks);  
  }  
}

那么接下来就是进入entries方法了,在这里先过滤掉不需要的manifest文件,然后再根据原信息和manifest记录的数据文件元信息过滤掉不需要的数据文件

private <T> Iterable<CloseableIterable<T>> entries(  
    BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>>  
        entryFn) {  
  // 分区匹配器的缓存
  LoadingCache<Integer, ManifestEvaluator> evalCache =  
      specsById == null  
          ? null  
          : Caffeine.newBuilder()  
              .build(  
                  specId -> {  
                    PartitionSpec spec = specsById.get(specId);  
                    return ManifestEvaluator.forPartitionFilter(  
                        Expressions.and(  
                            partitionFilter,  
                            Projections.inclusive(spec, caseSensitive).project(dataFilter)),  
                        spec,  
                        caseSensitive);  
                  });  
  // 数据文件的过滤计算器
  Evaluator evaluator;  
  if (fileFilter != null && fileFilter != Expressions.alwaysTrue()) {  
    evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);  
  } else {  
    evaluator = null;  
  }  
  // ManifestGroup中manifest文件的迭代器
  CloseableIterable<ManifestFile> closeableDataManifests =  
      CloseableIterable.withNoopClose(dataManifests);  
  // 返回会使用过滤条件过滤的manifest迭代器
  CloseableIterable<ManifestFile> matchingManifests =  
      evalCache == null  
          ? closeableDataManifests  
          : CloseableIterable.filter(  
              scanMetrics.skippedDataManifests(),  
              closeableDataManifests,  
              manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));  
  // 设置了忽略删除文件,那么就过滤掉没有任务added 或exist文件的manifest
  if (ignoreDeleted) {  
    // only scan manifests that have entries other than deletes  
    // remove any manifests that don't have any existing or added files. if either the added or    // existing files count is missing, the manifest must be scanned.    matchingManifests =  
        CloseableIterable.filter(  
            scanMetrics.skippedDataManifests(),  
            matchingManifests,  
            manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());  
  }  
  // 过滤掉没有added或者deleted文件的manifest
  if (ignoreExisting) {  
    // only scan manifests that have entries other than existing  
    // remove any manifests that don't have any deleted or added files. if either the added or    // deleted files count is missing, the manifest must be scanned.    matchingManifests =  
        CloseableIterable.filter(  
            scanMetrics.skippedDataManifests(),  
            matchingManifests,  
            manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles());  
  }  
  // 包装一次,迭代的过程中增加计数器的值
  matchingManifests =  
      CloseableIterable.count(scanMetrics.scannedDataManifests(), matchingManifests);  
  // 返回一个transform迭代器,用于将machingManifest使用设置函数进行转换
  return Iterables.transform(  
      matchingManifests,  
      manifest ->  
          new CloseableIterable<T>() {  
            private CloseableIterable<T> iterable;  
  
            @Override  
            public CloseableIterator<T> iterator() {  
              // 构造manifest数据文件的读取器,并设置好过滤条件
              ManifestReader<DataFile> reader =  
                  ManifestFiles.read(manifest, io, specsById)  
                      .filterRows(dataFilter)  
                      .filterPartitions(partitionFilter)  
                      .caseSensitive(caseSensitive)  
                      .select(columns)  
                      .scanMetrics(scanMetrics);  
  
              CloseableIterable<ManifestEntry<DataFile>> entries;  
              if (ignoreDeleted) {  
                entries = reader.liveEntries();  
              } else {  
                entries = reader.entries();  
              }  
			  // 过滤
              if (ignoreExisting) {  
                entries =  
                    CloseableIterable.filter(  
                        scanMetrics.skippedDataFiles(),  
                        entries,  
                        entry -> entry.status() != ManifestEntry.Status.EXISTING);  
              }  
			  // 过滤
              if (evaluator != null) {  
                entries =  
                    CloseableIterable.filter(  
                        scanMetrics.skippedDataFiles(),  
                        entries,  
                        entry -> evaluator.eval((GenericDataFile) entry.file()));  
              }  
			  // 再过滤
              entries =  
                  CloseableIterable.filter(  
                      scanMetrics.skippedDataFiles(), entries, manifestEntryPredicate);  
			  // 回调传入的函数
              iterable = entryFn.apply(manifest, entries);  
  
              return iterable.iterator();  
            }  
  
            @Override  
            public void close() throws IOException {  
              if (iterable != null) {  
                iterable.close();  
              }  
            }  
          });  
}

现在回到最上层传入的entryFn,也就是ManifestGroup::createFileScanTasks函数。也就是将过滤好的要扫描的数据文件转为文件扫描任务,也就是扫描的数据文件、关联的删除文件、schema、分区信息等内容封装成任务。

private static CloseableIterable<FileScanTask> createFileScanTasks(  
    CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext ctx) {  
  return CloseableIterable.transform(  
      entries,  
      entry -> {  
        DataFile dataFile =  
            ContentFileUtil.copy(entry.file(), ctx.shouldKeepStats(), ctx.columnsToKeepStats());  
        // 这个文件有关的删除文件
        DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);  
        ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);  
        return new BaseFileScanTask(  
            dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());  
      });  
}

总结下来,就是获取快照,然后读取快照中的ManifestList,构建出ManifestGroup。然后使用扫描上下文以及快照当中的元数据信息先过滤manifest,然后再额外根据manifest记录的数据文件元数据进一步过滤掉不需要的元数据文件。最后将所有需要扫描的数据文件创建为一个文件扫描任务。

这里我们比较关心的是过滤逻辑的实现,分为manifest文件过滤和manifest下的数据文件过滤两部分,一一探讨一下。

manifest文件过滤

在metadata当中记录了表的元信息,这些元信息中包含历史版本的,而用户传递的上下文中指定了在后续查询当中会生效的元信息,例如schema版本,spec版本等等,除此之外,在上下文中还包含了查询的一些额外信息,例如查询的字段列,其实都可以看作是过滤条件。

来看这个过滤的逻辑具体是怎么实现的,具体的代码在org.apache.iceberg.DataTableScan#doPlanFiles当中,前面已经介绍过了。这里就要展示详细介绍使用哪些条件,如何过滤。

  1. 首先是读取上下文指定快照信息或者最新的快照信息。也就是snap-xxx文件.
Snapshot snapshot = snapshot();
  1. 读取manifest列表信息,FileIO是存储层的抽象,这里不需要关注。基于snapshot中的信息可以获取到manifest列表,注意,这里读取到的是snapshot中记录的manifest文件的元数据
FileIO io = table().io();  
List<ManifestFile> dataManifests = snapshot.dataManifests(io);  
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io);
 
ManifestGroup manifestGroup =  
    new ManifestGroup(io, dataManifests, deleteManifests)  
        .caseSensitive(isCaseSensitive()) // 是否大小写敏感 
        .select(scanColumns())  // 要扫描的manifest的列
        .filterData(filter())  // 过滤条件
        .specsById(table().specs())  // 表的全部版本的分区信息
        .scanMetrics(scanMetrics())  
        .ignoreDeleted()  // 是否忽略删除的文件
        .columnsToKeepStats(columnsToKeepStats());  
  
if (shouldIgnoreResiduals()) {  
  manifestGroup = manifestGroup.ignoreResiduals();  
}  
  
if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || deleteManifests.size() > 1)) {  
  manifestGroup = manifestGroup.planWith(planExecutor());  
}