package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.List;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/ForceCompactionITCase.class */
/**
 * Integration test that runs batch {@code INSERT} statements against three tables and checks,
 * after each statement, the id and {@link Snapshot.CommitKind} of the table's latest snapshot.
 *
 * <p>Tables (see {@link #ddl()}):
 *
 * <ul>
 *   <li>{@code T} - {@code 'write-mode' = 'change-log'}, partitioned by {@code f1}
 *   <li>{@code T1} - {@code 'write-mode' = 'change-log'}, not partitioned
 *   <li>{@code T2} - {@code 'write-mode' = 'append-only'}, not partitioned
 * </ul>
 */
public class ForceCompactionITCase extends CatalogITCaseBase {

    /** Shared empty argument array for {@code batchSql} calls that take no format arguments. */
    private static final Object[] NO_SQL_ARGS = new Object[0];

    /** File format handed to {@link ManifestList.Factory} when reading manifest lists back. */
    private final FileFormat avro = FileFormat.fromIdentifier("avro", new Options());

    /** Returns the {@code CREATE TABLE} statements for the tables {@code T}, {@code T1}, {@code T2}. */
    @Override
    protected List<String> ddl() {
        return Arrays.asList(
                "CREATE TABLE IF NOT EXISTS T (\n"
                        + "  f0 INT\n"
                        + ",   f1 STRING\n"
                        + ",   f2 STRING\n"
                        + ") PARTITIONED BY (f1) WITH (\n"
                        + "'write-mode' = 'change-log')",
                "CREATE TABLE IF NOT EXISTS T1 (\n"
                        + "  f0 INT\n"
                        + ",   f1 STRING\n"
                        + ",   f2 STRING\n"
                        + ") WITH (\n"
                        + "'write-mode' = 'change-log')",
                "CREATE TABLE IF NOT EXISTS T2 (\n"
                        + "  f0 INT\n"
                        + ",   f1 STRING\n"
                        + ",   f2 STRING\n"
                        + ") WITH (\n"
                        + "'write-mode' = 'append-only')");
    }

    /**
     * Writes to the partitioned table {@code T} with {@code 'num-levels' = '3'} and checks which
     * inserts end in an APPEND snapshot and which end in a COMPACT snapshot, including the
     * partition range recorded in the compact commit's first delta manifest.
     */
    @Test
    public void testDynamicPartition() {
        batchSql("ALTER TABLE T SET ('num-levels' = '3')", NO_SQL_ARGS);

        assertAppend(
                "INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'),(2, 'Winter', 'The First Snowflake'), (2, 'Spring', 'The First Rose in Spring'), (7, 'Summer', 'Summertime Sadness')",
                "T",
                1L);
        assertAppend("INSERT INTO T VALUES(12, 'Winter', 'Last Christmas')", "T", 2L);
        assertAppend(
                "INSERT INTO T VALUES(11, 'Winter', 'Winter is Coming'), (4, 'Spring', 'April')",
                "T",
                3L);
        assertAppend("INSERT INTO T VALUES(10, 'Autumn', 'Refrain')", "T", 4L);
        assertAppend(
                "INSERT INTO T VALUES(6, 'Summer', 'Watermelon Sugar'), (4, 'Spring', 'Spring Water')",
                "T",
                5L);
        assertAppend(
                "INSERT INTO T VALUES(66, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T",
                6L);
        assertAppend(
                "INSERT INTO T VALUES(666, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T",
                7L);
        assertCompact(
                "INSERT INTO T VALUES(6666, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T",
                9L,
                "Summer",
                "Summer");
        assertCompact(
                "INSERT INTO T VALUES(66666, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T",
                11L,
                "Autumn",
                "Autumn");
        assertAppend(
                "INSERT INTO T VALUES(1, 'Winter', 'Cold Water'), (4, 'Spring', 'SpringBoot')",
                "T",
                12L);
        assertCompact(
                "INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'), (4, 'Spring', 'The First Rose in Spring')",
                "T",
                14L,
                "Spring",
                "Winter");

        Assertions.assertThat(batchSql("SELECT * FROM T", NO_SQL_ARGS)).hasSize(22);
    }

    /**
     * Writes to the unpartitioned table {@code T1} without setting {@code 'num-levels'}, then sets
     * {@code 'num-sorted-run.compaction-trigger' = '2'} part-way through, checking the snapshot
     * kind after each insert.
     */
    @Test
    public void testNoDefaultNumOfLevels() {
        assertAppend(
                "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),(2, 'Winter', 'The First Snowflake'), (2, 'Spring', 'The First Rose in Spring'), (7, 'Summer', 'Summertime Sadness')",
                "T1",
                1L);
        assertAppend("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')", "T1", 2L);
        assertAppend("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')", "T1", 3L);
        assertAppend("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')", "T1", 4L);
        assertCompact(
                "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), (4, 'Spring', 'Spring Water')",
                "T1",
                6L);
        assertAppend(
                "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T1",
                7L);
        assertAppend(
                "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T1",
                8L);

        batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')", NO_SQL_ARGS);
        assertCompact(
                "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), (9, 'Autumn', 'Wake Me Up When September Ends')",
                "T1",
                10L);

        Assertions.assertThat(batchSql("SELECT * FROM T1", NO_SQL_ARGS)).hasSize(15);
    }

