package org.apache.paimon.flink.procedure;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/procedure/CompactProcedureITCase.class */
/**
 * Integration tests that invoke the {@code sys.compact} procedure through Flink SQL and verify
 * the resulting snapshots, data files and streamed changelog of the target table.
 */
public class CompactProcedureITCase extends CatalogITCaseBase {

    /** Number of INSERT statements (one commit each) issued before the sort compaction. */
    private static final int SORT_COMPACT_COMMITS = 20;

    /** Number of random rows written by each INSERT of the sort compaction test. */
    private static final int SORT_COMPACT_ROWS_PER_COMMIT = 100;

    /**
     * Runs {@code sys.compact} in batch mode on two of the three written partitions and checks
     * that only the compacted partitions end up with a single data file.
     */
    @Test
    public void testBatchCompact() throws Exception {
        sql(
                "CREATE TABLE T ("
                        + " k INT,"
                        + " v INT,"
                        + " hh INT,"
                        + " dt STRING,"
                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
                        + ") PARTITIONED BY (dt, hh) WITH ("
                        + " 'write-only' = 'true',"
                        + " 'bucket' = '1'"
                        + ")");
        FileStoreTable table = paimonTable("T");

        // Two separate commits, each writing one row into each of the three partitions.
        sql(
                "INSERT INTO T VALUES (1, 100, 15, '20221208'),"
                        + " (1, 100, 16, '20221208'),"
                        + " (1, 100, 15, '20221209')");
        sql(
                "INSERT INTO T VALUES (2, 100, 15, '20221208'),"
                        + " (2, 100, 16, '20221208'),"
                        + " (2, 100, 15, '20221209')");
        checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);

