Class CreateChangelogViewProcedure

java.lang.Object
org.apache.iceberg.spark.procedures.CreateChangelogViewProcedure
All Implemented Interfaces:
Procedure

public class CreateChangelogViewProcedure extends Object
A procedure that creates a view for changed rows.

The procedure always removes carry-over rows. Query SparkChangelogTable directly when carry-over rows are required.
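
For example, the changelog table can be queried directly as a metadata table. A minimal sketch; the catalog, database, and table names are illustrative, and it assumes SparkChangelogTable is exposed under the "changes" metadata table name:

  import org.apache.spark.sql.SparkSession;

  public class ReadChangelogExample {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder()
          .appName("read-changelog")
          .getOrCreate();

      // SparkChangelogTable is exposed as the "changes" metadata table;
      // carry-over rows are visible here, unlike in the procedure's view.
      spark.sql("SELECT * FROM spark_catalog.db.tbl.changes").show();
    }
  }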

The procedure does not compute pre/post update images by default. To compute them, set "compute_updates" to true in the options.
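
For example, a minimal sketch; the catalog and table names are illustrative, and the parameter names follow the Iceberg create_changelog_view procedure documentation:

  import org.apache.spark.sql.SparkSession;

  public class CreateChangelogViewExample {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder()
          .appName("create-changelog-view")
          .getOrCreate();

      // Create the view over a snapshot range, with update images enabled.
      spark.sql(
          "CALL spark_catalog.system.create_changelog_view("
              + "table => 'db.tbl', "
              + "options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'), "
              + "compute_updates => true)");

      // The view name is assumed here to default to <table>_changes;
      // query it like any other view.
      spark.sql("SELECT * FROM tbl_changes").show();
    }
  }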

Carry-over rows are the result of a removal and insertion of the same row within an operation, caused by the copy-on-write mechanism. For example, consider a file that contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 requires erasing this file and writing row1 to a new file. The changelog table reports this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), even though row1 was not actually changed. The procedure finds such carry-over rows and removes them from the result.

Pre/post update images are converted from a pair of a delete row and an insert row. Identifier columns are used to determine whether a delete record and an insert record refer to the same row. If the two records share the same values for the identifier columns, they are considered the before and after states of the same row. You can either set identifier fields in the table schema or pass them as procedure parameters. Here is an example of pre/post update images with an identifier column (id). A pair of a delete row and an insert row with the same id:

  • (id=1, data='a', op='DELETE')
  • (id=1, data='b', op='INSERT')

will be marked as pre/post update images:

  • (id=1, data='a', op='UPDATE_BEFORE')
  • (id=1, data='b', op='UPDATE_AFTER')
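
If the table schema does not declare identifier fields, they can be passed as a procedure parameter instead. A sketch, reusing the SparkSession from the example above; the column name is illustrative:

  // Pass identifier columns explicitly so the procedure can pair
  // delete/insert records with the same id into update images.
  spark.sql(
      "CALL spark_catalog.system.create_changelog_view("
          + "table => 'db.tbl', "
          + "compute_updates => true, "
          + "identifier_columns => array('id'))");
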
  • Field Details

    • STRING_MAP

      protected static final org.apache.spark.sql.types.DataType STRING_MAP
    • STRING_ARRAY

      protected static final org.apache.spark.sql.types.DataType STRING_ARRAY
  • Method Details

    • builder

      public static SparkProcedures.ProcedureBuilder builder()
    • parameters

      public ProcedureParameter[] parameters()
      Description copied from interface: Procedure
      Returns the input parameters of this procedure.
    • outputType

      public org.apache.spark.sql.types.StructType outputType()
      Description copied from interface: Procedure
      Returns the type of rows produced by this procedure.
    • call

      public org.apache.spark.sql.catalyst.InternalRow[] call(org.apache.spark.sql.catalyst.InternalRow args)
      Description copied from interface: Procedure
      Executes this procedure.

      Before execution, Spark aligns the provided arguments with the input parameters defined in Procedure.parameters(), either by position or by name.

      Implementations may provide a summary of the execution by returning one or more rows. The schema of the output rows must match the output type defined in Procedure.outputType().

      Parameters:
      args - input arguments
      Returns:
      the result of executing this procedure with the given arguments
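
      To make this contract concrete, here is a minimal, hypothetical Procedure implementation (EchoProcedure is illustrative and not part of this API; the Procedure and ProcedureParameter package names may differ between Iceberg versions):

      import org.apache.spark.sql.catalyst.InternalRow;
      import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
      import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
      import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructType;
      import org.apache.spark.unsafe.types.UTF8String;

      // Hypothetical procedure: echoes its single argument back as one
      // output row whose schema matches outputType().
      public class EchoProcedure implements Procedure {
        @Override
        public ProcedureParameter[] parameters() {
          return new ProcedureParameter[] {
              ProcedureParameter.required("message", DataTypes.StringType)
          };
        }

        @Override
        public StructType outputType() {
          return new StructType().add("echoed", DataTypes.StringType);
        }

        @Override
        public InternalRow[] call(InternalRow args) {
          // Spark has already aligned args to parameters(): index 0 is "message".
          UTF8String message = args.getUTF8String(0);
          return new InternalRow[] { new GenericInternalRow(new Object[] { message }) };
        }

        @Override
        public String description() {
          return "Echoes its single argument back as one output row";
        }
      }
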
    • description

      public String description()
      Description copied from interface: Procedure
      Returns the description of this procedure.
    • spark

      protected org.apache.spark.sql.SparkSession spark()
    • actions

      protected SparkActions actions()
    • tableCatalog

      protected org.apache.spark.sql.connector.catalog.TableCatalog tableCatalog()
    • modifyIcebergTable

      protected <T> T modifyIcebergTable(org.apache.spark.sql.connector.catalog.Identifier ident, Function<Table,T> func)
    • withIcebergTable

      protected <T> T withIcebergTable(org.apache.spark.sql.connector.catalog.Identifier ident, Function<Table,T> func)
    • toIdentifier

      protected org.apache.spark.sql.connector.catalog.Identifier toIdentifier(String identifierAsString, String argName)
    • toCatalogAndIdentifier

      protected Spark3Util.CatalogAndIdentifier toCatalogAndIdentifier(String identifierAsString, String argName, org.apache.spark.sql.connector.catalog.CatalogPlugin catalog)
    • loadSparkTable

      protected SparkTable loadSparkTable(org.apache.spark.sql.connector.catalog.Identifier ident)
    • loadRows

      protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> loadRows(org.apache.spark.sql.connector.catalog.Identifier tableIdent, Map<String,String> options)
    • refreshSparkCache

      protected void refreshSparkCache(org.apache.spark.sql.connector.catalog.Identifier ident, org.apache.spark.sql.connector.catalog.Table table)
    • filterExpression

      protected Expression filterExpression(org.apache.spark.sql.connector.catalog.Identifier ident, String where)
    • newInternalRow

      protected org.apache.spark.sql.catalyst.InternalRow newInternalRow(Object... values)
    • closeService

      protected void closeService()
      Closes this procedure's executor service if a new one was created with BaseProcedure.executorService(int, String). Does not block for any remaining tasks.
    • executorService

      protected ExecutorService executorService(int threadPoolSize, String nameFormat)
      Starts a new executor service that this procedure can use in its work. The pool is shut down automatically when withIcebergTable(Identifier, Function) or modifyIcebergTable(Identifier, Function) is called. If neither method is used, the service can be shut down with closeService() or left to be closed when this class is finalized.
      Parameters:
      threadPoolSize - number of threads in the service
      nameFormat - name prefix for threads created in this service
      Returns:
      the new executor service owned by this procedure
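
      A rough sketch of the ownership contract described above (illustrative only; not the actual BaseProcedure implementation):

      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.atomic.AtomicInteger;

      // The procedure lazily creates one owned pool with named threads;
      // closeService() shuts it down without waiting for remaining tasks.
      class OwnedExecutorSketch {
        private ExecutorService service;

        ExecutorService executorService(int threadPoolSize, String nameFormat) {
          if (service == null) {
            AtomicInteger threadCount = new AtomicInteger();
            ThreadFactory factory =
                runnable -> new Thread(runnable, nameFormat + "-" + threadCount.getAndIncrement());
            service = Executors.newFixedThreadPool(threadPoolSize, factory);
          }
          return service;
        }

        void closeService() {
          if (service != null) {
            service.shutdownNow(); // returns immediately; does not await tasks
            service = null;
          }
        }
      }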