package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

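/**
 * Integration tests for {@link CompactDatabaseAction} and the {@code sys.compact_database}
 * procedure.
 *
 * <p>Decompiled from: input_file:org/apache/paimon/flink/action/CompactDatabaseActionITCase.class
 */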
public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
    /** Databases created up front by the batch, streaming and include/exclude tests. */
    private static final String[] DATABASE_NAMES = {"db1", "db2"};
    /** Table names created in each database (and in {@code this.database} by the unaware-bucket tests). */
    private static final String[] TABLE_NAMES = {"t1", "t2"};
    /** Databases created only after the compaction job is running, in the "combined" branch of testStreamingCompact. */
    private static final String[] New_DATABASE_NAMES = {"db3", "db4"};
    /** Table names created inside each of the {@link #New_DATABASE_NAMES} databases. */
    private static final String[] New_TABLE_NAMES = {"t3", "t4"};
    /** Row layout shared by every test table: INT {@code k}, INT {@code v}, INT {@code hh}, STRING {@code dt}. */
    private static final RowType ROW_TYPE = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()}, new String[]{"k", "v", "hh", "dt"});

    /**
     * Creates the database if it does not exist yet, creates a table with the shared
     * {@link #ROW_TYPE} columns in it, and returns the table loaded back from the catalog.
     *
     * @param str database name
     * @param str2 table name
     * @param list partition keys handed to the {@link Schema}
     * @param list2 primary keys handed to the {@link Schema}
     * @param map table options handed to the {@link Schema}
     * @return the freshly created table
     * @throws Exception if the catalog rejects the database/table creation or the lookup
     */
    private FileStoreTable createTable(
            String str, String str2, List<String> list, List<String> list2, Map<String, String> map)
            throws Exception {
        Identifier identifier = Identifier.create(str, str2);
        // ignoreIfExists = true: several tables share one database.
        this.catalog.createDatabase(str, true);
        Schema schema = new Schema(ROW_TYPE.getFields(), list, list2, map, "");
        // ignoreIfExists = false: a name clash between tests should fail loudly.
        this.catalog.createTable(identifier, schema, false);
        // NOTE(review): Paimon's Catalog#getTable is expected to be declared as returning the
        // general Table type, so the downcast is spelled out; it is a no-op if the declared
        // type is already FileStoreTable. Confirm against the catalog API in use.
        return (FileStoreTable) this.catalog.getTable(identifier);
    }

    /**
     * Batch compaction over every table of every database.
     *
     * <p>Each of the four tables ({@link #DATABASE_NAMES} x {@link #TABLE_NAMES}) receives two
     * APPEND commits in write-only mode. Database compaction is then run in batch mode, either
     * through {@link CompactDatabaseAction} or through the {@code sys.compact_database}
     * procedure (picked at random), and every table is expected to end with snapshot 3 of kind
     * COMPACT whose three splits each hold exactly one data file.
     *
     * @param str compaction mode under test, {@code "divided"} or {@code "combined"}
     */
    @Timeout(60)
    @ValueSource(strings = {"divided", "combined"})
    @ParameterizedTest(name = "mode = {0}")
    public void testBatchCompact(String str) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put("bucket", "1");

        List<FileStoreTable> tables = new ArrayList<>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        createTable(
                                dbName,
                                tableName,
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                tables.add(table);
                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = writeBuilder.newWrite();
                this.commit = writeBuilder.newCommit();

                // Two commits touching the same three partitions -> two files per partition.
                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
                writeData(
                        rowData(2, 100, 15, BinaryString.fromString("20221208")),
                        rowData(2, 100, 16, BinaryString.fromString("20221208")),
                        rowData(2, 100, 15, BinaryString.fromString("20221209")));

                Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(latest.id()).isEqualTo(2L);
                Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

                this.write.close();
                this.commit.close();
            }
        }

        // Randomly exercise either the action entry point or the SQL procedure.
        if (ThreadLocalRandom.current().nextBoolean()) {
            StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
            createAction(
                            CompactDatabaseAction.class,
                            "compact_database",
                            "--warehouse",
                            this.warehouse,
                            "--mode",
                            str)
                    .withStreamExecutionEnvironment(env)
                    .build();
            env.execute();
        } else {
            // NOTE(review): the two flags are presumably (streaming mode, wait for completion)
            // -- confirm against callProcedure in the base class.
            callProcedure(String.format("CALL sys.compact_database('', '%s')", str), false, true);
        }

        for (FileStoreTable table : tables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(latest.id()).isEqualTo(3L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(1);
            }
        }
    }

    /**
     * Streaming compaction with the full-compaction changelog producer.
     *
     * <p>Flow:
     *
     * <ol>
     *   <li>Create the four initial tables in write-only mode and commit one APPEND snapshot
     *       each; a stream scan must see no splits yet.
     *   <li>Start database compaction in streaming mode, through the action or the procedure
     *       (picked at random).
     *   <li>For every table: expect the first three rows as {@code +I}, write updated values,
     *       expect the matching {@code -U}/{@code +U} pairs, then wait until only three
     *       snapshots are retained ({@code latest - 2 == earliest}).
     *   <li>In {@code "combined"} mode only: create tables in new databases while the job is
     *       running and repeat the same checks on them.
     * </ol>
     *
     * @param str compaction mode under test, {@code "divided"} or {@code "combined"}
     */
    @ValueSource(strings = {"divided", "combined"})
    @ParameterizedTest(name = "mode = {0}")
    public void testStreamingCompact(String str) throws Exception {
        // Single source of truth for every wait in this test (60 seconds).
        final long timeoutMillis = 60000L;

        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        options.put(
                FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
                "1s");
        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        options.put("bucket", "1");

        List<FileStoreTable> tables = new ArrayList<>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        createTable(
                                dbName,
                                tableName,
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                tables.add(table);
                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = writeBuilder.newWrite();
                this.commit = writeBuilder.newCommit();

                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));

                Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(latest.id()).isEqualTo(1L);
                Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
                // No compaction has run yet, so the stream scan must not produce any split.
                Assertions.assertThat(table.newReadBuilder().newStreamScan().plan().splits())
                        .isEmpty();

                this.write.close();
                this.commit.close();
            }
        }

        // Randomly exercise either the action entry point or the SQL procedure.
        if (ThreadLocalRandom.current().nextBoolean()) {
            CompactDatabaseAction action;
            if (str.equals("divided")) {
                action =
                        createAction(
                                CompactDatabaseAction.class,
                                "compact_database",
                                "--warehouse",
                                this.warehouse);
            } else {
                action =
                        createAction(
                                CompactDatabaseAction.class,
                                "compact_database",
                                "--warehouse",
                                this.warehouse,
                                "--mode",
                                "combined",
                                "--table_conf",
                                CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
            }
            StreamExecutionEnvironment env =
                    streamExecutionEnvironmentBuilder().streamingMode().build();
            action.withStreamExecutionEnvironment(env).build();
            env.executeAsync();
        } else if (str.equals("divided")) {
            callProcedure("CALL sys.compact_database()", true, false);
        } else {
            callProcedure(
                    "CALL sys.compact_database('', 'combined', '', '', 'continuous.discovery-interval=1s')",
                    true,
                    false);
        }

        for (FileStoreTable table : tables) {
            StreamTableScan scan = table.newReadBuilder().newStreamScan();
            validateResult(
                    table,
                    ROW_TYPE,
                    scan,
                    Arrays.asList(
                            "+I[1, 100, 15, 20221208]",
                            "+I[1, 100, 15, 20221209]",
                            "+I[1, 100, 16, 20221208]"),
                    timeoutMillis);

            SnapshotManager snapshotManager = table.snapshotManager();
            StreamWriteBuilder writeBuilder =
                    table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = writeBuilder.newWrite();
            this.commit = writeBuilder.newCommit();

            // Same keys with a new value -> the changelog must contain update pairs.
            writeData(
                    rowData(1, 101, 15, BinaryString.fromString("20221208")),
                    rowData(1, 101, 16, BinaryString.fromString("20221208")),
                    rowData(1, 101, 15, BinaryString.fromString("20221209")));
            validateResult(
                    table,
                    ROW_TYPE,
                    scan,
                    Arrays.asList(
                            "+U[1, 101, 15, 20221208]",
                            "+U[1, 101, 15, 20221209]",
                            "+U[1, 101, 16, 20221208]",
                            "-U[1, 100, 15, 20221208]",
                            "-U[1, 100, 15, 20221209]",
                            "-U[1, 100, 16, 20221208]"),
                    timeoutMillis);

            // Retention is min = max = 3, so exactly three snapshots should remain.
            CommonTestUtils.waitUtil(
                    () -> {
                        long latestId = snapshotManager.latestSnapshotId();
                        long earliestId = snapshotManager.earliestSnapshotId();
                        return latestId - 2 == earliestId;
                    },
                    Duration.ofMillis(timeoutMillis),
                    Duration.ofMillis(100L),
                    String.format(
                            "Cannot validate snapshot expiration in %s milliseconds.",
                            timeoutMillis));

            this.write.close();
            this.commit.close();
        }

        if (str.equals("combined")) {
            // Tables created after the job started; only combined mode is asserted to pick
            // them up.
            List<FileStoreTable> newTables = new ArrayList<>();
            for (String dbName : New_DATABASE_NAMES) {
                for (String tableName : New_TABLE_NAMES) {
                    FileStoreTable table =
                            createTable(
                                    dbName,
                                    tableName,
                                    Arrays.asList("dt", "hh"),
                                    Arrays.asList("dt", "hh", "k"),
                                    options);
                    newTables.add(table);
                    SnapshotManager snapshotManager = table.snapshotManager();
                    StreamWriteBuilder writeBuilder =
                            table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                    this.write = writeBuilder.newWrite();
                    this.commit = writeBuilder.newCommit();

                    // Explicit commit identifier 0 for the first commit on the new table.
                    writeData(
                            this.write,
                            this.commit,
                            0L,
                            rowData(1, 100, 15, BinaryString.fromString("20221208")),
                            rowData(1, 100, 16, BinaryString.fromString("20221208")),
                            rowData(1, 100, 15, BinaryString.fromString("20221209")));

                    Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                    Assertions.assertThat(latest.id()).isEqualTo(1L);
                    Assertions.assertThat(latest.commitKind())
                            .isEqualTo(Snapshot.CommitKind.APPEND);

                    this.write.close();
                    this.commit.close();
                }
            }

            for (FileStoreTable table : newTables) {
                StreamTableScan scan = table.newReadBuilder().newStreamScan();
                validateResult(
                        table,
                        ROW_TYPE,
                        scan,
                        Arrays.asList(
                                "+I[1, 100, 15, 20221208]",
                                "+I[1, 100, 15, 20221209]",
                                "+I[1, 100, 16, 20221208]"),
                        timeoutMillis);

                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = writeBuilder.newWrite();
                this.commit = writeBuilder.newCommit();

                // Explicit commit identifier 1 follows the identifier 0 used above.
                writeData(
                        this.write,
                        this.commit,
                        1L,
                        rowData(1, 101, 15, BinaryString.fromString("20221208")),
                        rowData(1, 101, 16, BinaryString.fromString("20221208")),
                        rowData(1, 101, 15, BinaryString.fromString("20221209")));
                validateResult(
                        table,
                        ROW_TYPE,
                        scan,
                        Arrays.asList(
                                "+U[1, 101, 15, 20221208]",
                                "+U[1, 101, 15, 20221209]",
                                "+U[1, 101, 16, 20221208]",
                                "-U[1, 100, 15, 20221208]",
                                "-U[1, 100, 15, 20221209]",
                                "-U[1, 100, 16, 20221208]"),
                        timeoutMillis);

                CommonTestUtils.waitUtil(
                        () -> {
                            long latestId = snapshotManager.latestSnapshotId();
                            long earliestId = snapshotManager.earliestSnapshotId();
                            return latestId - 2 == earliestId;
                        },
                        Duration.ofMillis(timeoutMillis),
                        Duration.ofMillis(100L),
                        String.format(
                                "Cannot validate snapshot expiration in %s milliseconds.",
                                timeoutMillis));

                this.write.close();
                this.commit.close();
            }
        }
    }

    /**
     * Only {@code db1.t1} is included: it must be compacted while the other three tables stay
     * untouched.
     *
     * @param str compaction mode under test, {@code "divided"} or {@code "combined"}
     */
    @Timeout(60)
    @ValueSource(strings = {"divided", "combined"})
    @ParameterizedTest(name = "mode = {0}")
    public void includeTableCompaction(String str) throws Exception {
        List<Identifier> expectedCompacted =
                Collections.singletonList(Identifier.fromString("db1.t1"));
        List<Identifier> expectedUntouched =
                Arrays.asList(
                        Identifier.fromString("db1.t2"),
                        Identifier.fromString("db2.t1"),
                        Identifier.fromString("db2.t2"));
        includingAndExcludingTablesImpl(str, "db1.t1", null, expectedCompacted, expectedUntouched);
    }

    /**
     * Only {@code db2.t2} is excluded: the other three tables must be compacted while it stays
     * untouched.
     *
     * @param str compaction mode under test, {@code "divided"} or {@code "combined"}
     */
    @Timeout(60)
    @ValueSource(strings = {"divided", "combined"})
    @ParameterizedTest(name = "mode = {0}")
    public void excludeTableCompaction(String str) throws Exception {
        List<Identifier> expectedCompacted =
                Arrays.asList(
                        Identifier.fromString("db1.t1"),
                        Identifier.fromString("db1.t2"),
                        Identifier.fromString("db2.t1"));
        List<Identifier> expectedUntouched =
                Collections.singletonList(Identifier.fromString("db2.t2"));
        includingAndExcludingTablesImpl(str, null, "db2.t2", expectedCompacted, expectedUntouched);
    }

    /**
     * Combines an including pattern ({@code db1.+|db2.t1}) with an excluding pattern
     * ({@code db1.t2}): {@code db1.t1} and {@code db2.t1} must be compacted, {@code db1.t2} and
     * {@code db2.t2} must stay untouched.
     *
     * @param str compaction mode under test, {@code "divided"} or {@code "combined"}
     */
    @Timeout(60)
    @ValueSource(strings = {"divided", "combined"})
    @ParameterizedTest(name = "mode = {0}")
    public void includeAndExcludeTableCompaction(String str) throws Exception {
        List<Identifier> expectedCompacted =
                Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db2.t1"));
        List<Identifier> expectedUntouched =
                Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t2"));
        includingAndExcludingTablesImpl(
                str, "db1.+|db2.t1", "db1.t2", expectedCompacted, expectedUntouched);
    }

    /**
     * Shared body of the include/exclude tests.
     *
     * <p>Creates the four tables ({@link #DATABASE_NAMES} x {@link #TABLE_NAMES}) in write-only
     * mode with two APPEND commits each, runs batch database compaction with the given table
     * filters (action or procedure, picked at random) and then checks that:
     *
     * <ul>
     *   <li>tables listed in {@code list} end at snapshot 3 (COMPACT) with one file per split;
     *   <li>tables listed in {@code list2} stay at snapshot 2 (APPEND) with two files per split.
     * </ul>
     *
     * @param str compaction mode, {@code "divided"} or {@code "combined"}
     * @param str2 value for {@code --including_tables}, or {@code null} to omit the filter
     * @param str3 value for {@code --excluding_tables}, or {@code null} to omit the filter
     * @param list identifiers of the tables expected to be compacted
     * @param list2 identifiers of the tables expected to be left untouched
     */
    private void includingAndExcludingTablesImpl(
            String str,
            @Nullable String str2,
            @Nullable String str3,
            List<Identifier> list,
            List<Identifier> list2)
            throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put("bucket", "1");

        List<FileStoreTable> compactedTables = new ArrayList<>();
        List<FileStoreTable> untouchedTables = new ArrayList<>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        createTable(
                                dbName,
                                tableName,
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                Identifier identifier = Identifier.create(dbName, tableName);
                if (list.contains(identifier)) {
                    compactedTables.add(table);
                } else if (list2.contains(identifier)) {
                    untouchedTables.add(table);
                }

                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = writeBuilder.newWrite();
                this.commit = writeBuilder.newCommit();

                // Two commits touching the same three partitions -> two files per partition.
                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
                writeData(
                        rowData(2, 100, 15, BinaryString.fromString("20221208")),
                        rowData(2, 100, 16, BinaryString.fromString("20221208")),
                        rowData(2, 100, 15, BinaryString.fromString("20221209")));

                Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(latest.id()).isEqualTo(2L);
                Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

                this.write.close();
                this.commit.close();
            }
        }

        // Randomly exercise either the action entry point or the SQL procedure.
        if (ThreadLocalRandom.current().nextBoolean()) {
            List<String> args = new ArrayList<>();
            args.add("compact_database");
            args.add("--warehouse");
            args.add(this.warehouse);
            if (str2 != null) {
                args.add("--including_tables");
                args.add(str2);
            }
            if (str3 != null) {
                args.add("--excluding_tables");
                args.add(str3);
            }
            args.add("--mode");
            args.add(str);
            if (str.equals("combined")) {
                args.add("--table_conf");
                args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
            }
            StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
            createAction(CompactDatabaseAction.class, args)
                    .withStreamExecutionEnvironment(env)
                    .build();
            env.execute();
        } else if (str.equals("divided")) {
            // The procedure takes positional arguments, so absent filters become "".
            callProcedure(
                    String.format(
                            "CALL sys.compact_database('', 'divided', '%s', '%s')",
                            nonNull(str2), nonNull(str3)),
                    false,
                    true);
        } else {
            callProcedure(
                    String.format(
                            "CALL sys.compact_database('', 'combined', '%s', '%s', 'continuous.discovery-interval=1s')",
                            nonNull(str2), nonNull(str3)),
                    false,
                    true);
        }

        for (FileStoreTable table : compactedTables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(latest.id()).isEqualTo(3L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(1);
            }
        }

        for (FileStoreTable table : untouchedTables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(latest.id()).isEqualTo(2L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(2);
            }
        }
    }

    /**
     * Maps {@code null} to the empty string so the value can be spliced into a procedure call.
     *
     * @param str possibly-null text
     * @return {@code str} itself when it is not null, otherwise {@code ""}
     */
    private String nonNull(@Nullable String str) {
        if (str != null) {
            return str;
        }
        return "";
    }

    /**
     * Streaming database compaction over tables with {@code bucket = -1} and no primary key.
     *
     * <p>Each table in {@link #TABLE_NAMES} (inside {@code this.database}) gets two APPEND
     * commits, then the compaction job is started in streaming mode (action or procedure,
     * picked at random). The file/row checks run once before and once after an additional
     * write while the job is running.
     */
    @Test
    public void testUnawareBucketStreamingCompact() throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        options.put(CoreOptions.BUCKET.key(), "-1");
        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");

        List<FileStoreTable> tables = new ArrayList<>();
        for (String tableName : TABLE_NAMES) {
            FileStoreTable table =
                    createTable(
                            this.database,
                            tableName,
                            Collections.singletonList("k"),
                            Collections.emptyList(),
                            options);
            tables.add(table);
            SnapshotManager snapshotManager = table.snapshotManager();
            StreamWriteBuilder writeBuilder =
                    table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = writeBuilder.newWrite();
            this.commit = writeBuilder.newCommit();

            writeData(
                    rowData(1, 100, 15, BinaryString.fromString("20221208")),
                    rowData(1, 100, 16, BinaryString.fromString("20221208")),
                    rowData(1, 100, 15, BinaryString.fromString("20221209")));
            writeData(
                    rowData(1, 100, 15, BinaryString.fromString("20221208")),
                    rowData(1, 100, 16, BinaryString.fromString("20221208")),
                    rowData(1, 100, 15, BinaryString.fromString("20221209")));

            Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(latest.id()).isEqualTo(2L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            this.write.close();
            this.commit.close();
        }

        // Randomly exercise either the action entry point or the SQL procedure.
        if (ThreadLocalRandom.current().nextBoolean()) {
            StreamExecutionEnvironment env =
                    streamExecutionEnvironmentBuilder().streamingMode().build();
            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", this.warehouse)
                    .withStreamExecutionEnvironment(env)
                    .build();
            env.executeAsync();
        } else {
            callProcedure("CALL sys.compact_database()");
        }

        for (FileStoreTable table : tables) {
            StreamWriteBuilder writeBuilder =
                    table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = writeBuilder.newWrite();
            this.commit = writeBuilder.newCommit();

            // NOTE(review): arguments are presumably (table, expected snapshot id, timeout in
            // ms, expected file count, expected row count) -- confirm against the base class.
            checkFileAndRowSize(table, 3L, 30000L, 1, 6L);

            writeData(
                    rowData(1, 101, 15, BinaryString.fromString("20221208")),
                    rowData(1, 101, 16, BinaryString.fromString("20221208")),
                    rowData(1, 101, 15, BinaryString.fromString("20221209")));
            checkFileAndRowSize(table, 5L, 30000L, 1, 9L);

            this.write.close();
            this.commit.close();
        }
    }

    /**
     * Batch database compaction over tables with {@code bucket = -1} and no primary key.
     *
     * <p>Each table in {@link #TABLE_NAMES} (inside {@code this.database}) gets two APPEND
     * commits, then compaction runs in batch mode (action or procedure, picked at random) and
     * the file/row check is applied to every table.
     */
    @Test
    public void testUnawareBucketBatchCompact() throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.BUCKET.key(), "-1");
        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");

        List<FileStoreTable> tables = new ArrayList<>();
        for (String tableName : TABLE_NAMES) {
            FileStoreTable table =
                    createTable(
                            this.database,
                            tableName,
                            Collections.singletonList("k"),
                            Collections.emptyList(),
                            options);
            tables.add(table);
            SnapshotManager snapshotManager = table.snapshotManager();
            StreamWriteBuilder writeBuilder =
                    table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = writeBuilder.newWrite();
            this.commit = writeBuilder.newCommit();

            writeData(
                    rowData(1, 100, 15, BinaryString.fromString("20221208")),
                    rowData(1, 100, 16, BinaryString.fromString("20221208")),
                    rowData(1, 100, 15, BinaryString.fromString("20221209")));
            writeData(
                    rowData(1, 100, 15, BinaryString.fromString("20221208")),
                    rowData(1, 100, 16, BinaryString.fromString("20221208")),
                    rowData(1, 100, 15, BinaryString.fromString("20221209")));

            Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(latest.id()).isEqualTo(2L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            this.write.close();
            this.commit.close();
        }

        // Randomly exercise either the action entry point or the SQL procedure.
        if (ThreadLocalRandom.current().nextBoolean()) {
            StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", this.warehouse)
                    .withStreamExecutionEnvironment(env)
                    .build();
            env.execute();
        } else {
            callProcedure("CALL sys.compact_database()", false, true);
        }

        for (FileStoreTable table : tables) {
            // NOTE(review): arguments are presumably (table, expected snapshot id, timeout in
            // ms, expected file count, expected row count) -- confirm against the base class.
            checkFileAndRowSize(table, 3L, 0L, 1, 6L);
        }
    }

    /**
     * Verifies that {@code --table_conf} options passed to a combined-mode compaction job
     * override the options stored on the table.
     *
     * <p>The table is created with {@code snapshot.num-retained.min = 1000} and receives ten
     * APPEND commits. The job is started with retention min = max = 3, so once the COMPACT
     * snapshot 11 appears the earliest retained snapshot is expected to become 9.
     */
    @Test
    public void testCombinedModeWithDynamicOptions() throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put(CoreOptions.BUCKET.key(), "1");
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1000");

        FileStoreTable table =
                createTable(
                        "test_db",
                        "t",
                        Arrays.asList("dt", "hh"),
                        Arrays.asList("dt", "hh", "k"),
                        options);
        StreamWriteBuilder writeBuilder =
                table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = writeBuilder.newWrite();
        this.commit = writeBuilder.newCommit();

        // One commit per row -> ten snapshots.
        for (int i = 0; i < 10; i++) {
            writeData(rowData(1, i, 15, BinaryString.fromString("20221208")));
        }

        SnapshotManager snapshotManager = table.snapshotManager();
        Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10L);

        // Release the writer and committer before the job starts, as the sibling tests do.
        this.write.close();
        this.commit.close();

        CompactDatabaseAction action =
                createAction(
                        CompactDatabaseAction.class,
                        "compact_database",
                        "--warehouse",
                        this.warehouse,
                        "--mode",
                        "combined",
                        "--table_conf",
                        CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s",
                        "--table_conf",
                        CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key() + "=3",
                        "--table_conf",
                        CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3");
        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().streamingMode().build();
        action.withStreamExecutionEnvironment(env).build();
        JobClient jobClient = env.executeAsync();

        try {
            CommonTestUtils.waitUtil(
                    () -> snapshotManager.latestSnapshotId() == 11L,
                    Duration.ofSeconds(60L),
                    Duration.ofMillis(500L));
        } finally {
            // Cancel even when the wait times out so the streaming job does not outlive the
            // test; on success this is the same point at which the job was cancelled before.
            jobClient.cancel();
        }

        Assertions.assertThat(snapshotManager.latestSnapshot().commitKind())
                .isEqualTo(Snapshot.CommitKind.COMPACT);
        // Dynamic retention of 3 with latest = 11 leaves snapshots 9, 10 and 11.
        CommonTestUtils.waitUtil(
                () -> snapshotManager.earliestSnapshotId() == 9L,
                Duration.ofSeconds(60L),
                Duration.ofMillis(200L),
                "Failed to wait snapshot expiration success");
    }

    /**
     * Writes the given rows through an explicit writer and commits them under an explicit
     * commit identifier.
     *
     * @param streamTableWrite writer that receives every row
     * @param streamTableCommit committer used for the single commit
     * @param j commit identifier, passed to both {@code prepareCommit} and {@code commit}
     * @param genericRowArr rows to write, in order
     * @throws Exception if writing, preparing or committing fails
     */
    private void writeData(
            StreamTableWrite streamTableWrite,
            StreamTableCommit streamTableCommit,
            long j,
            GenericRow... genericRowArr)
            throws Exception {
        for (GenericRow row : genericRowArr) {
            streamTableWrite.write(row);
        }
        // prepareCommit is called with its boolean flag set to true and the same identifier.
        streamTableCommit.commit(j, streamTableWrite.prepareCommit(true, j));
    }
}