        // Enable synchronous DML before calling the procedure; the snapshot check right after
        // the CALL relies on the compaction having been committed by then.
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
        sql(
                "CALL sys.compact(`table` => 'default.T',"
                        + " partitions => 'dt=20221208,hh=15;dt=20221209,hh=15')");
        checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);

        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat(splits).hasSize(3);
        for (DataSplit split : splits) {
            // Partition field 1 is presumably `hh` (table is PARTITIONED BY (dt, hh)): the
            // hh=15 partitions were compacted, the remaining one still holds both files.
            int expectedFileCount = split.partition().getInt(1) == 15 ? 1 : 2;
            Assertions.assertThat(split.dataFiles()).hasSize(expectedFileCount);
        }
    }

    /**
     * Runs {@code sys.compact} as a streaming job on a table using the {@code full-compaction}
     * changelog producer and checks the rows seen by a concurrently running streaming query.
     */
    @Test
    public void testStreamingCompact() throws Exception {
        sql(
                "CREATE TABLE T ("
                        + " k INT,"
                        + " v INT,"
                        + " hh INT,"
                        + " dt STRING,"
                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
                        + ") PARTITIONED BY (dt, hh) WITH ("
                        + " 'write-only' = 'true',"
                        + " 'bucket' = '1',"
                        + " 'changelog-producer' = 'full-compaction',"
                        + " 'full-compaction.delta-commits' = '1',"
                        + " 'continuous.discovery-interval' = '1s'"
                        + ")");
        FileStoreTable table = paimonTable("T");

        // try-with-resources: the streaming query must be shut down even when an assertion
        // below fails, otherwise the job and its iterator are leaked.
        try (BlockingIterator<Row, Row> changelog = streamSqlBlockIter("SELECT * FROM T")) {
            sql(
                    "INSERT INTO T VALUES (1, 100, 15, '20221208'),"
                            + " (1, 100, 16, '20221208'),"
                            + " (1, 100, 15, '20221209')");
            checkLatestSnapshot(table, 1L, Snapshot.CommitKind.APPEND);

            // Before any compaction has run, a stream scan plans no splits.
            Assertions.assertThat(table.newReadBuilder().newStreamScan().plan().splits())
                    .isEmpty();

            // Submit the streaming compaction and immediately release the client-side iterator.
            streamSqlIter(
                            "CALL sys.compact(`table` => 'default.T',"
                                    + " partitions => 'dt=20221208,hh=15;dt=20221209,hh=15',"
                                    + " options => 'scan.parallelism=1')")
                    .close();

            // Only the two partitions named in the CALL produce output.
            Assertions.assertThat(changelog.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 100, 15, "20221208"),
                            Row.of(1, 100, 15, "20221209"));

            sql(
                    "INSERT INTO T VALUES (1, 101, 15, '20221208'),"
                            + " (1, 101, 16, '20221208'),"
                            + " (1, 101, 15, '20221209')");

            // The update shows up as UPDATE_BEFORE/UPDATE_AFTER pairs for the same partitions.
            Assertions.assertThat(changelog.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, 100, 15, "20221208"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, 101, 15, "20221208"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, 100, 15, "20221209"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, 101, 15, "20221209"));
        }
    }

    /**
     * Writes random rows in several commits to a primary-key table declared without a
     * {@code bucket} option, then runs a z-order sort compaction and checks that it is committed
     * as an {@code OVERWRITE} snapshot.
     */
    @Test
    public void testDynamicBucketSortCompact() throws Exception {
        sql(
                "CREATE TABLE T ("
                        + " f0 BIGINT PRIMARY KEY NOT ENFORCED,"
                        + " f1 BIGINT,"
                        + " f2 BIGINT,"
                        + " f3 BIGINT,"
                        + " f4 STRING"
                        + ") WITH ("
                        + " 'write-only' = 'true',"
                        + " 'dynamic-bucket.target-row-num' = '100',"
                        + " 'zorder.var-length-contribution' = '14'"
                        + ")");
        FileStoreTable table = paimonTable("T");

        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int commit = 0; commit < SORT_COMPACT_COMMITS; commit++) {
            sql("INSERT INTO T VALUES %s", randomValueRows(random));
        }
        checkLatestSnapshot(table, 20L, Snapshot.CommitKind.APPEND);

        // Enable synchronous DML before calling the procedure; the snapshot check right after
        // the CALL relies on the compaction having been committed by then.
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
        sql(
                "CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'f2,f1')");
        checkLatestSnapshot(table, 21L, Snapshot.CommitKind.OVERWRITE);
    }

    /**
     * Builds the comma-separated VALUES tuples for one INSERT of the sort compaction test.
     *
     * @param random source of the random BIGINT column values
     * @return {@link #SORT_COMPACT_ROWS_PER_COMMIT} tuples of four random longs and one random
     *     14-character numeric string, joined by {@code ","}
     */
    private static String randomValueRows(ThreadLocalRandom random) {
        return IntStream.range(0, SORT_COMPACT_ROWS_PER_COMMIT)
                .mapToObj(
                        ignored ->
                                String.format(
                                        "(%s, %s, %s, %s, '%s')",
                                        random.nextLong(),
                                        random.nextLong(),
                                        random.nextLong(),
                                        random.nextLong(),
                                        StringUtils.randomNumericString(14)))
                .collect(Collectors.joining(","));
    }

    /**
     * Asserts that the most recent snapshot of the table has the given id and commit kind.
     *
     * @param fileStoreTable table whose snapshot manager is inspected
     * @param expectedId expected id of the latest snapshot
     * @param expectedKind expected commit kind of the latest snapshot
     */
    private void checkLatestSnapshot(
            FileStoreTable fileStoreTable, long expectedId, Snapshot.CommitKind expectedKind) {
        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
        Long latestId = snapshotManager.latestSnapshotId();
        // Fail with a readable message instead of an unboxing NullPointerException.
        Assertions.assertThat(latestId)
                .as("latest snapshot id (expected snapshot %s to exist)", expectedId)
                .isNotNull();

        Snapshot latest = snapshotManager.snapshot(latestId);
        Assertions.assertThat(latest.id()).isEqualTo(expectedId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(expectedKind);
    }
}
