package org.apache.iceberg.spark.extensions;

import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for reading a table's {@code .changes} changelog table, both through SQL
 * ({@code SELECT ... FROM <table>.changes}) and through the {@link DataFrameReader} using the
 * {@code start-snapshot-id}/{@code end-snapshot-id} and {@code start-timestamp}/{@code end-timestamp}
 * read options.
 *
 * <p>Each test is run once per parameter set returned by {@link #parameters()}; the table created
 * by a test is dropped afterwards by {@link #removeTables()}.
 */
@ExtendWith({ParameterizedTestExtension.class})
public class TestChangelogTable extends ExtensionsTestBase {

    /** Table format version written into the {@code format-version} property of created tables. */
    @Parameter(index = 3)
    private int formatVersion;

    /**
     * Supplies the parameter sets for the test templates.
     *
     * @return one row per run: catalog name, catalog implementation, catalog properties and format
     *     version (Spark catalog with version 1, Hive catalog with version 2)
     */
    @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
    public static Object[][] parameters() {
        // Must be a two-dimensional array: a plain Object[] does not match the declared return type.
        return new Object[][] {
            {
                SparkCatalogConfig.SPARK.catalogName(),
                SparkCatalogConfig.SPARK.implementation(),
                SparkCatalogConfig.SPARK.properties(),
                1
            },
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                SparkCatalogConfig.HIVE.properties(),
                2
            }
        };
    }

    /** Drops the test table after every test so that runs do not interfere with each other. */
    @AfterEach
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", new Object[] {this.tableName});
    }

    /** A {@code WHERE} filter on a data column must be applied to the changelog rows. */
    @TestTemplate
    public void testDataFilters() {
        createTableWithDefaultRows();
        sql("INSERT INTO %s VALUES (3, 'c')", new Object[] {this.tableName});

        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot insertSnapshot = table.currentSnapshot();

        sql("DELETE FROM %s WHERE id = 3", new Object[] {this.tableName});
        table.refresh();
        Snapshot deleteSnapshot = table.currentSnapshot();

        assertEquals(
                "Should have expected rows",
                ImmutableList.of(
                        row(new Object[] {3, "c", "INSERT", 2, insertSnapshot.snapshotId()}),
                        row(new Object[] {3, "c", "DELETE", 3, deleteSnapshot.snapshotId()})),
                sql(
                        "SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id",
                        new Object[] {this.tableName}));
    }

    /** An {@code INSERT OVERWRITE} must show up as a DELETE of the old row plus an INSERT of the new one. */
    @TestTemplate
    public void testOverwrites() {
        createTableWithDefaultRows();

        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot beforeOverwrite = table.currentSnapshot();

        sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot afterOverwrite = table.currentSnapshot();

        assertEquals(
                "Rows should match",
                ImmutableList.of(
                        row(new Object[] {2, "b", "DELETE", 0, afterOverwrite.snapshotId()}),
                        row(new Object[] {-2, "b", "INSERT", 0, afterOverwrite.snapshotId()})),
                changelogRecords(beforeOverwrite, afterOverwrite));
    }

    /**
     * Reads the changelog with various {@code start-timestamp}/{@code end-timestamp} combinations
     * over three snapshots (insert, insert, overwrite) and checks which changes are returned.
     */
    @TestTemplate
    public void testQueryWithTimeRange() {
        createTable();

        sql("INSERT INTO %s VALUES (1, 'a')", new Object[] {this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();
        long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

        sql("INSERT INTO %s VALUES (2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();
        long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

        sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

        assertEquals(
                "Should have expected changed rows only from snapshot 3",
                ImmutableList.of(
                        row(new Object[] {2, "b", "DELETE", 0, snap3.snapshotId()}),
                        row(new Object[] {-2, "b", "INSERT", 0, snap3.snapshotId()})),
                changelogRecords(rightAfterSnap2, snap3.timestampMillis()));

        // Same expectation when the range starts exactly at snapshot 2's own timestamp.
        assertEquals(
                "Should have expected changed rows only from snapshot 3",
                ImmutableList.of(
                        row(new Object[] {2, "b", "DELETE", 0, snap3.snapshotId()}),
                        row(new Object[] {-2, "b", "INSERT", 0, snap3.snapshotId()})),
                changelogRecords(snap2.timestampMillis(), snap3.timestampMillis()));

        assertEquals(
                "Should have expected changed rows from snapshot 2 and 3",
                ImmutableList.of(
                        row(new Object[] {2, "b", "INSERT", 0, snap2.snapshotId()}),
                        row(new Object[] {2, "b", "DELETE", 1, snap3.snapshotId()}),
                        row(new Object[] {-2, "b", "INSERT", 1, snap3.snapshotId()})),
                changelogRecords(rightAfterSnap1, snap3.timestampMillis()));

        assertEquals(
                "Should have expected changed rows up to the current snapshot",
                ImmutableList.of(
                        row(new Object[] {2, "b", "INSERT", 0, snap2.snapshotId()}),
                        row(new Object[] {2, "b", "DELETE", 1, snap3.snapshotId()}),
                        row(new Object[] {-2, "b", "INSERT", 1, snap3.snapshotId()})),
                changelogRecords(rightAfterSnap1, (Long) null));

        // This check used to be executed twice with identical arguments; once is enough.
        assertEquals(
                "Should have empty changed rows if end time is before the first snapshot",
                ImmutableList.of(),
                changelogRecords((Long) null, snap1.timestampMillis() - 1));

        assertEquals(
                "Should have empty changed rows if start time is after the current snapshot",
                ImmutableList.of(),
                changelogRecords(rightAfterSnap3, (Long) null));

        assertEquals(
                "Should have empty changed rows if there are no snapshots between start time and end time",
                ImmutableList.of(),
                changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
    }

    /** A start timestamp later than the end timestamp must be rejected with an {@link IllegalArgumentException}. */
    @TestTemplate
    public void testTimeRangeValidation() {
        createTableWithDefaultRows();

        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot earlier = table.currentSnapshot();

        sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot later = table.currentSnapshot();

        // Only the wait matters here; the returned timestamp is not used.
        waitUntilAfter(later.timestampMillis());

        // Start and end are deliberately swapped.
        Assertions.assertThatThrownBy(
                        () -> changelogRecords(later.timestampMillis(), earlier.timestampMillis()))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot set start-timestamp to be greater than end-timestamp for changelogs");
    }

    /**
     * A {@code DELETE} that results in a snapshot with operation {@code "delete"} must be reported
     * as DELETE rows in the changelog.
     */
    @TestTemplate
    public void testMetadataDeletes() {
        createTableWithDefaultRows();

        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot beforeDelete = table.currentSnapshot();

        sql("DELETE FROM %s WHERE data = 'a'", new Object[] {this.tableName});
        table.refresh();
        Snapshot afterDelete = table.currentSnapshot();

        Assertions.assertThat(afterDelete.operation()).as("Operation must match").isEqualTo("delete");

        assertEquals(
                "Rows should match",
                ImmutableList.of(row(new Object[] {1, "a", "DELETE", 0, afterDelete.snapshotId()})),
                changelogRecords(beforeDelete, afterDelete));
    }

    /**
     * With manifest merging enabled and a min merge count of 1, the second insert leaves a single
     * data manifest; only the row added by that insert may appear in the changelog between the two
     * snapshots.
     */
    @TestTemplate
    public void testExistingEntriesInNewDataManifestsAreIgnored() {
        sql(
                "CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (data) "
                        + "TBLPROPERTIES (  '%s' = '%d',  '%s' = '1',  '%s' = 'true' )",
                new Object[] {
                    this.tableName,
                    "format-version",
                    this.formatVersion,
                    "commit.manifest.min-count-to-merge",
                    "commit.manifest-merge.enabled"
                });

        sql("INSERT INTO %s VALUES (1, 'a')", new Object[] {this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();

        sql("INSERT INTO %s VALUES (2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();

        Assertions.assertThat(snap2.dataManifests(table.io()))
                .as("Manifest number must match")
                .hasSize(1);

        assertEquals(
                "Rows should match",
                ImmutableList.of(row(new Object[] {2, "b", "INSERT", 0, snap2.snapshotId()})),
                changelogRecords(snap1, snap2));
    }

    /** A {@code rewrite_manifests} call adds a snapshot but must not add any changelog rows. */
    @TestTemplate
    public void testManifestRewritesAreIgnored() {
        createTableWithDefaultRows();

        sql("CALL %s.system.rewrite_manifests('%s')", new Object[] {this.catalogName, this.tableIdent});

        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assertions.assertThat(table.snapshots()).as("Num snapshots must match").hasSize(3);

        assertEquals(
                "Should have expected rows",
                ImmutableList.of(row(new Object[] {1, "INSERT"}), row(new Object[] {2, "INSERT"})),
                sql(
                        "SELECT id, _change_type FROM %s.changes ORDER BY id",
                        new Object[] {this.tableName}));
    }

    /**
     * The metadata columns {@code _file}, {@code _pos}, {@code _deleted}, {@code _spec_id} and
     * {@code _partition} must be selectable from the changelog table.
     *
     * <p>The expected file paths are taken from the query result itself; the only independent check
     * on them is that the first one starts with {@code file:/}.
     */
    @TestTemplate
    public void testMetadataColumns() {
        createTableWithDefaultRows();

        List<Object[]> rows =
                sql(
                        "SELECT id, _file, _pos, _deleted, _spec_id, _partition FROM %s.changes ORDER BY id",
                        new Object[] {this.tableName});

        String firstFile = rows.get(0)[1].toString();
        Assertions.assertThat(firstFile).startsWith("file:/");
        String secondFile = rows.get(1)[1].toString();

        assertEquals(
                "Rows should match",
                ImmutableList.of(
                        row(new Object[] {1, firstFile, 0L, false, 0, row(new Object[] {"a"})}),
                        row(new Object[] {2, secondFile, 0L, false, 0, row(new Object[] {"b"})})),
                rows);
    }

    /**
     * Checks time-range reads after the table history diverges: snapshot 2 is rolled back, snapshot
     * 3 is committed on top of snapshot 1, and finally snapshot 2 is made current again.
     */
    @TestTemplate
    public void testQueryWithRollback() {
        createTable();

        sql("INSERT INTO %s VALUES (1, 'a')", new Object[] {this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();
        long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

        sql("INSERT INTO %s VALUES (2, 'b')", new Object[] {this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();
        long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

        // Roll back to snapshot 1 so that the next commit no longer builds on snapshot 2.
        sql(
                "CALL %s.system.rollback_to_snapshot('%s', %d)",
                new Object[] {this.catalogName, this.tableIdent, snap1.snapshotId()});
        table.refresh();
        Assertions.assertThat(table.currentSnapshot()).isEqualTo(snap1);

        sql("INSERT OVERWRITE %s VALUES (-2, 'a')", new Object[] {this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

        assertEquals(
                "Should have expected changed rows up to snapshot 3",
                ImmutableList.of(
                        row(new Object[] {1, "a", "INSERT", 0, snap1.snapshotId()}),
                        row(new Object[] {1, "a", "DELETE", 1, snap3.snapshotId()}),
                        row(new Object[] {-2, "a", "INSERT", 1, snap3.snapshotId()})),
                changelogRecords((Long) null, rightAfterSnap3));

        assertEquals(
                "Should have expected changed rows up to snapshot 2",
                ImmutableList.of(row(new Object[] {1, "a", "INSERT", 0, snap1.snapshotId()})),
                changelogRecords((Long) null, rightAfterSnap2));

        assertEquals(
                "Should have expected changed rows from snapshot 3 only since snapshot 2 is on a different branch.",
                ImmutableList.of(
                        row(new Object[] {1, "a", "DELETE", 0, snap3.snapshotId()}),
                        row(new Object[] {-2, "a", "INSERT", 0, snap3.snapshotId()})),
                changelogRecords(rightAfterSnap1, snap3.timestampMillis()));

        assertEquals(
                "Should have expected changed rows from snapshot 3",
                ImmutableList.of(
                        row(new Object[] {1, "a", "DELETE", 0, snap3.snapshotId()}),
                        row(new Object[] {-2, "a", "INSERT", 0, snap3.snapshotId()})),
                changelogRecords(rightAfterSnap2, (Long) null));

        // Make snapshot 2 current again; snapshot 3 is now the one off the current history.
        sql(
                "CALL %s.system.set_current_snapshot('%s', %d)",
                new Object[] {this.catalogName, this.tableIdent, snap2.snapshotId()});
        table.refresh();
        Assertions.assertThat(table.currentSnapshot()).isEqualTo(snap2);

        assertEquals(
                "Should have expected changed rows from snapshot 2 only since snapshot 3 is on a different branch.",
                ImmutableList.of(row(new Object[] {2, "b", "INSERT", 0, snap2.snapshotId()})),
                changelogRecords(rightAfterSnap1, (Long) null));
    }

    /** Creates the test table and inserts the two default rows, one commit per row. */
    private void createTableWithDefaultRows() {
        createTable();
        insertDefaultRows();
    }

    /** Creates the test table, partitioned by {@code data}, with the parameterized format version. */
    private void createTable() {
        sql(
                "CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (data) "
                        + "TBLPROPERTIES (  '%s' = '%d' )",
                new Object[] {this.tableName, "format-version", this.formatVersion});
    }

    /** Inserts rows {@code (1, 'a')} and {@code (2, 'b')} using two separate INSERT statements. */
    private void insertDefaultRows() {
        sql("INSERT INTO %s VALUES (1, 'a')", new Object[] {this.tableName});
        sql("INSERT INTO %s VALUES (2, 'b')", new Object[] {this.tableName});
    }

    /**
     * Reads the changelog table bounded by snapshot ids.
     *
     * @param snapshot snapshot whose id is passed as {@code start-snapshot-id}; {@code null} leaves
     *     the option unset
     * @param snapshot2 snapshot whose id is passed as {@code end-snapshot-id}; {@code null} leaves
     *     the option unset
     * @return the changelog rows in the order produced by {@link #collect(DataFrameReader)}
     */
    private List<Object[]> changelogRecords(Snapshot snapshot, Snapshot snapshot2) {
        DataFrameReader reader = spark.read();
        if (snapshot != null) {
            reader = reader.option("start-snapshot-id", snapshot.snapshotId());
        }
        if (snapshot2 != null) {
            reader = reader.option("end-snapshot-id", snapshot2.snapshotId());
        }
        return rowsToJava(collect(reader));
    }

    /**
     * Reads the changelog table bounded by timestamps.
     *
     * @param l value passed as {@code start-timestamp}; {@code null} leaves the option unset
     * @param l2 value passed as {@code end-timestamp}; {@code null} leaves the option unset
     * @return the changelog rows in the order produced by {@link #collect(DataFrameReader)}
     */
    private List<Object[]> changelogRecords(Long l, Long l2) {
        DataFrameReader reader = spark.read();
        if (l != null) {
            reader = reader.option("start-timestamp", l.longValue());
        }
        if (l2 != null) {
            reader = reader.option("end-timestamp", l2.longValue());
        }
        return rowsToJava(collect(reader));
    }

    /**
     * Loads {@code <tableName>.changes} through the given reader and collects its rows, sorted by
     * change ordinal, commit snapshot id, change type and id so that results are deterministic.
     */
    private List<Row> collect(DataFrameReader dataFrameReader) {
        return dataFrameReader
                .table(this.tableName + ".changes")
                .orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id")
                .collectAsList();
    }
}
