public class SparkDistributedDataScan extends SnapshotScan<BatchScan,ScanTask,ScanTaskGroup<ScanTask>>
This scan remotely filters manifests, fetching only the relevant data and delete files to the driver. The delete file assignment is done locally after the remote filtering step. Such an approach is beneficial if the remote parallelism is much higher than the number of driver cores.
This scan is best suited for queries with selective filters on lower/upper bounds across all partitions, or against poorly clustered metadata. This allows job planning to benefit from highly concurrent remote filtering while not incurring high serialization and data transfer costs. This class is also useful for full table scans over large tables but the cost of bringing data and delete file details to the driver may become noticeable. Make sure to follow the performance tips below in such cases.
Performance tips:
- Ensure the filtered metadata size doesn't exceed the driver's max result size. For large table scans, consider increasing `spark.driver.maxResultSize` to avoid job failures.
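For illustration, here is a minimal sketch of constructing and using this scan. The catalog lookup, column names, and filter values are hypothetical, and the `SparkReadConf` arguments reflect common usage rather than a required pattern.

```java
import java.util.Collections;

import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.spark.SparkDistributedDataScan;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningExample {

  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("distributed-planning-example")
        // raise the max result size so filtered metadata fits on the driver
        .config("spark.driver.maxResultSize", "4g")
        .getOrCreate();

    Table table = loadTable(); // hypothetical: load the table from your catalog

    SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap());
    SparkDistributedDataScan scan = new SparkDistributedDataScan(spark, table, readConf);

    // remote manifest filtering happens during planning; task groups are assembled on the driver
    try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups =
        scan.filter(Expressions.greaterThanOrEqual("id", 1000L)) // hypothetical column
            .select(Collections.singletonList("id"))
            .planTasks()) {
      taskGroups.forEach(group -> System.out.println(group.tasks().size() + " tasks in group"));
    }

    spark.stop();
  }

  private static Table loadTable() {
    throw new UnsupportedOperationException("replace with your catalog's loadTable call");
  }
}
```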
Modifier and Type | Field and Description |
---|---|
protected static java.util.List<java.lang.String> | DELETE_SCAN_COLUMNS |
protected static java.util.List<java.lang.String> | DELETE_SCAN_WITH_STATS_COLUMNS |
protected static boolean | PLAN_SCANS_WITH_WORKER_POOL |
protected static java.util.List<java.lang.String> | SCAN_COLUMNS |
protected static java.util.List<java.lang.String> | SCAN_WITH_STATS_COLUMNS |
Constructor and Description |
---|
SparkDistributedDataScan(org.apache.spark.sql.SparkSession spark, Table table, SparkReadConf readConf) |
Modifier and Type | Method and Description |
---|---|
ThisT | caseSensitive(boolean caseSensitive) Create a new scan from this that, if data columns were selected via Scan.select(java.util.Collection), controls whether the match to the schema will be done with case sensitivity. |
protected java.util.Set<java.lang.Integer> | columnsToKeepStats() |
protected org.apache.iceberg.TableScanContext | context() |
protected PlanningMode | dataPlanningMode() Returns which planning mode to use for data. |
protected PlanningMode | deletePlanningMode() Returns which planning mode to use for deletes. |
protected CloseableIterable<ScanTask> | doPlanFiles() |
Expression | filter() Returns this scan's filter Expression. |
ThisT | filter(Expression expr) Create a new scan from the results of this filtered by the Expression. |
ThisT | ignoreResiduals() Create a new scan from this that applies data filtering to files but not to rows in those files. |
ThisT | includeColumnStats() Create a new scan from this that loads the column stats with each data file. |
ThisT | includeColumnStats(java.util.Collection<java.lang.String> requestedColumns) Create a new scan from this that loads the column stats for the specific columns with each data file. |
protected FileIO | io() |
boolean | isCaseSensitive() Returns whether this scan is case-sensitive with respect to column names. |
ThisT | metricsReporter(MetricsReporter reporter) Create a new scan that will report scan metrics to the provided reporter in addition to reporters maintained by the scan. |
protected org.apache.iceberg.ManifestGroup | newManifestGroup(java.util.List<ManifestFile> dataManifests, boolean withColumnStats) |
protected org.apache.iceberg.ManifestGroup | newManifestGroup(java.util.List<ManifestFile> dataManifests, java.util.List<ManifestFile> deleteManifests) |
protected org.apache.iceberg.ManifestGroup | newManifestGroup(java.util.List<ManifestFile> dataManifests, java.util.List<ManifestFile> deleteManifests, boolean withColumnStats) |
protected BatchScan | newRefinedScan(Table newTable, Schema newSchema, org.apache.iceberg.TableScanContext newContext) |
ThisT | option(java.lang.String property, java.lang.String value) Create a new scan from this scan's configuration that will override the Table's behavior based on the incoming pair. |
protected java.util.Map<java.lang.String,java.lang.String> | options() |
protected java.lang.Iterable<CloseableIterable<DataFile>> | planDataRemotely(java.util.List<ManifestFile> dataManifests, boolean withColumnStats) Plans data remotely. |
protected org.apache.iceberg.DeleteFileIndex | planDeletesRemotely(java.util.List<ManifestFile> deleteManifests) Plans deletes remotely. |
protected java.util.concurrent.ExecutorService | planExecutor() |
CloseableIterable<ScanTaskGroup<ScanTask>> | planTasks() Plan balanced task groups for this scan by splitting large and combining small tasks. |
ThisT | planWith(java.util.concurrent.ExecutorService executorService) Create a new scan to use a particular executor to plan. |
ThisT | project(Schema projectedSchema) Create a new scan from this with the schema as its projection. |
protected int | remoteParallelism() Returns the cluster parallelism. |
protected Expression | residualFilter() |
protected java.util.List<java.lang.String> | scanColumns() |
Schema | schema() Returns this scan's projection Schema. |
ThisT | select(java.util.Collection<java.lang.String> columns) Create a new scan from this that will read the given data columns. |
protected boolean | shouldCopyRemotelyPlannedDataFiles() Controls whether defensive copies are created for remotely planned data files. |
protected boolean | shouldIgnoreResiduals() |
protected boolean | shouldPlanWithExecutor() |
protected boolean | shouldReturnColumnStats() |
int | splitLookback() Returns the split lookback for this scan. |
long | splitOpenFileCost() Returns the split open file cost for this scan. |
Table | table() |
protected Schema | tableSchema() |
long | targetSplitSize() Returns the target split size for this scan. |
protected boolean | useSnapshotSchema() |
Methods inherited from class org.apache.iceberg.SnapshotScan
asOfTime, planFiles, scanMetrics, snapshot, snapshotId, toString, useRef, useSnapshot
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.apache.iceberg.BatchScan
asOfTime, snapshot, table, useRef, useSnapshot
Methods inherited from interface org.apache.iceberg.Scan
caseSensitive, filter, filter, ignoreResiduals, includeColumnStats, includeColumnStats, isCaseSensitive, metricsReporter, option, planFiles, planWith, project, schema, select, select, splitLookback, splitOpenFileCost, targetSplitSize
protected static final java.util.List<java.lang.String> SCAN_COLUMNS
protected static final java.util.List<java.lang.String> SCAN_WITH_STATS_COLUMNS
protected static final java.util.List<java.lang.String> DELETE_SCAN_COLUMNS
protected static final java.util.List<java.lang.String> DELETE_SCAN_WITH_STATS_COLUMNS
protected static final boolean PLAN_SCANS_WITH_WORKER_POOL
public SparkDistributedDataScan(org.apache.spark.sql.SparkSession spark, Table table, SparkReadConf readConf)
protected BatchScan newRefinedScan(Table newTable, Schema newSchema, org.apache.iceberg.TableScanContext newContext)
protected int remoteParallelism()
This value indicates the maximum number of manifests that can be processed concurrently by the cluster. Implementations should take into account both the currently available processing slots and potential dynamic allocation, if applicable.
The remote parallelism is compared against the size of the thread pool available locally to determine the feasibility of remote planning. This value is ignored if the planning mode is set explicitly as local or distributed.
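As an illustration only (not the actual implementation), a subclass could derive this value from Spark's scheduler state; the `spark` session field is assumed to be available to the subclass.

```java
// Hedged sketch: approximate the number of remote processing slots with Spark's
// default parallelism. Real implementations may also account for dynamic allocation.
@Override
protected int remoteParallelism() {
  return spark.sparkContext().defaultParallelism();
}
```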
protected PlanningMode dataPlanningMode()
protected boolean shouldCopyRemotelyPlannedDataFiles()
By default, this class creates defensive copies for each data file that is planned remotely, assuming the provided iterable can be lazy and may reuse objects. If copying is unnecessary and data file objects can be safely added into a collection, implementations can override this behavior.
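For example, a hypothetical subclass whose planDataRemotely() returns fully materialized, non-reused DataFile objects could opt out of the copies:

```java
// Hedged sketch: skip defensive copies only when planned data files are never
// reused or mutated by the remote planning iterable.
@Override
protected boolean shouldCopyRemotelyPlannedDataFiles() {
  return false;
}
```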
protected java.lang.Iterable<CloseableIterable<DataFile>> planDataRemotely(java.util.List<ManifestFile> dataManifests, boolean withColumnStats)
Implementations are encouraged to return groups of matching data files, enabling this class to process multiple groups concurrently to speed up the remaining work. This is particularly useful when dealing with equality deletes, as delete index lookups with such delete files require comparing bounds and typically benefit from parallelization.
If the result iterable reuses objects, shouldCopyRemotelyPlannedDataFiles() must return true.
The input data manifests have been already filtered to include only potential matches based on the scan filter. Implementations are expected to further filter these manifests and only return files that may hold data matching the scan filter.
dataManifests - data manifests that may contain files matching the scan filter
withColumnStats - a flag whether to load column stats
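To make the expected return shape concrete, the sketch below builds one group per filtered manifest on the driver. It illustrates the contract only, not a distributed implementation: a real override would evaluate the scan filter, honor withColumnStats, and fan the manifest reads out across the cluster.

```java
// Hedged sketch: each inner iterable is one group of candidate data files that
// the caller may consume concurrently.
@Override
protected Iterable<CloseableIterable<DataFile>> planDataRemotely(
    java.util.List<ManifestFile> dataManifests, boolean withColumnStats) {
  java.util.List<CloseableIterable<DataFile>> groups = new java.util.ArrayList<>();
  for (ManifestFile manifest : dataManifests) {
    groups.add(ManifestFiles.read(manifest, io())); // entries from one manifest
  }
  return groups;
}
```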
protected PlanningMode deletePlanningMode()
protected org.apache.iceberg.DeleteFileIndex planDeletesRemotely(java.util.List<ManifestFile> deleteManifests)
The input delete manifests have been already filtered to include only potential matches based on the scan filter. Implementations are expected to further filter these manifests and return files that may hold deletes matching the scan filter.
deleteManifests - delete manifests that may contain files matching the scan filter
protected CloseableIterable<ScanTask> doPlanFiles()
doPlanFiles in class SnapshotScan<BatchScan,ScanTask,ScanTaskGroup<ScanTask>>
public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks()
Description copied from interface: Scan
Plan balanced task groups for this scan by splitting large and combining small tasks.
Task groups created by this method may read partial input files, multiple input files or both.
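Split sizing can be tuned per scan before planning via option(String, String); the property constants below come from org.apache.iceberg.TableProperties, the values are illustrative, and `scan` refers to a SparkDistributedDataScan instance (for example, the one constructed in the earlier sketch).

```java
// Hedged sketch: override the read.split.* properties for this scan only.
CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups =
    scan.option(TableProperties.SPLIT_SIZE, String.valueOf(256L * 1024 * 1024)) // 256 MB target splits
        .option(TableProperties.SPLIT_LOOKBACK, "20") // bin-packing lookback
        .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(4L * 1024 * 1024))
        .planTasks();
```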
protected boolean useSnapshotSchema()
useSnapshotSchema in class SnapshotScan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
protected org.apache.iceberg.ManifestGroup newManifestGroup(java.util.List<ManifestFile> dataManifests, java.util.List<ManifestFile> deleteManifests)
protected org.apache.iceberg.ManifestGroup newManifestGroup(java.util.List<ManifestFile> dataManifests, boolean withColumnStats)
protected org.apache.iceberg.ManifestGroup newManifestGroup(java.util.List<ManifestFile> dataManifests, java.util.List<ManifestFile> deleteManifests, boolean withColumnStats)
public Table table()
protected FileIO io()
protected Schema tableSchema()
protected org.apache.iceberg.TableScanContext context()
protected java.util.Map<java.lang.String,java.lang.String> options()
protected java.util.List<java.lang.String> scanColumns()
protected boolean shouldReturnColumnStats()
protected java.util.Set<java.lang.Integer> columnsToKeepStats()
protected boolean shouldIgnoreResiduals()
protected Expression residualFilter()
protected boolean shouldPlanWithExecutor()
protected java.util.concurrent.ExecutorService planExecutor()
public ThisT option(java.lang.String property, java.lang.String value)
Description copied from interface: Scan
Create a new scan from this scan's configuration that will override the Table's behavior based on the incoming pair. Unknown properties will be ignored.
option in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
property - name of the table property to be overridden
value - value to override with
public ThisT project(Schema projectedSchema)
Description copied from interface: Scan
Create a new scan from this with the schema as its projection.
project in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
projectedSchema - a projection schema
public ThisT caseSensitive(boolean caseSensitive)
Description copied from interface: Scan
Create a new scan from this that, if data columns were selected via Scan.select(java.util.Collection), controls whether the match to the schema will be done with case sensitivity. Default is true.
caseSensitive in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public boolean isCaseSensitive()
Description copied from interface: Scan
Returns whether this scan is case-sensitive with respect to column names.
isCaseSensitive in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public ThisT includeColumnStats()
Description copied from interface: Scan
Create a new scan from this that loads the column stats with each data file.
Column stats include: value count, null value count, lower bounds, and upper bounds.
includeColumnStats in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public ThisT includeColumnStats(java.util.Collection<java.lang.String> requestedColumns)
Description copied from interface: Scan
Create a new scan from this that loads the column stats for the specific columns with each data file.
Column stats include: value count, null value count, lower bounds, and upper bounds.
includeColumnStats in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
requestedColumns - column names for which to keep the stats.
public ThisT select(java.util.Collection<java.lang.String> columns)
Description copied from interface: Scan
Create a new scan from this that will read the given data columns.
select in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
columns - column names from the table's schema
public ThisT filter(Expression expr)
Description copied from interface: Scan
Create a new scan from the results of this filtered by the Expression.
filter in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
expr - a filter expression
public Expression filter()
Description copied from interface: Scan
Returns this scan's filter Expression.
filter in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public ThisT ignoreResiduals()
Description copied from interface: Scan
Create a new scan from this that applies data filtering to files but not to rows in those files.
ignoreResiduals in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public ThisT planWith(java.util.concurrent.ExecutorService executorService)
Description copied from interface: Scan
Create a new scan to use a particular executor to plan.
planWith in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
executorService - the provided executor
public Schema schema()
Description copied from interface: Scan
Returns this scan's projection Schema.
If the projection schema was set directly using Scan.project(Schema), returns that schema.
If the projection schema was set by calling Scan.select(Collection), returns a projection schema that includes the selected data fields and any fields used in the filter expression.
schema in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
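The difference is easiest to see side by side; in this sketch `scan` is a SparkDistributedDataScan instance, Types refers to org.apache.iceberg.types.Types, and the column names and field id are hypothetical.

```java
// Hedged sketch: schema() reflects how the projection was set.
Schema viaSelect = scan.select(java.util.Arrays.asList("id", "data")).schema();
// viaSelect includes "id", "data", and any columns referenced by the scan filter

Schema viaProject = scan.project(new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()))).schema();
// viaProject is exactly the schema passed to project()
```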
public long targetSplitSize()
Description copied from interface: Scan
Returns the target split size for this scan.
targetSplitSize in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public int splitLookback()
Description copied from interface: Scan
Returns the split lookback for this scan.
splitLookback in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public long splitOpenFileCost()
Description copied from interface: Scan
Returns the split open file cost for this scan.
splitOpenFileCost in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>
public ThisT metricsReporter(MetricsReporter reporter)
Description copied from interface: Scan
Create a new scan that will report scan metrics to the provided reporter in addition to reporters maintained by the scan.
metricsReporter in interface Scan<ThisT,T extends ScanTask,G extends ScanTaskGroup<T>>