/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestInputFormat {
    private HoodieTableSource tableSource;
    private Configuration conf;
    @TempDir
    File tempFile;

    void beforeEach(HoodieTableType tableType) throws IOException {
        this.beforeEach(tableType, Collections.emptyMap());
    }

    void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        options.forEach((key, value) -> this.conf.setString(key, value));
        StreamerUtil.initTableIfNotExists((Configuration)this.conf);
        this.tableSource = this.getTableSource(this.conf);
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testRead(HoodieTableType tableType) throws Exception {
        this.beforeEach(tableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)expected));
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        result = TestInputFormat.readData(inputFormat);
        actual = TestData.rowDataToString(result);
        expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], +I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], +I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], +I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testReadBaseAndLogFiles() throws Exception {
        this.beforeEach(HoodieTableType.MERGE_ON_READ);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)expected));
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, this.conf);
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        result = TestInputFormat.readData(inputFormat);
        actual = TestData.rowDataToString(result);
        expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], +I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], +I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], +I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], +I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], +I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], +I[id14, Rachel, 52, 1970-01-01T00:00:00.011, par6], +I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testReadBaseAndLogFilesWithDeletes() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> result1 = TestInputFormat.readData(inputFormat);
        String actual1 = TestData.rowDataToString(result1);
        String expected1 = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat((Object)actual1, (Matcher)CoreMatchers.is((Object)"[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]"));
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        ((MergeOnReadInputFormat)inputFormat).isEmitDelete(true);
        List<RowData> result2 = TestInputFormat.readData(inputFormat);
        String actual2 = TestData.rowDataToString(result2);
        String expected2 = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat((Object)actual2, (Matcher)CoreMatchers.is((Object)"[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
        TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> result1 = TestInputFormat.readData(inputFormat);
        String rowKind = compact ? "I" : "U";
        String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
        String actual1 = TestData.rowDataToString(result1);
        MatcherAssert.assertThat((Object)actual1, (Matcher)CoreMatchers.is((Object)expected));
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        ((MergeOnReadInputFormat)inputFormat).isEmitDelete(true);
        List<RowData> result2 = TestInputFormat.readData(inputFormat);
        String actual2 = TestData.rowDataToString(result2);
        MatcherAssert.assertThat((Object)actual2, (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testReadWithDeletesMOR() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        ((MergeOnReadInputFormat)inputFormat).isEmitDelete(true);
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)"[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"));
    }

    @Test
    void testReadWithDeletesCOW() throws Exception {
        this.beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(CopyOnWriteInputFormat.class));
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = "[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)"[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]"));
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
        this.beforeEach(tableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        HashMap<String, String> prunedPartitions = new HashMap<String, String>();
        prunedPartitions.put("partition", "par1");
        this.tableSource.applyPartitions(Collections.singletonList(prunedPartitions));
        InputFormat inputFormat = this.tableSource.getInputFormat();
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testReadBaseFilesWithStartCommit() throws Exception {
        this.beforeEach(HoodieTableType.COPY_ON_WRITE);
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf((Configuration)this.conf);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)this.conf);
        IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath((Path)metaClient.getBasePathV2())).requiredPartitions(new HashSet<String>(Arrays.asList("par1", "par2", "par3", "par4"))).build();
        IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
        Assertions.assertFalse((boolean)splits1.isEmpty());
        List<RowData> result1 = TestInputFormat.readData(inputFormat, (InputSplit[])splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        String actual1 = TestData.rowDataToString(result1);
        String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat((Object)actual1, (Matcher)CoreMatchers.is((Object)expected1));
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, false);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
        IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
        Assertions.assertFalse((boolean)splits2.isEmpty());
        List<RowData> result2 = TestInputFormat.readData(inputFormat, (InputSplit[])splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        String actual2 = TestData.rowDataToString(result2);
        String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
        MatcherAssert.assertThat((Object)actual2, (Matcher)CoreMatchers.is((Object)expected2));
    }

    @Test
    void testReadChangesMergedMOR() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> result1 = TestInputFormat.readData(inputFormat);
        String actual1 = TestData.rowDataToString(result1);
        String expected1 = "[]";
        MatcherAssert.assertThat((Object)actual1, (Matcher)CoreMatchers.is((Object)"[]"));
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        ((MergeOnReadInputFormat)inputFormat).isEmitDelete(true);
        List<RowData> result2 = TestInputFormat.readData(inputFormat);
        String actual2 = TestData.rowDataToString(result2);
        String expected2 = "[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
        MatcherAssert.assertThat((Object)actual2, (Matcher)CoreMatchers.is((Object)"[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
    }

    @Test
    void testReadChangesUnMergedMOR() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> result = TestInputFormat.readData(inputFormat);
        String actual = TestData.rowDataToString(result);
        String expected = "[+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], -U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], +U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], -U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], +U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], -U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], +U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], -D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.is((Object)"[+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], -U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], +U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], -U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], +U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], -U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], +U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], -D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadIncrementally(HoodieTableType tableType) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        this.beforeEach(tableType, options);
        for (int i = 0; i < 6; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((String)this.tempFile.getAbsolutePath(), (org.apache.hadoop.conf.Configuration)HadoopConfigurations.getHadoopConf((Configuration)this.conf));
        List commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)commits.size(), (Matcher)CoreMatchers.is((Object)3));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String)commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat1 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat1, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual1 = TestInputFormat.readData(inputFormat1);
        List<RowData> expected1 = TestData.dataSetInsert(3, 4, 5, 6);
        TestData.assertRowDataEquals(actual1, expected1);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat2, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual2 = TestInputFormat.readData(inputFormat2);
        List<RowData> expected2 = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
        TestData.assertRowDataEquals(actual2, expected2);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String)commits.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat3, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual3 = TestInputFormat.readData(inputFormat3);
        List<RowData> expected3 = TestData.dataSetInsert(1, 2, 3, 4);
        TestData.assertRowDataEquals(actual3, expected3);
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat4 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat4, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual4 = TestInputFormat.readData(inputFormat4);
        List<RowData> expected4 = TestData.dataSetInsert(3, 4);
        TestData.assertRowDataEquals(actual4, expected4);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "000");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat5 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat4, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual5 = TestInputFormat.readData(inputFormat5);
        List<RowData> expected5 = TestData.dataSetInsert(1, 2, 3, 4);
        TestData.assertRowDataEquals(actual5, expected5);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "001");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "002");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat6 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat6, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual6 = TestInputFormat.readData(inputFormat6);
        TestData.assertRowDataEquals(actual6, Collections.emptyList());
    }

    @Test
    void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        String baseResult = TestData.rowDataToString(TestInputFormat.readData(inputFormat));
        String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
        MatcherAssert.assertThat((Object)baseResult, (Matcher)CoreMatchers.is((Object)expected));
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        this.tableSource.reset();
        inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        String baseMergeLogFileResult = TestData.rowDataToString(TestInputFormat.readData(inputFormat));
        MatcherAssert.assertThat((Object)baseMergeLogFileResult, (Matcher)CoreMatchers.is((Object)expected));
    }

    @Test
    void testReadArchivedCommitsIncrementally() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
        options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
        options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
        options.put(FlinkOptions.METADATA_ENABLED.key(), "false");
        options.put("hoodie.commits.archival.batch", "1");
        this.beforeEach(HoodieTableType.COPY_ON_WRITE, options);
        for (int i = 0; i < 20; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }
        HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient((HoodieEngineContext)HoodieFlinkEngineContext.DEFAULT, FlinkWriteClients.getHoodieClientConfig((Configuration)this.conf));
        writeClient.clean();
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((String)this.tempFile.getAbsolutePath(), (org.apache.hadoop.conf.Configuration)HadoopConfigurations.getHadoopConf((Configuration)this.conf));
        List commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)commits.size(), (Matcher)CoreMatchers.is((Object)4));
        List archivedCommits = metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)archivedCommits.size(), (Matcher)CoreMatchers.is((Object)6));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String)archivedCommits.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)archivedCommits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat1 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat1, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual1 = TestInputFormat.readData(inputFormat1);
        List<RowData> expected1 = TestData.dataSetInsert(1, 2, 3, 4);
        TestData.assertRowDataEquals(actual1, expected1);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String)archivedCommits.get(1));
        this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat2, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual2 = TestInputFormat.readData(inputFormat2);
        List<RowData> expected2 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
        TestData.assertRowDataEquals(actual2, expected2);
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)archivedCommits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat3, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual3 = TestInputFormat.readData(inputFormat3);
        List<RowData> expected3 = TestData.dataSetInsert(3, 4);
        TestData.assertRowDataEquals(actual3, expected3);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String)archivedCommits.get(1));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String)commits.get(0));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat4 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat4, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual4 = TestInputFormat.readData(inputFormat4);
        List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        TestData.assertRowDataEquals(actual4, expected4);
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema((LogicalType)TestConfigurations.ROW_TYPE_WIDER).toString());
        this.beforeEach(tableType, options);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        List<RowData> result = TestInputFormat.readData(inputFormat);
        TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
    }

    @Test
    void testReadMORWithCompactionPlanScheduled() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        for (int i = 0; i < 6; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }
        InputFormat inputFormat1 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat1, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual = TestInputFormat.readData(inputFormat1);
        List<RowData> expected = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
        TestData.assertRowDataEquals(actual, expected);
    }

    private HoodieTableSource getTableSource(Configuration conf) {
        return new HoodieTableSource(TestConfigurations.TABLE_SCHEMA, new Path(this.tempFile.getAbsolutePath()), Collections.singletonList("partition"), "default", conf);
    }

    private static List<RowData> readData(InputFormat inputFormat) throws IOException {
        InputSplit[] inputSplits = inputFormat.createInputSplits(1);
        return TestInputFormat.readData(inputFormat, inputSplits);
    }

    private static List<RowData> readData(InputFormat inputFormat, InputSplit[] inputSplits) throws IOException {
        ArrayList<RowData> result = new ArrayList<RowData>();
        for (InputSplit inputSplit : inputSplits) {
            inputFormat.open(inputSplit);
            while (!inputFormat.reachedEnd()) {
                result.add(TestConfigurations.SERIALIZER.copy((RowData)inputFormat.nextRecord(null)));
            }
            inputFormat.close();
        }
        return result;
    }
}

