Class CreateChangelogViewProcedure
- All Implemented Interfaces:
- org.apache.spark.sql.connector.catalog.procedures.BoundProcedure, org.apache.spark.sql.connector.catalog.procedures.Procedure, org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure
The procedure always removes the carry-over rows. Please query SparkChangelogTable
 instead when carry-over rows are required.
 
The procedure doesn't compute the pre/post update images by default. To compute them, set "compute_updates" to true in the options.
Carry-over rows are the result of a removal and insertion of the same row within an operation because of the copy-on-write mechanism. For example, consider a file that contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), even though row1 did not actually change. The procedure finds the carry-over rows and removes them from the result.
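The carry-over example above can be sketched in plain Java. This is only an illustration of the idea, not the procedure's actual implementation: the `ChangeRow` record and the `removeCarryOvers` helper are hypothetical names, and the input also includes the genuine DELETE of row2 that the copy-on-write operation was performing.

```java
import java.util.ArrayList;
import java.util.List;

final class CarryOverSketch {
  // Hypothetical, simplified changelog row; not an Iceberg or Spark type.
  record ChangeRow(int id, String data, String op) {}

  // Drops every DELETE/INSERT pair whose column values are identical,
  // because such a pair does not describe a real change to the table.
  static List<ChangeRow> removeCarryOvers(List<ChangeRow> rows) {
    boolean[] dropped = new boolean[rows.size()];
    for (int i = 0; i < rows.size(); i++) {
      ChangeRow del = rows.get(i);
      if (dropped[i] || !del.op().equals("DELETE")) {
        continue;
      }
      // Look for a not-yet-matched INSERT carrying exactly the same values.
      for (int j = 0; j < rows.size(); j++) {
        ChangeRow ins = rows.get(j);
        if (!dropped[j]
            && ins.op().equals("INSERT")
            && ins.id() == del.id()
            && ins.data().equals(del.data())) {
          dropped[i] = true;
          dropped[j] = true;
          break;
        }
      }
    }
    List<ChangeRow> result = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      if (!dropped[i]) {
        result.add(rows.get(i));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Raw changelog for a copy-on-write delete of row2.
    List<ChangeRow> raw = List.of(
        new ChangeRow(1, "a", "DELETE"),
        new ChangeRow(2, "b", "DELETE"),
        new ChangeRow(1, "a", "INSERT"));
    System.out.println(removeCarryOvers(raw));
  }
}
```

With this input, the DELETE and INSERT of row1 cancel out and only the DELETE of row2 remains.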
Pre/post update images are converted from a pair of a delete row and an insert row. Identifier columns are used to determine whether an insert record and a delete record refer to the same row. If the two records share the same values for the identifier columns, they are considered to be the before and after states of the same row. You can either set identifier fields in the table schema or pass them as procedure parameters. Here is an example of pre/post update images with an identifier column (id). A pair of a delete row and an insert row with the same id:
- (id=1, data='a', op='DELETE')
- (id=1, data='b', op='INSERT')
will be marked as pre/post update images:
- (id=1, data='a', op='UPDATE_BEFORE')
- (id=1, data='b', op='UPDATE_AFTER')
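The pairing shown above can be illustrated with a small Java sketch. It is a simplification under stated assumptions, not the procedure's real code: `ChangeRow` and `markUpdates` are hypothetical names, `id` is assumed to be the only identifier column, and carry-over rows are assumed to have been removed already.

```java
import java.util.ArrayList;
import java.util.List;

final class UpdateImageSketch {
  // Hypothetical, simplified changelog row; not an Iceberg or Spark type.
  record ChangeRow(int id, String data, String op) {}

  // Relabels a DELETE and an INSERT that share the same identifier value
  // as the before and after images of a single update.
  static List<ChangeRow> markUpdates(List<ChangeRow> rows) {
    List<ChangeRow> result = new ArrayList<>(rows);
    for (int i = 0; i < result.size(); i++) {
      ChangeRow del = result.get(i);
      if (!del.op().equals("DELETE")) {
        continue;
      }
      for (int j = 0; j < result.size(); j++) {
        ChangeRow ins = result.get(j);
        // Only the identifier column is compared; data may differ.
        if (ins.op().equals("INSERT") && ins.id() == del.id()) {
          result.set(i, new ChangeRow(del.id(), del.data(), "UPDATE_BEFORE"));
          result.set(j, new ChangeRow(ins.id(), ins.data(), "UPDATE_AFTER"));
          break;
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<ChangeRow> changes = List.of(
        new ChangeRow(1, "a", "DELETE"),
        new ChangeRow(1, "b", "INSERT"));
    System.out.println(markUpdates(changes));
  }
}
```

Rows whose identifier has no matching counterpart keep their original DELETE or INSERT label.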
Field Summary
Fields (Modifier and Type, Field)
- protected static final org.apache.spark.sql.types.DataType STRING_ARRAY
- protected static final org.apache.spark.sql.types.DataType STRING_MAP
- 
Method Summary
Methods (Modifier and Type, Method, Description)
- protected SparkActions actions()
- protected Iterator<org.apache.spark.sql.connector.read.Scan> asScanIterator(org.apache.spark.sql.types.StructType readSchema, org.apache.spark.sql.catalyst.InternalRow... rows)
- org.apache.spark.sql.connector.catalog.procedures.BoundProcedure bind(org.apache.spark.sql.types.StructType inputType)
- builder()
- Iterator<org.apache.spark.sql.connector.read.Scan> call(org.apache.spark.sql.catalyst.InternalRow args)
- protected void closeService(): Closes this procedure's executor service if a new one was created with BaseProcedure.executorService(int, String).
- description()
- protected ExecutorService executorService(int threadPoolSize, String nameFormat): Starts a new executor service which can be used by this procedure in its work.
- protected Expression filterExpression(org.apache.spark.sql.connector.catalog.Identifier ident, String where)
- boolean isDeterministic()
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> loadRows
- protected SparkTable loadSparkTable(org.apache.spark.sql.connector.catalog.Identifier ident)
- protected <T> T modifyIcebergTable(org.apache.spark.sql.connector.catalog.Identifier ident, Function<Table, T> func)
- name()
- protected org.apache.spark.sql.catalyst.InternalRow newInternalRow(Object... values)
- protected static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter optionalInParameter(String name, org.apache.spark.sql.types.DataType dataType)
- protected static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter optionalInParameter(String name, org.apache.spark.sql.types.DataType dataType, String defaultValue)
- org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter[] parameters()
- protected void refreshSparkCache(org.apache.spark.sql.connector.catalog.Identifier ident, org.apache.spark.sql.connector.catalog.Table table)
- protected static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter requiredInParameter(String name, org.apache.spark.sql.types.DataType dataType)
- protected org.apache.spark.sql.classic.SparkSession spark()
- protected org.apache.spark.sql.connector.catalog.TableCatalog tableCatalog()
- protected Spark3Util.CatalogAndIdentifier toCatalogAndIdentifier(String identifierAsString, String argName, org.apache.spark.sql.connector.catalog.CatalogPlugin catalog)
- protected org.apache.spark.sql.connector.catalog.Identifier toIdentifier(String identifierAsString, String argName)
- protected <T> T withIcebergTable(org.apache.spark.sql.connector.catalog.Identifier ident, Function<Table, T> func)
- 
Field Details
- 
STRING_MAP
protected static final org.apache.spark.sql.types.DataType STRING_MAP
- 
STRING_ARRAY
protected static final org.apache.spark.sql.types.DataType STRING_ARRAY
 
Method Details
- 
builder
- 
bind
public org.apache.spark.sql.connector.catalog.procedures.BoundProcedure bind(org.apache.spark.sql.types.StructType inputType)
- 
parameters
public org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter[] parameters()
- 
call
public Iterator<org.apache.spark.sql.connector.read.Scan> call(org.apache.spark.sql.catalyst.InternalRow args)
- 
name
- 
description
- 
requiredInParameter
protected static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter requiredInParameter(String name, org.apache.spark.sql.types.DataType dataType)
- 
optionalInParameter
protected static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter optionalInParameter(String name, org.apache.spark.sql.types.DataType dataType)
- 
optionalInParameter
- 
isDeterministic
public boolean isDeterministic()
- Specified by:
- isDeterministic in interface org.apache.spark.sql.connector.catalog.procedures.BoundProcedure
 
- 
spark
protected org.apache.spark.sql.classic.SparkSession spark()
- 
actions
- 
tableCatalog
protected org.apache.spark.sql.connector.catalog.TableCatalog tableCatalog()
- 
modifyIcebergTable
- 
withIcebergTable
- 
toIdentifier
- 
toCatalogAndIdentifier
protected Spark3Util.CatalogAndIdentifier toCatalogAndIdentifier(String identifierAsString, String argName, org.apache.spark.sql.connector.catalog.CatalogPlugin catalog)
- 
loadSparkTable
- 
loadRows
- 
refreshSparkCache
protected void refreshSparkCache(org.apache.spark.sql.connector.catalog.Identifier ident, org.apache.spark.sql.connector.catalog.Table table)
- 
filterExpression
protected Expression filterExpression(org.apache.spark.sql.connector.catalog.Identifier ident, String where)
- 
newInternalRow
- 
asScanIterator
protected Iterator<org.apache.spark.sql.connector.read.Scan> asScanIterator(org.apache.spark.sql.types.StructType readSchema, org.apache.spark.sql.catalyst.InternalRow... rows)
- 
closeService
protected void closeService()
Closes this procedure's executor service if a new one was created with BaseProcedure.executorService(int, String). Does not block for any remaining tasks.
- 
executorService
Starts a new executor service which can be used by this procedure in its work. The pool will be automatically shut down if withIcebergTable(Identifier, Function) or modifyIcebergTable(Identifier, Function) are called. If these methods are not used, then the service can be shut down with closeService() or left to be closed when this class is finalized.
- Parameters:
- threadPoolSize - number of threads in the service
- nameFormat - name prefix for threads created in this service
- Returns:
- the new executor service owned by this procedure
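The lifecycle described for executorService and closeService can be sketched with plain java.util.concurrent classes. This is a hedged illustration only: `ExecutorLifecycleSketch` is a hypothetical stand-in for the base procedure class, the thread-naming scheme (prefix plus a counter) is an assumption, and the real implementation may differ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class ExecutorLifecycleSketch {
  // Stays null until executorService(...) creates a pool.
  private ExecutorService service;

  // Starts a fixed-size pool whose threads are named with the given prefix.
  ExecutorService executorService(int threadPoolSize, String nameFormat) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      // Assumed naming scheme: "<prefix>-<n>".
      Thread thread = new Thread(runnable, nameFormat + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
    service = Executors.newFixedThreadPool(threadPoolSize, factory);
    return service;
  }

  // Shuts the pool down only if one was created; does not wait for tasks.
  void closeService() {
    if (service != null) {
      service.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorLifecycleSketch procedure = new ExecutorLifecycleSketch();
    ExecutorService pool = procedure.executorService(2, "sketch-worker");
    String threadName = pool.submit(() -> Thread.currentThread().getName()).get();
    System.out.println(threadName);
    procedure.closeService();
    System.out.println(pool.isShutdown());
  }
}
```

Calling `shutdown()` stops the pool from accepting new tasks without blocking on tasks already submitted, which matches the "does not block for any remaining tasks" behavior described for closeService.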
 
 
-