package org.apache.iceberg.flink;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/TestChangeLogTable.class */
public class TestChangeLogTable extends ChangeLogTableTestBase {
    private static final Configuration CONF = new Configuration();
    private static final String SOURCE_TABLE = "default_catalog.default_database.source_change_logs";
    private static final String CATALOG_NAME = "test_catalog";
    private static final String DATABASE_NAME = "test_db";
    private static final String TABLE_NAME = "test_table";
    private static String warehouse;
    private final boolean partitioned;

    @Parameterized.Parameters(name = "PartitionedTable={0}")
    public static Iterable<Object[]> parameters() {
        return ImmutableList.of(new Object[]{true}, new Object[]{false});
    }

    public TestChangeLogTable(boolean z) {
        this.partitioned = z;
    }

    @BeforeClass
    public static void createWarehouse() throws IOException {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        Assert.assertTrue("The warehouse should be deleted", newFolder.delete());
        warehouse = String.format("file:%s", newFolder);
    }

    @Before
    public void before() {
        sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", CATALOG_NAME, warehouse);
        sql("USE CATALOG %s", CATALOG_NAME);
        sql("CREATE DATABASE %s", DATABASE_NAME);
        sql("USE %s", DATABASE_NAME);
    }

    @Override // org.apache.iceberg.flink.source.ChangeLogTableTestBase
    @After
    public void clean() {
        sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
        sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
        sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
        BoundedTableFactory.clearDataSets();
    }

    @Test
    public void testSqlChangeLogOnIdKey() throws Exception {
        testSqlChangeLog(TABLE_NAME, ImmutableList.of("id"), ImmutableList.of(ImmutableList.of(insertRow(1, "aaa"), deleteRow(1, "aaa"), insertRow(1, "bbb"), insertRow(2, "aaa"), deleteRow(2, "aaa"), insertRow(2, "bbb")), ImmutableList.of(updateBeforeRow(2, "bbb"), updateAfterRow(2, "ccc"), deleteRow(2, "ccc"), insertRow(2, "ddd")), ImmutableList.of(deleteRow(1, "bbb"), insertRow(1, "ccc"), deleteRow(1, "ccc"), insertRow(1, "ddd"))), ImmutableList.of(ImmutableList.of(record(1, "bbb"), record(2, "bbb")), ImmutableList.of(record(1, "bbb"), record(2, "ddd")), ImmutableList.of(record(1, "ddd"), record(2, "ddd"))));
    }

    @Test
    public void testChangeLogOnDataKey() throws Exception {
        testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), ImmutableList.of(ImmutableList.of(insertRow(1, "aaa"), deleteRow(1, "aaa"), insertRow(2, "bbb"), insertRow(1, "bbb"), insertRow(2, "aaa")), ImmutableList.of(updateBeforeRow(2, "aaa"), updateAfterRow(1, "ccc"), insertRow(1, "aaa")), ImmutableList.of(deleteRow(1, "bbb"), insertRow(2, "aaa"), insertRow(2, "ccc"))), ImmutableList.of(ImmutableList.of(record(1, "bbb"), record(2, "aaa")), ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")), ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))));
    }

    @Test
    public void testChangeLogOnIdDataKey() throws Exception {
        testSqlChangeLog(TABLE_NAME, ImmutableList.of("data", "id"), ImmutableList.of(ImmutableList.of(insertRow(1, "aaa"), deleteRow(1, "aaa"), insertRow(2, "bbb"), insertRow(1, "bbb"), insertRow(2, "aaa")), ImmutableList.of(updateBeforeRow(2, "aaa"), updateAfterRow(1, "ccc"), insertRow(1, "aaa")), ImmutableList.of(deleteRow(1, "bbb"), insertRow(2, "aaa"))), ImmutableList.of(ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")), ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")), ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))));
    }

    @Test
    public void testPureInsertOnIdKey() throws Exception {
        testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), ImmutableList.of(ImmutableList.of(insertRow(1, "aaa"), insertRow(2, "bbb")), ImmutableList.of(insertRow(3, "ccc"), insertRow(4, "ddd")), ImmutableList.of(insertRow(5, "eee"), insertRow(6, "fff"))), ImmutableList.of(ImmutableList.of(record(1, "aaa"), record(2, "bbb")), ImmutableList.of(record(1, "aaa"), record(2, "bbb"), record(3, "ccc"), record(4, "ddd")), ImmutableList.of(record(1, "aaa"), record(2, "bbb"), record(3, "ccc"), record(4, "ddd"), record(5, "eee"), record(6, "fff"))));
    }

    private Record record(int i, String str) {
        return SimpleDataUtil.createRecord(Integer.valueOf(i), str);
    }

    private Table createTable(String str, List<String> list, boolean z) {
        sql("CREATE TABLE %s(id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s", str, Joiner.on(',').join(list), z ? "PARTITIONED BY (data)" : "");
        BaseTable loadTable = CatalogLoader.hadoop("my_catalog", CONF, ImmutableMap.of("warehouse", warehouse)).loadCatalog().loadTable(TableIdentifier.of(new String[]{DATABASE_NAME, TABLE_NAME}));
        TableOperations operations = loadTable.operations();
        TableMetadata current = operations.current();
        operations.commit(current, current.upgradeToFormatVersion(2));
        return loadTable;
    }

    private void testSqlChangeLog(String str, List<String> list, List<List<Row>> list2, List<List<Record>> list3) throws Exception {
        sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL) WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, BoundedTableFactory.registerDataSet(list2));
        Assert.assertEquals("Should have the expected rows", listJoin(list2), sql("SELECT * FROM %s", SOURCE_TABLE));
        Table createTable = createTable(str, list, this.partitioned);
        sql("INSERT INTO %s SELECT * FROM %s", str, SOURCE_TABLE);
        createTable.refresh();
        List<Snapshot> findValidSnapshots = findValidSnapshots(createTable);
        int size = list3.size();
        Assert.assertEquals("Should have the expected snapshot number", size, findValidSnapshots.size());
        for (int i = 0; i < size; i++) {
            Assert.assertEquals("Should have the expected records for the checkpoint#" + i, expectedRowSet(createTable, list3.get(i)), actualRowSet(createTable, findValidSnapshots.get(i).snapshotId()));
        }
    }

    private List<Snapshot> findValidSnapshots(Table table) {
        ArrayList newArrayList = Lists.newArrayList();
        for (Snapshot snapshot : table.snapshots()) {
            if (snapshot.allManifests().stream().anyMatch(manifestFile -> {
                return snapshot.snapshotId() == manifestFile.snapshotId().longValue();
            })) {
                newArrayList.add(snapshot);
            }
        }
        return newArrayList;
    }

    private static StructLikeSet expectedRowSet(Table table, List<Record> list) {
        return SimpleDataUtil.expectedRowSet(table, (Record[]) list.toArray(new Record[0]));
    }

    private static StructLikeSet actualRowSet(Table table, long j) throws IOException {
        return SimpleDataUtil.actualRowSet(table, Long.valueOf(j), "*");
    }
}
