package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/CompactDatabaseActionITCase.class */
public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
    private static final String[] DATABASE_NAMES = {"db1", "db2"};
    private static final String[] TABLE_NAMES = {"t1", "t2"};
    private static final Map<String, RowType> ROW_TYPE_MAP = new HashMap(TABLE_NAMES.length);

    @BeforeAll
    public static void beforeAll() {
        DataType[] dataTypeArr = {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
        DataType[] dataTypeArr2 = {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()};
        ROW_TYPE_MAP.put("t1", RowType.of(dataTypeArr, new String[]{"k", "v", "hh", "dt"}));
        ROW_TYPE_MAP.put("t2", RowType.of(dataTypeArr2, new String[]{"k", "v1", "hh", "dt"}));
    }

    private FileStoreTable createTable(String str, String str2, RowType rowType, List<String> list, List<String> list2, Map<String, String> map) throws Exception {
        Catalog catalog = catalog();
        Identifier create = Identifier.create(str, str2);
        catalog.createDatabase(str, true);
        catalog.createTable(create, new Schema(rowType.getFields(), list, list2, map, ""), false);
        return catalog.getTable(create);
    }

    @Timeout(60)
    @Test
    public void testBatchCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        ArrayList<FileStoreTable> arrayList = new ArrayList();
        for (String str : DATABASE_NAMES) {
            for (String str2 : TABLE_NAMES) {
                FileStoreTable createTable = createTable(str, str2, ROW_TYPE_MAP.get(str2), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), hashMap);
                arrayList.add(createTable);
                this.snapshotManager = createTable.snapshotManager();
                StreamWriteBuilder withCommitUser = createTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = withCommitUser.newWrite();
                this.commit = withCommitUser.newCommit();
                Object obj = null;
                if (str2.equals("t1")) {
                    obj = 100;
                } else if (str2.equals("t2")) {
                    obj = 100L;
                }
                writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
                writeData(rowData(2, obj, 15, BinaryString.fromString("20221208")), rowData(2, obj, 16, BinaryString.fromString("20221208")), rowData(2, obj, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
                Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactDatabaseAction(this.warehouse, "db1|db2", (String) null, (String) null, new HashMap()).build(executionEnvironment);
        executionEnvironment.execute();
        for (FileStoreTable fileStoreTable : arrayList) {
            Snapshot snapshot2 = fileStoreTable.snapshotManager().snapshot(this.snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat(snapshot2.id()).isEqualTo(3L);
            Assertions.assertThat(snapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
            List dataSplits = fileStoreTable.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(dataSplits.size()).isEqualTo(3);
            Iterator it = dataSplits.iterator();
            while (it.hasNext()) {
                Assertions.assertThat(((DataSplit) it.next()).dataFiles().size()).isEqualTo(1);
            }
        }
    }

    @Timeout(60)
    @Test
    public void includeTableCompaction() throws Exception {
        includingAndExcludingTablesImpl("db1.t1", null, Collections.singletonList(Identifier.fromString("db1.t1")), Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t1"), Identifier.fromString("db2.t2")));
    }

    @Timeout(60)
    @Test
    public void excludeTableCompaction() throws Exception {
        includingAndExcludingTablesImpl(null, "db2.t2", Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db1.t2"), Identifier.fromString("db2.t1")), Collections.singletonList(Identifier.fromString("db2.t2")));
    }

    @Timeout(60)
    @Test
    public void includeAndExcludeTableCompaction() throws Exception {
        includingAndExcludingTablesImpl("db1.+|db2.t1", "db1.t2", Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db2.t1")), Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t2")));
    }

    private void includingAndExcludingTablesImpl(String str, String str2, List<Identifier> list, List<Identifier> list2) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        ArrayList<FileStoreTable> arrayList = new ArrayList();
        ArrayList<FileStoreTable> arrayList2 = new ArrayList();
        for (String str3 : DATABASE_NAMES) {
            for (String str4 : TABLE_NAMES) {
                FileStoreTable createTable = createTable(str3, str4, ROW_TYPE_MAP.get(str4), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), hashMap);
                if (list.contains(Identifier.create(str3, str4))) {
                    arrayList.add(createTable);
                } else if (list2.contains(Identifier.create(str3, str4))) {
                    arrayList2.add(createTable);
                }
                this.snapshotManager = createTable.snapshotManager();
                StreamWriteBuilder withCommitUser = createTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = withCommitUser.newWrite();
                this.commit = withCommitUser.newCommit();
                Object obj = null;
                if (str4.equals("t1")) {
                    obj = 100;
                } else if (str4.equals("t2")) {
                    obj = 100L;
                }
                writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
                writeData(rowData(2, obj, 15, BinaryString.fromString("20221208")), rowData(2, obj, 16, BinaryString.fromString("20221208")), rowData(2, obj, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
                Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactDatabaseAction(this.warehouse, "db1|db2", str, str2, new HashMap()).build(executionEnvironment);
        executionEnvironment.execute();
        for (FileStoreTable fileStoreTable : arrayList) {
            this.snapshotManager = fileStoreTable.snapshotManager();
            Snapshot snapshot2 = fileStoreTable.snapshotManager().snapshot(this.snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat(snapshot2.id()).isEqualTo(3L);
            Assertions.assertThat(snapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
            List dataSplits = fileStoreTable.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(dataSplits.size()).isEqualTo(3);
            Iterator it = dataSplits.iterator();
            while (it.hasNext()) {
                Assertions.assertThat(((DataSplit) it.next()).dataFiles().size()).isEqualTo(1);
            }
        }
        for (FileStoreTable fileStoreTable2 : arrayList2) {
            this.snapshotManager = fileStoreTable2.snapshotManager();
            Snapshot snapshot3 = fileStoreTable2.snapshotManager().snapshot(this.snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat(snapshot3.id()).isEqualTo(2L);
            Assertions.assertThat(snapshot3.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
            List dataSplits2 = fileStoreTable2.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(dataSplits2.size()).isEqualTo(3);
            Iterator it2 = dataSplits2.iterator();
            while (it2.hasNext()) {
                Assertions.assertThat(((DataSplit) it2.next()).dataFiles().size()).isEqualTo(2);
            }
        }
    }

    @Test
    public void testStreamingCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        hashMap.put(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        ArrayList<FileStoreTable> arrayList = new ArrayList();
        for (String str : DATABASE_NAMES) {
            for (String str2 : TABLE_NAMES) {
                FileStoreTable createTable = createTable(str, str2, ROW_TYPE_MAP.get(str2), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), hashMap);
                arrayList.add(createTable);
                this.snapshotManager = createTable.snapshotManager();
                StreamWriteBuilder withCommitUser = createTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = withCommitUser.newWrite();
                this.commit = withCommitUser.newCommit();
                Object obj = null;
                if (str2.equals("t1")) {
                    obj = 100;
                } else if (str2.equals("t2")) {
                    obj = 100L;
                }
                writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
                Assertions.assertThat(snapshot.id()).isEqualTo(1L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
                Assertions.assertThat(createTable.newReadBuilder().newStreamScan().plan().splits()).isEmpty();
            }
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(500L);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactDatabaseAction(this.warehouse, "db1|db2", (String) null, (String) null, new HashMap()).build(executionEnvironment);
        JobClient executeAsync = executionEnvironment.executeAsync();
        for (FileStoreTable fileStoreTable : arrayList) {
            StreamTableScan newStreamScan = fileStoreTable.newReadBuilder().newStreamScan();
            validateResult(fileStoreTable, ROW_TYPE_MAP.get(fileStoreTable.name()), newStreamScan, Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]", "+I[1, 100, 16, 20221208]"), 60000L);
            Object obj2 = null;
            String name = fileStoreTable.name();
            if (name.equals("t1")) {
                obj2 = 101;
            } else if (name.equals("t2")) {
                obj2 = 101L;
            }
            this.snapshotManager = fileStoreTable.snapshotManager();
            StreamWriteBuilder withCommitUser2 = fileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = withCommitUser2.newWrite();
            this.commit = withCommitUser2.newCommit();
            writeData(rowData(1, obj2, 15, BinaryString.fromString("20221208")), rowData(1, obj2, 16, BinaryString.fromString("20221208")), rowData(1, obj2, 15, BinaryString.fromString("20221209")));
            validateResult(fileStoreTable, ROW_TYPE_MAP.get(fileStoreTable.name()), newStreamScan, Arrays.asList("+U[1, 101, 15, 20221208]", "+U[1, 101, 15, 20221209]", "+U[1, 101, 16, 20221208]", "-U[1, 100, 15, 20221208]", "-U[1, 100, 15, 20221209]", "-U[1, 100, 16, 20221208]"), 60000L);
            CommonTestUtils.waitUtil(() -> {
                return Boolean.valueOf(this.snapshotManager.latestSnapshotId().longValue() - 2 == this.snapshotManager.earliestSnapshotId().longValue());
            }, Duration.ofSeconds(60000L), Duration.ofSeconds(100L), String.format("Cannot validate snapshot expiration in %s milliseconds.", 60000));
        }
        executeAsync.cancel();
    }

    @Test
    public void testUnawareBucketStreamingCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.BUCKET.key(), "-1");
        hashMap.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        hashMap.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
        ArrayList<FileStoreTable> arrayList = new ArrayList();
        for (String str : TABLE_NAMES) {
            FileStoreTable createTable = createTable(this.database, str, ROW_TYPE_MAP.get(str), Arrays.asList("k"), Collections.emptyList(), hashMap);
            arrayList.add(createTable);
            this.snapshotManager = createTable.snapshotManager();
            StreamWriteBuilder withCommitUser = createTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = withCommitUser.newWrite();
            this.commit = withCommitUser.newCommit();
            Object obj = null;
            if (str.equals("t1")) {
                obj = 100;
            } else if (str.equals("t2")) {
                obj = 100L;
            }
            writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
            writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
            Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat(snapshot.id()).isEqualTo(2L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(500L);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactDatabaseAction(this.warehouse, this.database, (String) null, (String) null, new HashMap()).build(executionEnvironment);
        JobClient executeAsync = executionEnvironment.executeAsync();
        for (FileStoreTable fileStoreTable : arrayList) {
            FileStoreScan newScan = fileStoreTable.store().newScan();
            this.snapshotManager = fileStoreTable.snapshotManager();
            StreamWriteBuilder withCommitUser2 = fileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = withCommitUser2.newWrite();
            this.commit = withCommitUser2.newCommit();
            checkFileAndRowSize(newScan, 3L, 30000L, 1, 6L);
            Object obj2 = null;
            String name = fileStoreTable.name();
            if (name.equals("t1")) {
                obj2 = 101;
            } else if (name.equals("t2")) {
                obj2 = 101L;
            }
            writeData(rowData(1, obj2, 15, BinaryString.fromString("20221208")), rowData(1, obj2, 16, BinaryString.fromString("20221208")), rowData(1, obj2, 15, BinaryString.fromString("20221209")));
            checkFileAndRowSize(newScan, 5L, 30000L, 1, 9L);
        }
        executeAsync.cancel().get();
    }

    @Test
    public void testUnawareBucketBatchCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.BUCKET.key(), "-1");
        hashMap.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        hashMap.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
        ArrayList<FileStoreTable> arrayList = new ArrayList();
        for (String str : TABLE_NAMES) {
            FileStoreTable createTable = createTable(this.database, str, ROW_TYPE_MAP.get(str), Collections.singletonList("k"), Collections.emptyList(), hashMap);
            arrayList.add(createTable);
            this.snapshotManager = createTable.snapshotManager();
            StreamWriteBuilder withCommitUser = createTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = withCommitUser.newWrite();
            this.commit = withCommitUser.newCommit();
            Object obj = null;
            if (str.equals("t1")) {
                obj = 100;
            } else if (str.equals("t2")) {
                obj = 100L;
            }
            writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
            writeData(rowData(1, obj, 15, BinaryString.fromString("20221208")), rowData(1, obj, 16, BinaryString.fromString("20221208")), rowData(1, obj, 15, BinaryString.fromString("20221209")));
            Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat(snapshot.id()).isEqualTo(2L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactDatabaseAction(this.warehouse, this.database, (String) null, (String) null, new HashMap()).build(executionEnvironment);
        executionEnvironment.execute();
        for (FileStoreTable fileStoreTable : arrayList) {
            FileStoreScan newScan = fileStoreTable.store().newScan();
            this.snapshotManager = fileStoreTable.snapshotManager();
            checkFileAndRowSize(newScan, 3L, 0L, 1, 6L);
        }
    }
}
