package org.apache.paimon.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/source/CompactorSourceITCase.class */
public class CompactorSourceITCase extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}, new String[]{"k", "v", "dt", "hh"});
    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
    private Path tablePath;
    private String commitUser;

    @BeforeEach
    public void before() throws IOException {
        this.tablePath = new Path(getTempDirPath());
        this.commitUser = UUID.randomUUID().toString();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "defaultOptions = {0}")
    public void testBatchRead(boolean z) throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        if (!z) {
            createFileStoreTable = createFileStoreTable.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
        }
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite newWrite = withCommitUser.newWrite();
        StreamTableCommit newCommit = withCommitUser.newCommit();
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 1511, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(false).withEnv(streamExecutionEnvironmentBuilder().streamingMode().build()).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        while (executeAndCollect.hasNext()) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 2|20221208|15|0|0", "+I 2|20221208|16|0|0", "+I 2|20221209|15|0|0"));
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "defaultOptions = {0}")
    public void testStreamingRead(boolean z) throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        if (!z) {
            HashMap hashMap = new HashMap();
            hashMap.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
            hashMap.put(CoreOptions.CHANGELOG_PRODUCER.key(), CoreOptions.ChangelogProducer.NONE.toString());
            hashMap.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
            createFileStoreTable = createFileStoreTable.copy(hashMap);
        }
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite newWrite = withCommitUser.newWrite();
        StreamTableCommit newCommit = withCommitUser.newCommit();
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 1511, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
        newWrite.compact(binaryRow("20221208", 15), 0, true);
        newWrite.compact(binaryRow("20221209", 15), 0, true);
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(2, 1520, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1621, BinaryString.fromString("20221208"), 16));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        newWrite.write(rowData(1, 1512, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, BinaryString.fromString("20221209"), 16));
        newCommit.commit(3L, newWrite.prepareCommit(true, 3L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(true).withEnv(streamExecutionEnvironmentBuilder().streamingMode().build()).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 4|20221208|15|0|1", "+I 4|20221208|16|0|1", "+I 5|20221208|15|0|1", "+I 5|20221209|16|0|1"));
        newWrite.write(rowData(2, 1520, BinaryString.fromString("20221209"), 15));
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221208"), 16));
        newWrite.write(rowData(1, 1511, BinaryString.fromString("20221209"), 15));
        newCommit.commit(4L, newWrite.prepareCommit(true, 4L));
        arrayList.clear();
        for (int i2 = 0; i2 < 2; i2++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 6|20221208|16|0|1", "+I 6|20221209|15|0|1"));
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    @Test
    public void testStreamingPartitionSpec() throws Exception {
        testPartitionSpec(true, getSpecifiedPartitions(), Arrays.asList("+I 1|20221208|16|0|1", "+I 2|20221209|15|0|1", "+I 3|20221208|16|0|1", "+I 3|20221209|15|0|1"));
    }

    @Test
    public void testBatchPartitionSpec() throws Exception {
        testPartitionSpec(false, getSpecifiedPartitions(), Arrays.asList("+I 3|20221208|16|0|0", "+I 3|20221209|15|0|0"));
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap hashMap = new HashMap();
        hashMap.put("dt", "20221208");
        hashMap.put("hh", "16");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("dt", "20221209");
        hashMap2.put("hh", "15");
        return Arrays.asList(hashMap, hashMap2);
    }

    private void testPartitionSpec(boolean z, List<Map<String, String>> list, List<String> list2) throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite newWrite = withCommitUser.newWrite();
        StreamTableCommit newCommit = withCommitUser.newCommit();
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, BinaryString.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(2, 1520, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1520, BinaryString.fromString("20221209"), 15));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(1, 1511, BinaryString.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1610, BinaryString.fromString("20221208"), 16));
        newWrite.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(z).withEnv(streamExecutionEnvironmentBuilder().streamingMode().build()).withPartitions(list).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list2.size(); i++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(list2);
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    private String toString(RowData rowData) {
        try {
            int size = this.dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
            BinaryRow deserializeBinaryRow = SerializationUtils.deserializeBinaryRow(rowData.getBinary(1));
            return String.format("%s %d|%s|%d|%d|%d", rowData.getRowKind().shortString(), Long.valueOf(rowData.getLong(0)), deserializeBinaryRow.getString(0), Integer.valueOf(deserializeBinaryRow.getInt(1)), Integer.valueOf(rowData.getInt(2)), Integer.valueOf(size));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private GenericRow rowData(Object... objArr) {
        return GenericRow.of(objArr);
    }

    private BinaryRow binaryRow(String str, int i) {
        BinaryRow binaryRow = new BinaryRow(2);
        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
        binaryRowWriter.writeString(0, BinaryString.fromString(str));
        binaryRowWriter.writeInt(1, i);
        binaryRowWriter.complete();
        return binaryRow;
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        return FileStoreTableFactory.create(LocalFileIO.create(), this.tablePath, new SchemaManager(LocalFileIO.create(), this.tablePath).createTable(new Schema(ROW_TYPE.getFields(), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.singletonMap("bucket", "1"), "")));
    }
}
