package org.apache.hudi.table.format;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

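/**
 * Test cases that write data sets into a Hudi table and read them back through the
 * input format returned by {@link HoodieTableSource}.
 *
 * <p>Decompiled from: input_file:org/apache/hudi/table/format/TestInputFormat.class
 */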
public class TestInputFormat {
    // Table source whose input format is read by the tests; assigned in beforeEach(...)
    // and re-created by tests that change read options via getTableSource(...).
    private HoodieTableSource tableSource;
    // Configuration passed to the write helper and to the table source; re-created in beforeEach(...).
    private Configuration conf;

    // Directory injected through JUnit's @TempDir; its absolute path is handed to
    // TestConfigurations.getDefaultConf(...) and used as the Path of the table source.
    @TempDir
    File tempFile;

    /**
     * Prepares the fixture for the given table type without any additional options.
     *
     * @param hoodieTableType table type whose name is written into the configuration
     * @throws IOException propagated from the two-argument overload
     */
    void beforeEach(HoodieTableType hoodieTableType) throws IOException {
        final Map<String, String> noExtraOptions = Collections.emptyMap();
        beforeEach(hoodieTableType, noExtraOptions);
    }

    /**
     * Prepares the fixture: builds the default configuration rooted at the temp directory,
     * sets the table type, switches async compaction off, applies the given string options
     * on top, initializes the table if needed and creates the table source.
     *
     * @param hoodieTableType table type whose name is written into the configuration
     * @param map             extra key/value options applied after the defaults, so they
     *                        can override the values set here
     * @throws IOException declared for the table initialization / configuration helpers
     */
    void beforeEach(HoodieTableType hoodieTableType, Map<String, String> map) throws IOException {
        final String basePath = this.tempFile.getAbsolutePath();
        this.conf = TestConfigurations.getDefaultConf(basePath);
        this.conf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);

        // Caller-supplied options go last so that they win over the defaults above.
        for (Map.Entry<String, String> option : map.entrySet()) {
            this.conf.setString(option.getKey(), option.getValue());
        }

        StreamerUtil.initTableIfNotExists(this.conf);
        this.tableSource = getTableSource(this.conf);
    }

    /**
     * For every table type: writes the insert data set and checks it is read back unchanged,
     * then writes the update/insert data set, resets the table source and checks the
     * expected rows of the second read.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testRead(HoodieTableType hoodieTableType) throws Exception {
        beforeEach(hoodieTableType);

        // First write: the read result must equal the written data set.
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        final String firstActual = TestData.rowDataToString(readData(this.tableSource.getInputFormat()));
        final String firstExpected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat(firstActual, CoreMatchers.is(firstExpected));

        // Second write, then reset the table source before asking for a new input format.
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.tableSource.reset();
        final String secondActual = TestData.rowDataToString(readData(this.tableSource.getInputFormat()));
        final String secondExpected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
                + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
                + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
                + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
                + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
                + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
                + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat(secondActual, CoreMatchers.is(secondExpected));
    }

    /**
     * MERGE_ON_READ: the first write runs with async compaction enabled and a delta-commit
     * threshold of 1, the following two writes run with async compaction disabled; the
     * result is read after the first write and again after all writes.
     * NOTE(review): presumably the first write ends up in base files and the later ones in
     * log files, as the test name suggests - confirm against TestData.writeData.
     */
    @Test
    void testReadBaseAndLogFiles() throws Exception {
        beforeEach(HoodieTableType.MERGE_ON_READ);

        // First write with async compaction on and a delta-commit threshold of 1.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        final String firstActual = TestData.rowDataToString(readData(this.tableSource.getInputFormat()));
        final String firstExpected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat(firstActual, CoreMatchers.is(firstExpected));

        // Two more writes with async compaction off; the last one targets separate partitions.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, this.conf);

        this.tableSource.reset();
        final String secondActual = TestData.rowDataToString(readData(this.tableSource.getInputFormat()));
        final String secondExpected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
                + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
                + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
                + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
                + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
                + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
                + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], "
                + "+I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], "
                + "+I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], "
                + "+I[id14, Rachel, 52, 1970-01-01T00:00:00.011, par6], "
                + "+I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]";
        MatcherAssert.assertThat(secondActual, CoreMatchers.is(secondExpected));
    }

    /**
     * MERGE_ON_READ with changelog mode enabled: writes the insert data set with async
     * compaction on (delta-commit threshold 1), then the update/delete data set with async
     * compaction off, and reads the table twice - first with the input format as returned,
     * then with {@code isEmitDelete(true)} so that {@code -D} rows show up in the result.
     */
    @Test
    void testReadBaseAndLogFilesWithDeletes() throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        // First write with async compaction on and a delta-commit threshold of 1.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        // Second write with async compaction off.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);

        // Read without touching the emit-delete flag: no -D rows expected.
        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        final String expectedWithoutDeletes = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
                + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is(expectedWithoutDeletes));

        // Reset the source and read again with delete emission switched on.
        this.tableSource.reset();
        final InputFormat<?, ?> refreshedFormat = this.tableSource.getInputFormat();
        // getInputFormat() is declared with a broader type (it also yields the copy-on-write
        // format elsewhere in this class), so an explicit narrowing cast is required.
        ((MergeOnReadInputFormat) refreshedFormat).isEmitDelete(true);
        final String expectedWithDeletes = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
                + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
                + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
                + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
                + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat(TestData.rowDataToString(readData(refreshedFormat)), CoreMatchers.is(expectedWithDeletes));
    }

    /**
     * MERGE_ON_READ with changelog mode enabled: writes a single insert with async compaction
     * on (delta-commit threshold 1), then the disordered update/delete data set with async
     * compaction set to {@code z}. The same single row is expected both with and without
     * {@code isEmitDelete(true)}; only its row kind depends on {@code z}.
     *
     * @param z value assigned to COMPACTION_ASYNC_ENABLED before the second write; the
     *          expected row kind is {@code +I} when true and {@code +U} when false
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean z) throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        // First write with async compaction on and a delta-commit threshold of 1.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);

        // Second write with async compaction controlled by the test parameter.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, z);
        TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, this.conf);

        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        final List<RowData> firstRead = readData(inputFormat);

        final String rowKind = z ? "I" : "U";
        final String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
        MatcherAssert.assertThat(TestData.rowDataToString(firstRead), CoreMatchers.is(expected));

        // Reset the source and read again with delete emission switched on.
        this.tableSource.reset();
        final InputFormat<?, ?> refreshedFormat = this.tableSource.getInputFormat();
        // Explicit narrowing cast: getInputFormat() is declared with a broader type.
        ((MergeOnReadInputFormat) refreshedFormat).isEmitDelete(true);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(refreshedFormat)), CoreMatchers.is(expected));
    }

    /**
     * MERGE_ON_READ with changelog mode enabled: writes only the update/delete data set and
     * reads it with {@code isEmitDelete(true)}, expecting the {@code -D} rows in the result.
     */
    @Test
    void testReadWithDeletesMOR() throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);

        // Check the runtime type first so that a wrong format fails with a readable
        // assertion message instead of a ClassCastException, then narrow explicitly.
        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);

        final String expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
                + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
                + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
                + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is(expected));
    }

    /**
     * COPY_ON_WRITE: writes the update/delete data set and checks that the table source
     * hands out a {@link CopyOnWriteInputFormat} whose read result holds the two expected rows.
     */
    @Test
    void testReadWithDeletesCOW() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);

        final InputFormat cowFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(cowFormat, CoreMatchers.instanceOf(CopyOnWriteInputFormat.class));

        final String actual = TestData.rowDataToString(readData(cowFormat));
        final String expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat(actual, CoreMatchers.is(expected));
    }

    /**
     * For every table type: writes the insert data set, restricts the table source to the
     * partition spec {@code partition=par1} via {@code applyPartitions} and checks that only
     * the two {@code par1} rows are read.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testReadWithPartitionPrune(HoodieTableType hoodieTableType) throws Exception {
        beforeEach(hoodieTableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        // Single remaining partition spec: partition = par1.
        final Map<String, String> prunedPartition = new HashMap<>();
        prunedPartition.put("partition", "par1");
        this.tableSource.applyPartitions(Collections.singletonList(prunedPartition));

        final String actual = TestData.rowDataToString(readData(this.tableSource.getInputFormat()));
        final String expected = "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
                + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat(actual, CoreMatchers.is(expected));
    }

    /**
     * MERGE_ON_READ with changelog mode enabled: writes the insert/update/delete data set
     * and reads it twice. The first read (input format as returned) is expected to be empty;
     * the second read, with {@code isEmitDelete(true)}, is expected to hold the single
     * {@code -D} row.
     */
    @Test
    void testReadChangesMergedMOR() throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);

        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[]"));

        // Reset the source and read again with delete emission switched on.
        this.tableSource.reset();
        final InputFormat<?, ?> refreshedFormat = this.tableSource.getInputFormat();
        // Explicit narrowing cast: getInputFormat() is declared with a broader type.
        ((MergeOnReadInputFormat) refreshedFormat).isEmitDelete(true);
        MatcherAssert.assertThat(
                TestData.rowDataToString(readData(refreshedFormat)),
                CoreMatchers.is("[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
    }

    /**
     * MERGE_ON_READ with changelog mode and streaming read both enabled: writes the
     * insert/update/delete data set and expects every individual change row
     * ({@code +I}, {@code -U}, {@code +U}, {@code -D}) in the read result.
     */
    @Test
    void testReadChangesUnMergedMOR() throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);

        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        final String expected = "[+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
                + "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
                + "+U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], "
                + "-U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], "
                + "+U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], "
                + "-U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], "
                + "+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], "
                + "-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is(expected));
    }

    /**
     * For every table type, with the query type set to {@code incremental}: performs three
     * writes ({@code dataSetInsert(1, 2)}, {@code (3, 4)}, {@code (5, 6)}), collects the
     * timestamps of the completed instants and then reads with four different combinations
     * of READ_START_COMMIT / READ_END_COMMIT, re-creating the table source for each.
     * In every case the input format is expected to be a {@link MergeOnReadInputFormat}.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testReadIncrementally(HoodieTableType hoodieTableType) throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        beforeEach(hoodieTableType, options);

        // Three separate writes; exactly three completed instants are asserted below.
        for (int i = 0; i < 6; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }

        final List<String> commits = StreamerUtil.createMetaClient(this.tempFile.getAbsolutePath())
                .getCommitsTimeline()
                .filterCompletedInstants()
                .getInstants()
                .map(instant -> instant.getTimestamp())
                .collect(Collectors.toList());
        MatcherAssert.assertThat(commits.size(), CoreMatchers.is(3));

        // Case 1: start commit only (second instant).
        this.conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
        this.tableSource = getTableSource(this.conf);
        final InputFormat<?, ?> fromSecondCommit = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(fromSecondCommit, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(fromSecondCommit), TestData.dataSetInsert(3, 4, 5, 6));

        // Case 2: start commit only, set to "earliest".
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = getTableSource(this.conf);
        final InputFormat<?, ?> fromEarliest = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(fromEarliest, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(fromEarliest), TestData.dataSetInsert(1, 2, 3, 4, 5, 6));

        // Case 3: both bounds, first instant to second instant.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
        this.tableSource = getTableSource(this.conf);
        final InputFormat<?, ?> boundedRange = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(boundedRange, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(boundedRange), TestData.dataSetInsert(1, 2, 3, 4));

        // Case 4: end commit only (start commit removed from the configuration).
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
        this.tableSource = getTableSource(this.conf);
        final InputFormat<?, ?> endCommitOnly = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(endCommitOnly, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(endCommitOnly), TestData.dataSetInsert(3, 4));
    }

    /**
     * For every table type: sets SOURCE_AVRO_SCHEMA to the Avro schema converted from
     * {@code TestConfigurations.ROW_TYPE_WIDER}, writes the insert data set and checks that
     * the rows read back equal the written data set.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testReadWithWiderSchema(HoodieTableType hoodieTableType) throws Exception {
        final String widerAvroSchema =
                AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString();
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), widerAvroSchema);
        beforeEach(hoodieTableType, options);

        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.assertRowDataEquals(readData(this.tableSource.getInputFormat()), TestData.DATA_SET_INSERT);
    }

    /**
     * MERGE_ON_READ with a delta-commit threshold of 1 and async compaction disabled:
     * performs three writes and checks that all six rows are read back.
     * NOTE(review): per the test name this presumably leaves compaction plans scheduled
     * but not executed - confirm against TestData.writeData.
     */
    @Test
    void testReadMORWithCompactionPlanScheduled() throws Exception {
        final Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        beforeEach(HoodieTableType.MERGE_ON_READ, options);

        // Three separate writes: dataSetInsert(1, 2), (3, 4) and (5, 6).
        for (int i = 0; i < 6; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }

        final InputFormat<?, ?> inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat), TestData.dataSetInsert(1, 2, 3, 4, 5, 6));
    }

    /**
     * Creates a table source over the temp directory using the test table schema and
     * {@code "partition"} as the only partition key.
     *
     * @param configuration configuration handed to the new table source
     * @return a new {@link HoodieTableSource}
     */
    private HoodieTableSource getTableSource(Configuration configuration) {
        final Path tablePath = new Path(this.tempFile.getAbsolutePath());
        final List<String> partitionKeys = Collections.singletonList("partition");
        // NOTE(review): "default" is presumably the default partition name - confirm
        // against the HoodieTableSource constructor.
        return new HoodieTableSource(
                TestConfigurations.TABLE_SCHEMA,
                tablePath,
                partitionKeys,
                "default",
                configuration);
    }

    /**
     * Reads all records of the given input format: requests input splits with a minimum
     * count of 1, then opens, drains and closes the format once per split.
     *
     * @param inputFormat input format to read; used as a raw type because callers hold
     *                    differently parameterized formats
     * @return copies of all records, in split order and read order
     * @throws IOException if creating splits, opening, reading or closing fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static List<RowData> readData(InputFormat inputFormat) throws IOException {
        final InputSplit[] splits = inputFormat.createInputSplits(1);
        final List<RowData> rows = new ArrayList<>();
        for (InputSplit split : splits) {
            inputFormat.open(split);
            try {
                while (!inputFormat.reachedEnd()) {
                    // No reuse object is supplied, and each record is copied before being kept
                    // (presumably because the format may recycle the returned instance - confirm).
                    rows.add(TestConfigurations.SERIALIZER.copy((RowData) inputFormat.nextRecord(null)));
                }
            } finally {
                // Close the split even when reading fails, so a failing test does not leak it.
                inputFormat.close();
            }
        }
        return rows;
    }
}