    /**
     * Sets {@code 'commit.force-compact' = 'false'} together with a low compaction trigger on
     * {@code T1} and {@code T2}, and checks that the latest snapshot still becomes a COMPACT
     * snapshot once the configured trigger value is reached.
     */
    @Test
    public void testForceCompact() {
        // Change-log table: trigger on number of sorted runs.
        batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'false')", NO_SQL_ARGS);
        batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')", NO_SQL_ARGS);
        assertAppend("INSERT INTO T1 VALUES (1, 'Winter', 'Winter is Coming')", "T1", 1L);
        assertAppend("INSERT INTO T1 VALUES (2, 'Spring', 'Spring Water')", "T1", 2L);
        assertCompact("INSERT INTO T1 VALUES (3, 'Summer', 'Summer Vibe')", "T1", 4L);

        // Append-only table: trigger on number of files.
        batchSql("ALTER TABLE T2 SET ('commit.force-compact' = 'false')", NO_SQL_ARGS);
        batchSql("ALTER TABLE T2 SET ('compaction.early-max.file-num' = '2')", NO_SQL_ARGS);
        assertAppend("INSERT INTO T2 VALUES (1, 'Winter', 'Winter is Coming')", "T2", 1L);
        assertCompact("INSERT INTO T2 VALUES (2, 'Spring', 'Spring Water')", "T2", 3L);
    }

    /**
     * Executes {@code sql} in batch mode and asserts that the latest snapshot of {@code tableName}
     * has the given id and commit kind {@link Snapshot.CommitKind#APPEND}.
     *
     * @param sql statement to execute
     * @param tableName table whose latest snapshot is inspected
     * @param expectedSnapshotId id the latest snapshot must have after the statement
     */
    private void assertAppend(String sql, String tableName, long expectedSnapshotId) {
        batchSql(sql, NO_SQL_ARGS);
        Snapshot latest = findLatestSnapshot(tableName);
        Assertions.assertThat(latest.id()).isEqualTo(expectedSnapshotId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
    }

    /**
     * Executes {@code sql} in batch mode and asserts that the latest snapshot of {@code tableName}
     * has the given id and commit kind {@link Snapshot.CommitKind#COMPACT}, and that the first
     * entry of its delta manifest list records at least one deleted file.
     *
     * @param sql statement to execute
     * @param tableName table whose latest snapshot is inspected
     * @param expectedSnapshotId id the latest snapshot must have after the statement
     * @param expectedPartitionRange either empty (table treated as unpartitioned, partition stats
     *     not checked) or exactly two values: the expected minimum and maximum of partition
     *     column {@code f1} in the first delta manifest's partition stats
     */
    private void assertCompact(
            String sql, String tableName, long expectedSnapshotId, String... expectedPartitionRange) {
        batchSql(sql, NO_SQL_ARGS);
        Snapshot latest = findLatestSnapshot(tableName);
        Assertions.assertThat(latest.id()).isEqualTo(expectedSnapshotId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

        boolean partitioned = expectedPartitionRange.length > 0;
        List<ManifestFileMeta> deltaManifests = readDeltaManifests(tableName, latest, partitioned);

        // Fail with an assertion error rather than an IndexOutOfBoundsException on get(0).
        Assertions.assertThat(deltaManifests).isNotEmpty();
        ManifestFileMeta firstManifest = deltaManifests.get(0);
        Assertions.assertThat(firstManifest.numDeletedFiles()).isGreaterThanOrEqualTo(1L);

        BinaryTableStats partitionStats = firstManifest.partitionStats();
        if (partitioned) {
            // Both bounds are read below; reject a lone value with a clear assertion failure.
            Assertions.assertThat(expectedPartitionRange).hasSize(2);
            Assertions.assertThat(partitionStats.min().getString(0).toString())
                    .isEqualTo(expectedPartitionRange[0]);
            Assertions.assertThat(partitionStats.max().getString(0).toString())
                    .isEqualTo(expectedPartitionRange[1]);
        }
    }

    /**
     * Reads the delta manifest list of {@code snapshot} from the local file system.
     *
     * @param tableName table whose directory contains the manifest list
     * @param snapshot snapshot whose {@code deltaManifestList()} is read
     * @param partitioned whether to describe the partition as a single {@code f1} string column
     *     ({@code true}) or as an empty row type ({@code false})
     * @return the manifest file metas stored in the delta manifest list
     */
    private List<ManifestFileMeta> readDeltaManifests(
            String tableName, Snapshot snapshot, boolean partitioned) {
        RowType partitionType =
                partitioned
                        ? RowType.of(new DataType[] {new VarCharType()}, new String[] {"f1"})
                        : RowType.of(new DataType[0]);
        FileStorePathFactory pathFactory =
                new FileStorePathFactory(
                        getTableDirectory(tableName),
                        partitionType,
                        FlinkTestBase.CURRENT_DATABASE,
                        CoreOptions.FILE_FORMAT.defaultValue().toString());
        // The explicit cast on null is kept in case the Factory constructor is overloaded.
        ManifestList manifestList =
                new ManifestList.Factory(
                                LocalFileIO.create(), this.avro, pathFactory, (SegmentsCache) null)
                        .create();
        return manifestList.read(snapshot.deltaManifestList());
    }
}
