package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/sink/CompactorSinkITCase.class */
/**
 * Integration test that writes two batches of rows into a partitioned primary-key table,
 * runs a batch compaction job restricted to selected partitions through
 * {@link CompactorSinkBuilder}, and verifies the resulting snapshot and per-partition file counts.
 */
public class CompactorSinkITCase extends AbstractTestBase {

    /** Row layout of the test table: key, value, hour partition and date partition. */
    private static final RowType ROW_TYPE =
            RowType.of(
                    new DataType[] {
                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
                    },
                    new String[] {"k", "v", "hh", "dt"});

    /** The table is partitioned by ("dt", "hh"), so "hh" sits at index 1 of a partition row. */
    private static final int HOUR_PARTITION_INDEX = 1;

    /** The only "hh" value selected for compaction by {@link #getSpecifiedPartitions()}. */
    private static final int COMPACTED_HOUR = 15;

    private Path tablePath;
    private String commitUser;

    /** Points each test at a fresh temporary directory and a unique commit user. */
    @BeforeEach
    public void before() throws IOException {
        this.tablePath = new Path(getTempDirPath());
        this.commitUser = UUID.randomUUID().toString();
    }

    /**
     * Commits two APPEND snapshots covering three partitions, compacts only the partitions
     * returned by {@link #getSpecifiedPartitions()}, and checks that the compacted partitions
     * hold a single data file while the untouched one still holds two.
     */
    @Test
    public void testCompact() throws Exception {
        FileStoreTable table = createFileStoreTable();
        SnapshotManager snapshotManager = table.snapshotManager();
        StreamWriteBuilder writeBuilder =
                table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = writeBuilder.newWrite();
        StreamTableCommit commit = writeBuilder.newCommit();

        // Close writer and committer even when a write, commit or assertion fails.
        try {
            write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
            write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
            write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
            commit.commit(0L, write.prepareCommit(true, 0L));

            write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
            write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
            write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
            commit.commit(1L, write.prepareCommit(true, 1L));

            Snapshot appended = latestSnapshot(snapshotManager);
            Assertions.assertThat(appended.id()).isEqualTo(2L);
            Assertions.assertThat(appended.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        } finally {
            try {
                write.close();
            } finally {
                commit.close();
            }
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        new CompactorSinkBuilder(table)
                .withInput(
                        new CompactorSourceBuilder(this.tablePath.toString(), table)
                                .withEnv(env)
                                .withContinuousMode(false)
                                .withPartitions(getSpecifiedPartitions())
                                .build())
                .build();
        env.execute();

        Snapshot compacted = latestSnapshot(snapshotManager);
        Assertions.assertThat(compacted.id()).isEqualTo(3L);
        Assertions.assertThat(compacted.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
        Assertions.assertThat(plan.splits()).hasSize(3);
        // Iterate as Object and cast explicitly so the loop compiles whether the plan exposes
        // its splits as DataSplit or as a more general split type.
        for (Object split : plan.splits()) {
            DataSplit dataSplit = (DataSplit) split;
            boolean wasCompacted =
                    dataSplit.partition().getInt(HOUR_PARTITION_INDEX) == COMPACTED_HOUR;
            // Two commits wrote one file each per partition; compaction merges them into one.
            Assertions.assertThat(dataSplit.dataFiles()).hasSize(wasCompacted ? 1 : 2);
        }
    }

    /** Loads the most recent snapshot, failing with a clear assertion if none exists yet. */
    private Snapshot latestSnapshot(SnapshotManager snapshotManager) {
        Long latestId = snapshotManager.latestSnapshotId();
        Assertions.assertThat(latestId).isNotNull();
        return snapshotManager.snapshot(latestId);
    }

    /** Returns the partition specs handed to the compactor source: hour 15 of both dates. */
    private List<Map<String, String>> getSpecifiedPartitions() {
        return Arrays.asList(partitionSpec("20221208", "15"), partitionSpec("20221209", "15"));
    }

    /** Builds one partition spec mapping the "dt" and "hh" partition keys to the given values. */
    private Map<String, String> partitionSpec(String dt, String hh) {
        Map<String, String> spec = new HashMap<>();
        spec.put("dt", dt);
        spec.put("hh", hh);
        return spec;
    }

    /** Wraps the given field values, in {@link #ROW_TYPE} order, into a {@link GenericRow}. */
    private GenericRow rowData(Object... values) {
        return GenericRow.of(values);
    }

    /**
     * Creates the table schema under {@link #tablePath} and opens the table on the local file
     * system. Partition keys are ("dt", "hh"); primary keys are ("dt", "hh", "k").
     */
    private FileStoreTable createFileStoreTable() throws Exception {
        Schema schema =
                new Schema(
                        ROW_TYPE.getFields(),
                        Arrays.asList("dt", "hh"),
                        Arrays.asList("dt", "hh", "k"),
                        Collections.emptyMap(),
                        "");
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), this.tablePath);
        return FileStoreTableFactory.create(
                LocalFileIO.create(), this.tablePath, schemaManager.createTable(schema));
    }
}
