package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/CompactActionITCase.class */
public class CompactActionITCase extends ActionITCaseBase {
    private static final DataType[] FIELD_TYPES = {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
    private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[]{"k", "v", "hh", "dt"});

    @Timeout(60)
    @Test
    public void testBatchCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        FileStoreTable createFileStoreTable = createFileStoreTable(ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), hashMap);
        this.snapshotManager = createFileStoreTable.snapshotManager();
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = withCommitUser.newWrite();
        this.commit = withCommitUser.newCommit();
        writeData(rowData(1, 100, 15, BinaryString.fromString("20221208")), rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209")));
        writeData(rowData(2, 100, 15, BinaryString.fromString("20221208")), rowData(2, 100, 16, BinaryString.fromString("20221208")), rowData(2, 100, 15, BinaryString.fromString("20221209")));
        Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
        Assertions.assertEquals(2L, snapshot.id());
        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactAction(this.warehouse, this.database, this.tableName).withPartitions(getSpecifiedPartitions()).build(executionEnvironment);
        executionEnvironment.execute();
        Snapshot snapshot2 = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
        Assertions.assertEquals(3L, snapshot2.id());
        Assertions.assertEquals(Snapshot.CommitKind.COMPACT, snapshot2.commitKind());
        List<DataSplit> splits = createFileStoreTable.newSnapshotSplitReader().splits();
        Assertions.assertEquals(3, splits.size());
        for (DataSplit dataSplit : splits) {
            if (dataSplit.partition().getInt(1) == 15) {
                Assertions.assertEquals(1, dataSplit.files().size());
            } else {
                Assertions.assertEquals(2, dataSplit.files().size());
            }
        }
    }

    @Test
    public void testStreamingCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        hashMap.put(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        FileStoreTable createFileStoreTable = createFileStoreTable(ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), hashMap);
        this.snapshotManager = createFileStoreTable.snapshotManager();
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = withCommitUser.newWrite();
        this.commit = withCommitUser.newCommit();
        writeData(rowData(1, 100, 15, BinaryString.fromString("20221208")), rowData(1, 100, 16, BinaryString.fromString("20221208")), rowData(1, 100, 15, BinaryString.fromString("20221209")));
        Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
        Assertions.assertEquals(1L, snapshot.id());
        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
        StreamTableScan newStreamScan = createFileStoreTable.newReadBuilder().newStreamScan();
        Assertions.assertTrue(newStreamScan.plan().splits().isEmpty());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(500L);
        executionEnvironment.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        new CompactAction(this.warehouse, this.database, this.tableName).withPartitions(getSpecifiedPartitions()).build(executionEnvironment);
        JobClient executeAsync = executionEnvironment.executeAsync();
        validateResult(createFileStoreTable, newStreamScan, Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), 60000L);
        writeData(rowData(1, 101, 15, BinaryString.fromString("20221208")), rowData(1, 101, 16, BinaryString.fromString("20221208")), rowData(1, 101, 15, BinaryString.fromString("20221209")));
        validateResult(createFileStoreTable, newStreamScan, Arrays.asList("+U[1, 101, 15, 20221208]", "+U[1, 101, 15, 20221209]", "-U[1, 100, 15, 20221208]", "-U[1, 100, 15, 20221209]"), 60000L);
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(this.snapshotManager.latestSnapshotId().longValue() - 2 == this.snapshotManager.earliestSnapshotId().longValue());
        }, Duration.ofSeconds(60000L), Duration.ofSeconds(100L), String.format("Cannot validate snapshot expiration in %s milliseconds.", 60000));
        executeAsync.cancel();
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap hashMap = new HashMap();
        hashMap.put("dt", "20221208");
        hashMap.put("hh", "15");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("dt", "20221209");
        hashMap2.put("hh", "15");
        return Arrays.asList(hashMap, hashMap2);
    }

    private void validateResult(FileStoreTable fileStoreTable, StreamTableScan streamTableScan, List<String> list, long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        long currentTimeMillis = System.currentTimeMillis();
        while (arrayList.size() != list.size()) {
            arrayList.addAll(getResult(fileStoreTable.newReadBuilder().newRead(), streamTableScan.plan().splits(), ROW_TYPE));
            if (System.currentTimeMillis() - currentTimeMillis > j) {
                break;
            }
        }
        if (arrayList.size() != list.size()) {
            throw new TimeoutException(String.format("Cannot collect %s records in %s milliseconds.", Integer.valueOf(list.size()), Long.valueOf(j)));
        }
        arrayList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assertions.assertEquals(list, arrayList);
    }
}
