/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.io.File;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(value={FlinkMiniCluster.class})
public class ITTestHoodieDataSource {
    // Table environment for the streaming tests; created and configured in beforeEach().
    private TableEnvironment streamTableEnv;
    // Table environment for the batch tests; obtained from TestTableEnvs in beforeEach().
    private TableEnvironment batchTableEnv;
    // Temporary directory injected by JUnit (@TempDir); the tests use its absolute path as the Hudi table path.
    @TempDir
    File tempFile;

    /**
     * Creates fresh table environments before each test.
     *
     * <p>The streaming environment gets a default parallelism of 4, a 2s checkpoint
     * interval and a fixed-delay restart strategy with 0 attempts. The batch environment
     * comes from {@link TestTableEnvs#getBatchTableEnv()} and also gets parallelism 4.
     */
    @BeforeEach
    void beforeEach() {
        streamTableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());

        // All streaming settings are applied to the environment's configuration object.
        Configuration streamConf = streamTableEnv.getConfig().getConfiguration();
        streamConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        streamConf.setString("execution.checkpointing.interval", "2s");
        streamConf.setString("restart-strategy", "fixed-delay");
        streamConf.setString("restart-strategy.fixed-delay.attempts", "0");

        batchTableEnv = TestTableEnvs.getBatchTableEnv();
        batchTableEnv.getConfig()
            .getConfiguration()
            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
    }

    /**
     * Writes the file source into a streaming-read table, then re-creates the table with
     * {@code READ_START_COMMIT} set to the first completed instant and checks the rows read.
     *
     * <p>The read is checked three times against {@code DATA_SET_SOURCE_INSERT}: right after
     * re-creating the table, after a second identical insert, and with the start commit
     * overridden to {@code 'earliest'} through a dynamic table option hint.
     *
     * @param tableType table type the Hudi table is created with
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws Exception {
        final String tablePath = tempFile.getAbsolutePath();
        final String insertSql = "insert into t1 select * from source";

        streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .end());
        execInsertSql(streamTableEnv, insertSql);

        // Re-create t1 over the same path, this time reading from the first completed instant.
        final String startInstant = TestUtils.getFirstCompleteInstant(tablePath);
        streamTableEnv.executeSql("drop table t1");
        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .option(FlinkOptions.READ_START_COMMIT, startInstant)
                .end());

        List<Row> firstRead = execSelectSql(streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(firstRead, TestData.DATA_SET_SOURCE_INSERT);

        // Insert the same source again; the expected rows stay the same.
        execInsertSql(streamTableEnv, insertSql);
        List<Row> secondRead = execSelectSql(streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(secondRead, TestData.DATA_SET_SOURCE_INSERT);

        // Dynamic table options must be enabled for the /*+ options(...) */ hint below.
        streamTableEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);
        List<Row> earliestRead = execSelectSql(
            streamTableEnv, "select * from t1/*+options('read.start-commit'='earliest')*/", 10L);
        TestData.assertRowsEquals(earliestRead, TestData.DATA_SET_SOURCE_INSERT);
    }

    /**
     * Writes the file source into a streaming-read table and reads it back as a stream.
     *
     * <p>No start commit is configured. The read is expected to equal
     * {@code DATA_SET_SOURCE_INSERT_LATEST_COMMIT} both after the first insert and after
     * the same insert is repeated.
     *
     * @param tableType table type the Hudi table is created with
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
        final String insertSql = "insert into t1 select * from source";

        streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .end());

        execInsertSql(streamTableEnv, insertSql);
        List<Row> firstRead = execSelectSql(streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(firstRead, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);

        // Second round with the same source data.
        execInsertSql(streamTableEnv, insertSql);
        List<Row> secondRead = execSelectSql(streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(secondRead, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
    }

    /**
     * Writes two file sources into table {@code t1}, then streams the data back through a
     * second table {@code t2} on the same path, starting from the first completed instant.
     *
     * <p>The rows read through {@code t2} are expected to equal {@code DATA_SET_SOURCE_MERGED}.
     *
     * @param tableType table type both Hudi tables are created with
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
        final String tablePath = tempFile.getAbsolutePath();

        String firstSourceDDL = TestConfigurations.getFileSourceDDL("source");
        String secondSourceDDL = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
        streamTableEnv.executeSql(firstSourceDDL);
        streamTableEnv.executeSql(secondSourceDDL);

        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .end());

        execInsertSql(streamTableEnv, "insert into t1 select * from source");
        // Captured between the two inserts, so it is the instant the first insert produced.
        final String startInstant = TestUtils.getFirstCompleteInstant(tablePath);
        execInsertSql(streamTableEnv, "insert into t1 select * from source2");

        // t2 points at the same path but starts reading at the captured instant.
        streamTableEnv.executeSql(
            TestConfigurations.sql("t2")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .option(FlinkOptions.READ_START_COMMIT, startInstant)
                .end());

        List<Row> mergedRows = execSelectSql(streamTableEnv, "select * from t2", 10L);
        TestData.assertRowsEquals(mergedRows, TestData.DATA_SET_SOURCE_MERGED);
    }

    /**
     * Writes the file source into a Hudi table with the streaming environment and reads the
     * table back with a bounded {@code select *} query.
     *
     * <p>The collected rows are expected to equal {@code DATA_SET_SOURCE_INSERT}.
     */
    @Test
    void testStreamWriteBatchRead() {
        String createSource = TestConfigurations.getFileSourceDDL("source");
        streamTableEnv.executeSql(createSource);

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        execInsertSql(streamTableEnv, "insert into t1 select * from source");

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertion.
        List<Row> rows = CollectionUtil.iterableToList(
            () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
    }

    /**
     * Writes the file source into a MERGE_ON_READ table configured for the
     * {@code read_optimized} query type and reads it back with a bounded query.
     *
     * <p>Compaction is configured with 1 delta commit and 1 task, and the metadata table is
     * disabled. The test sleeps 5 seconds before reading (presumably to give compaction time
     * to finish; confirm against the writer pipeline). Depending on how many rows are
     * visible, the result must equal either {@code DATA_SET_SOURCE_INSERT_FIRST_COMMIT}
     * (fewer rows than the full data set) or {@code DATA_SET_SOURCE_INSERT}.
     */
    @Test
    void testStreamWriteBatchReadOptimized() throws Exception {
        String createSource = TestConfigurations.getFileSourceDDL("source");
        streamTableEnv.executeSql(createSource);

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
            .option(FlinkOptions.QUERY_TYPE, "read_optimized")
            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
            .option(FlinkOptions.COMPACTION_TASKS, 1)
            .option(FlinkOptions.METADATA_ENABLED, false)
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        execInsertSql(streamTableEnv, "insert into t1 select * from source");
        TimeUnit.SECONDS.sleep(5L);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertions.
        List<Row> rows = CollectionUtil.iterableToList(
            () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());

        // Two outcomes are accepted, selected by how many rows were visible to the read.
        if (rows.size() < TestData.DATA_SET_SOURCE_INSERT.size()) {
            TestData.assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT);
        } else {
            TestData.assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
        }
    }

    /**
     * Streams a MERGE_ON_READ table with {@code READ_STREAMING_SKIP_COMPACT} enabled,
     * starting from an instant looked up via {@code TestUtils.getNthCompleteInstant}.
     *
     * <p>Compaction runs with 1 delta commit and 1 task. The start commit is passed through
     * a dynamic table option hint and the rows read are expected to equal
     * {@code DATA_SET_SOURCE_INSERT_LATEST_COMMIT}.
     */
    @Test
    void testStreamWriteReadSkippingCompaction() throws Exception {
        final String tablePath = tempFile.getAbsolutePath();

        streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));
        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
                .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
                .option(FlinkOptions.COMPACTION_TASKS, 1)
                .end());
        execInsertSql(streamTableEnv, "insert into t1 select * from source");

        // NOTE(review): the meaning of the (2, true) arguments is defined by TestUtils; confirm there.
        final String startInstant = TestUtils.getNthCompleteInstant(tablePath, 2, true);

        // Dynamic table options must be enabled for the /*+ options(...) */ hint below.
        streamTableEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);
        final String hintedQuery =
            String.format("select * from t1/*+ options('read.start-commit'='%s')*/", startInstant);

        List<Row> streamedRows = execSelectSql(streamTableEnv, hintedQuery, 10L);
        TestData.assertRowsEquals(streamedRows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
    }

    /**
     * Writes {@code test_source_3.data} into a table with {@code CLEAN_RETAIN_COMMITS = 1}
     * and asserts that the active timeline contains at least one completed instant whose
     * action is {@code "clean"}.
     */
    @Test
    void testStreamWriteWithCleaning() {
        final String tablePath = tempFile.getAbsolutePath();

        streamTableEnv.executeSql(
            TestConfigurations.getFileSourceDDL("source", "test_source_3.data", 4));
        streamTableEnv.executeSql(
            TestConfigurations.sql("t1")
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
                .end());
        execInsertSql(streamTableEnv, "insert into t1 select * from source");

        // Build a configuration for the same path and table name to open the meta client.
        HashMap<String, String> metaOptions =
            new HashMap<>(TestConfigurations.getDefaultConf(tablePath).toMap());
        metaOptions.put(FlinkOptions.TABLE_NAME.key(), "t1");
        HoodieActiveTimeline activeTimeline =
            StreamerUtil.createMetaClient(Configuration.fromMap(metaOptions)).getActiveTimeline();

        boolean hasCleanInstant = activeTimeline
            .filterCompletedInstants()
            .getInstants()
            .anyMatch(instant -> instant.getAction().equals("clean"));
        Assertions.assertTrue(hasCleanInstant, "some commits should be cleaned");
    }

    /**
     * Streams a changelog-enabled MERGE_ON_READ table that was written with
     * {@code DATA_SET_INSERT} followed by {@code DATA_SET_UPDATE_DELETE}, starting from
     * the last completed instant.
     *
     * <p>The query {@code select name, sum(age) ... group by name} is collected through a
     * {@code collect} sink table and compared with the expected rows for Danny and Stephen.
     */
    @Test
    void testStreamReadWithDeletes() throws Exception {
        final String tablePath = tempFile.getAbsolutePath();

        // Write the two data sets directly, outside of SQL.
        Configuration conf = TestConfigurations.getDefaultConf(tablePath);
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        String latestCommit = TestUtils.getLastCompleteInstant(tablePath);

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tablePath)
            .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
            .option(FlinkOptions.READ_AS_STREAMING, true)
            .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
            .option(FlinkOptions.READ_START_COMMIT, latestCommit)
            .option(FlinkOptions.CHANGELOG_ENABLED, true)
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        // The DDL and the expectation are each defined once and passed by name below.
        final String sinkDDL = "create table sink(\n"
            + "  name varchar(20),\n"
            + "  age_sum int\n"
            + ") with (\n"
            + "  'connector' = 'collect')";
        final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";

        List<Row> result = execSelectSql(
            streamTableEnv, "select name, sum(age) from t1 group by name", sinkDDL, 10L);
        TestData.assertRowsEquals(result, expected, true);
    }

    /**
     * Writes {@code DATA_SET_INSERT} and streams back only partition {@code par1}.
     *
     * <p>The query filters on {@code `partition`='par1'} and is expected to return the two
     * rows for {@code id1} and {@code id2}.
     *
     * @param tableType             table type used for both the write and the read table
     * @param hiveStylePartitioning value applied to {@code HIVE_STYLE_PARTITIONING} on both sides
     */
    @ParameterizedTest
    @MethodSource("tableTypeAndPartitioningParams")
    void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
        final String tablePath = tempFile.getAbsolutePath();

        // Write the data set directly, outside of SQL.
        Configuration conf = TestConfigurations.getDefaultConf(tablePath);
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
        conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tablePath)
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .option(FlinkOptions.READ_AS_STREAMING, true)
            .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
            .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        List<Row> result = execSelectSql(
            streamTableEnv, "select * from t1 where `partition`='par1'", 10L);

        // The expectation is defined once and passed by name.
        final String expected = "[+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), "
            + "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]";
        TestData.assertRowsEquals(result, expected, true);
    }

    /**
     * Streams an un-partitioned MERGE_ON_READ table from the earliest commit, with async
     * compaction disabled and {@code COMPACTION_DELTA_COMMITS = 1}.
     *
     * <p>After writing the file source, the streaming read is expected to return all eight
     * source rows ({@code id1} .. {@code id8}).
     */
    @Test
    void testStreamReadMorTableWithCompactionPlan() throws Exception {
        String createSource = TestConfigurations.getFileSourceDDL("source");
        streamTableEnv.executeSql(createSource);

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
            .option(FlinkOptions.READ_AS_STREAMING, true)
            .option(FlinkOptions.READ_START_COMMIT, "earliest")
            .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
            .noPartition()
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        execInsertSql(streamTableEnv, "insert into t1 select * from source");

        List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10L);

        // The expectation is defined once and passed by name.
        final String expected = "["
            + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
            + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
            + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
            + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
            + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
        TestData.assertRowsEquals(result, expected);
    }

    /**
     * Inserts eight literal rows and reads them back, once unfiltered and once with the
     * predicate {@code uuid > 'id5'}.
     *
     * @param execMode              selects the batch environment for {@code BATCH},
     *                              otherwise the streaming environment
     * @param hiveStylePartitioning value applied to {@code HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource("executionModeAndPartitioningParams")
    void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, insertInto);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertions.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Same table, filtered on the record key.
        List<Row> result2 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect());
        final String expectedFiltered = "["
            + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
        TestData.assertRowsEquals(result2, expectedFiltered);
    }

    /**
     * Inserts five rows that share the key {@code id1} (timestamps descending from
     * {@code 00:00:05} to {@code 00:00:01}) into a table whose timestamp column is named
     * {@code tss}, and expects a batch read to return only the last inserted row
     * ({@code 00:00:01}).
     *
     * @param tableType             table type the Hudi table is created with
     * @param hiveStylePartitioning value applied to {@code HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource("tableTypeAndPartitioningParams")
    void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = batchTableEnv;

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .field("uuid varchar(20)")
            .field("name varchar(10)")
            .field("age int")
            .field("tss timestamp(3)")
            .field("`partition` varchar(10)")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertSameKey = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:04','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        execInsertSql(tableEnv, insertSameKey);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertion.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
    }

    /**
     * Same scenario as the proctime-sequence test, but the table has a {@code ts} column
     * while {@code PRECOMBINE_FIELD} is set to {@code "no_precombine"}.
     *
     * <p>Five rows sharing the key {@code id1} are inserted with descending timestamps;
     * the batch read is expected to return only the last inserted row ({@code 00:00:01}).
     *
     * @param tableType             table type the Hudi table is created with
     * @param hiveStylePartitioning value applied to {@code HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource("tableTypeAndPartitioningParams")
    void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = batchTableEnv;

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .field("uuid varchar(20)")
            .field("name varchar(10)")
            .field("age int")
            .field("ts timestamp(3)")
            .field("`partition` varchar(10)")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
            .option(FlinkOptions.PRECOMBINE_FIELD, "no_precombine")
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertSameKey = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:04','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        execInsertSql(tableEnv, insertSameKey);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertion.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
    }

    /**
     * Upserts into an un-partitioned table in batch mode.
     *
     * <p>A first insert of eight rows must read back as {@code DATA_SET_SOURCE_INSERT}.
     * A second insert that updates {@code id1}..{@code id5} and adds {@code id9}..{@code id11}
     * must read back as {@code DATA_SET_SOURCE_MERGED}.
     *
     * @param tableType table type the Hudi table is created with
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
        TableEnvironment tableEnv = batchTableEnv;

        // The enum parameter belongs on TABLE_TYPE. It used to be written to TABLE_NAME,
        // which only renamed the table and left every run on the default table type.
        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .option("hoodie.parquet.small.file.limit", "0")
            .option("hoodie.parquet.max.file.size", "0")
            .noPartition()
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, insertInto);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertions.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Second batch: updates for id1..id5 plus three new keys.
        final String updateInsert = "insert into t1 values\n"
            + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, updateInsert);

        List<Row> result2 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
    }

    /**
     * Upserts into a partitioned table in batch mode.
     *
     * <p>A first insert of eight rows must read back as {@code DATA_SET_SOURCE_INSERT}.
     * A second insert that updates {@code id1}..{@code id5} and adds {@code id9}..{@code id11}
     * must read back as {@code DATA_SET_SOURCE_MERGED}.
     *
     * @param tableType             table type the Hudi table is created with
     * @param hiveStylePartitioning value applied to {@code HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource("tableTypeAndPartitioningParams")
    void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = batchTableEnv;

        // The enum parameter belongs on TABLE_TYPE. It used to be written to TABLE_NAME,
        // which only renamed the table and left every run on the default table type.
        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, insertInto);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertions.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Second batch: updates for id1..id5 plus three new keys.
        final String updateInsert = "insert into t1 values\n"
            + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, updateInsert);

        List<Row> result2 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
    }

    /**
     * Writes and reads a table whose partition column sits in the middle of the schema
     * ({@code `partition`} is declared before {@code ts}).
     *
     * <p>The same eight rows are inserted twice; after each insert the read must return
     * the same eight rows. The table is always created and written through the streaming
     * environment; {@code execMode} only drives {@code 'read.streaming.enabled'} and is
     * passed on to {@code execSelectSql}.
     *
     * @param execMode execution mode under test
     */
    @ParameterizedTest
    @EnumSource(ExecMode.class)
    void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
        boolean streaming = execMode == ExecMode.STREAM;

        String hoodieTableDDL = "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  `partition` varchar(20),\n"
            + "  ts timestamp(3),\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
            + ")\n"
            + "PARTITIONED BY (`partition`)\n"
            + "with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
            + "  'read.streaming.enabled' = '" + streaming + "'\n"
            + ")";
        streamTableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
            + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
            + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
            + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
            + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
            + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
            + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
            + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";

        // One expectation shared by both rounds.
        final String expected = "["
            + "+I[id1, Danny, 23, par1, 1970-01-01T00:00:01], "
            + "+I[id2, Stephen, 33, par1, 1970-01-01T00:00:02], "
            + "+I[id3, Julian, 53, par2, 1970-01-01T00:00:03], "
            + "+I[id4, Fabian, 31, par2, 1970-01-01T00:00:04], "
            + "+I[id5, Sophia, 18, par3, 1970-01-01T00:00:05], "
            + "+I[id6, Emma, 20, par3, 1970-01-01T00:00:06], "
            + "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], "
            + "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]";

        execInsertSql(streamTableEnv, insertInto);
        List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
        TestData.assertRowsEquals(result, expected);

        // Inserting the same rows again must not change what is read.
        execInsertSql(streamTableEnv, insertInto);
        List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
        TestData.assertRowsEquals(result2, expected);
    }

    /**
     * Writes and reads an un-partitioned table with a {@code timestamp(6)} column and
     * checks that the microsecond values come back unchanged.
     *
     * <p>The same five rows are inserted twice; after each insert the read must return
     * the same five rows. The table is always created and written through the streaming
     * environment; {@code execMode} only drives {@code READ_AS_STREAMING} and is passed
     * on to {@code execSelectSql}.
     *
     * @param execMode execution mode under test
     */
    @ParameterizedTest
    @EnumSource(ExecMode.class)
    void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception {
        boolean streaming = execMode == ExecMode.STREAM;

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .field("id int")
            .field("name varchar(10)")
            .field("ts timestamp(6)")
            .pkField("id")
            .noPartition()
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .option(FlinkOptions.READ_AS_STREAMING, streaming)
            .end();
        streamTableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n"
            + "(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n"
            + "(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n"
            + "(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004'),\n"
            + "(5,'Tom',TIMESTAMP '2721-12-04 15:16:04.500005')";

        // One expectation shared by both rounds.
        final String expected = "["
            + "+I[1, Danny, 2021-12-01T01:02:01.100001], "
            + "+I[2, Stephen, 2021-12-02T03:04:02.200002], "
            + "+I[3, Julian, 2021-12-03T13:14:03.300003], "
            + "+I[4, Fabian, 2021-12-04T15:16:04.400004], "
            + "+I[5, Tom, 2721-12-04T15:16:04.500005]]";

        execInsertSql(streamTableEnv, insertInto);
        List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
        TestData.assertRowsEquals(result, expected);

        // Inserting the same rows again must not change what is read.
        execInsertSql(streamTableEnv, insertInto);
        List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
        TestData.assertRowsEquals(result2, expected);
    }

    /**
     * Exercises {@code insert overwrite} on a single partition and on the whole table.
     *
     * <p>After the initial eight-row insert, partition {@code par1} is overwritten and the
     * read must equal {@code DATA_SET_SOURCE_INSERT_OVERWRITE}. Then the table is
     * overwritten without a partition clause and the read must contain only the two
     * newly written rows.
     *
     * @param execMode selects the batch environment for {@code BATCH}, otherwise the
     *                 streaming environment
     */
    @ParameterizedTest
    @EnumSource(ExecMode.class)
    void testInsertOverwrite(ExecMode execMode) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;

        String hoodieTableDDL = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
            .end();
        tableEnv.executeSql(hoodieTableDDL);

        final String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(tableEnv, insertInto);

        // Overwrite only partition par1; the statements are defined once and passed by name.
        final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n"
            + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
            + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
        execInsertSql(tableEnv, insertInto2);

        // Typed as List<Row> so no raw type or unchecked cast is needed for the assertions.
        List<Row> result1 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);

        // Overwrite without a partition clause.
        final String insertInto3 = "insert overwrite t1 values\n"
            + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
            + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
        execInsertSql(tableEnv, insertInto3);

        List<Row> result2 = CollectionUtil.iterableToList(
            () -> tableEnv.sqlQuery("select * from t1").execute().collect());
        final String expected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
        TestData.assertRowsEquals(result2, expected);
    }

    /**
     * Copies the file source into a streaming-read Hudi table configured with a write batch size of
     * {@code 1.0E-5} and checks that a streaming query returns {@code TestData.DATA_SET_SOURCE_INSERT}.
     *
     * @param tableType Hudi table type written into the table DDL
     * @throws Exception if the streaming select fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testStreamWriteAndReadWithMiniBatches(HoodieTableType tableType) throws Exception {
        // Register the file-backed source table first.
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));

        // Non-partitioned table read as a stream starting from the earliest commit.
        String sinkDdl = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath())
            .option(FlinkOptions.READ_AS_STREAMING, (Object)true)
            .option(FlinkOptions.TABLE_TYPE, (Object)tableType)
            .option(FlinkOptions.READ_START_COMMIT, (Object)"earliest")
            .option(FlinkOptions.WRITE_BATCH_SIZE, (Object)1.0E-5)
            .noPartition()
            .end();
        this.streamTableEnv.executeSql(sinkDdl);

        this.execInsertSql(this.streamTableEnv, "insert into t1 select * from source");

        List<Row> actual = this.execSelectSql(this.streamTableEnv, "select * from t1", 20L);
        TestData.assertRowsEquals(actual, TestData.DATA_SET_SOURCE_INSERT);
    }

    /**
     * Upserts the same key {@code id1} five times into partition {@code par1} with a write batch size of
     * {@code "0.001"} and expects only the last written row ({@code Sophia}) to be read back.
     *
     * @param execMode  {@code BATCH} selects the batch table environment, any other value the streaming one
     * @param tableType Hudi table type written into the table DDL
     */
    @ParameterizedTest
    @MethodSource(value={"executionModeAndTableTypeParams"})
    void testBatchUpsertWithMiniBatches(ExecMode execMode, HoodieTableType tableType) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.WRITE_BATCH_SIZE, (Object)"0.001").option(FlinkOptions.TABLE_TYPE, (Object)tableType).end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto1 = "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        this.execInsertSql(tableEnv, insertInto1);

        // Four more versions of the same key, all in the same partition.
        String insertInto2 = "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par1'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par1')";
        this.execInsertSql(tableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
    }

    /**
     * Upserts the same key {@code id1} into several partitions with {@code INDEX_GLOBAL_ENABLED} set to
     * {@code true} and expects a single surviving row, located in the partition of the last write
     * ({@code par3}).
     *
     * @param execMode  {@code BATCH} selects the batch table environment, any other value the streaming one
     * @param tableType Hudi table type written into the table DDL
     */
    @ParameterizedTest
    @MethodSource(value={"executionModeAndTableTypeParams"})
    void testBatchUpsertWithMiniBatchesGlobalIndex(ExecMode execMode, HoodieTableType tableType) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.WRITE_BATCH_SIZE, (Object)"0.001").option(FlinkOptions.TABLE_TYPE, (Object)tableType).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object)true).end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto1 = "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        this.execInsertSql(tableEnv, insertInto1);

        // Same key again, but spread over partitions par2, par1, par2 and par3.
        String insertInto2 = "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3')";
        this.execInsertSql(tableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par3]]");
    }

    /**
     * Writes key {@code 1} twice into a table whose payload class is {@link DefaultHoodieRecordPayload},
     * first with {@code ts = 20} and then with {@code ts = 1}, and expects the row with {@code ts = 20}
     * to be the one read back.
     */
    @Test
    void testUpdateWithDefaultHoodieRecordPayload() {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").field("id int").field("name string").field("price double").field("ts bigint").pkField("id").noPartition().option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.PAYLOAD_CLASS_NAME, (Object)DefaultHoodieRecordPayload.class.getName()).end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto1 = "insert into t1 values\n(1,'a1',20,20)";
        this.execInsertSql(tableEnv, insertInto1);

        // Second write carries a smaller ts; the assertion below expects it not to replace the first row.
        String insertInto4 = "insert into t1 values\n(1,'a1',20,1)";
        this.execInsertSql(tableEnv, insertInto4);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, "[+I[1, a1, 20.0, 20]]");
    }

    /**
     * Writes key {@code id1} five times, with differing values in the {@code partition} column, into a
     * table declared with {@code noPartition()} and expects only the last written row to be read back.
     *
     * @param execMode  {@code BATCH} selects the batch table environment, any other value the streaming one
     * @param tableType Hudi table type written into the table DDL
     */
    @ParameterizedTest
    @MethodSource(value={"executionModeAndTableTypeParams"})
    void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)tableType).noPartition().end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto1 = "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        this.execInsertSql(tableEnv, insertInto1);

        String insertInto2 = "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')";
        this.execInsertSql(tableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]");
    }

    /**
     * Loads {@code test_source_4.data} into a table with {@code INDEX_GLOBAL_ENABLED} and
     * {@code PRE_COMBINE} both set to {@code true} and expects exactly one row for key {@code id1},
     * in partition {@code par4}.
     */
    @Test
    void testWriteGlobalIndex() {
        String createSource = TestConfigurations.getFileSourceDDL("source", "test_source_4.data", 4);
        this.streamTableEnv.executeSql(createSource);
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object)true).option(FlinkOptions.PRE_COMBINE, (Object)true).end();
        this.streamTableEnv.executeSql(hoodieTableDDL);

        String insertInto2 = "insert into t1 select * from source";
        this.execInsertSql(this.streamTableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> this.streamTableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, "[+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]");
    }

    /**
     * Loads {@code test_source_4.data} into a table with {@code INDEX_GLOBAL_ENABLED} set to
     * {@code false} and {@code PRE_COMBINE} set to {@code true}, and expects one row for key
     * {@code id1} in each of the partitions {@code par1}..{@code par4}.
     */
    @Test
    void testWriteLocalIndex() {
        String createSource = TestConfigurations.getFileSourceDDL("source", "test_source_4.data", 4);
        this.streamTableEnv.executeSql(createSource);
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object)false).option(FlinkOptions.PRE_COMBINE, (Object)true).end();
        this.streamTableEnv.executeSql(hoodieTableDDL);

        String insertInto2 = "insert into t1 select * from source";
        this.execInsertSql(this.streamTableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> this.streamTableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[id1, Stephen, 34, 1970-01-01T00:00:02, par1], +I[id1, Fabian, 32, 1970-01-01T00:00:04, par2], +I[id1, Jane, 19, 1970-01-01T00:00:06, par3], +I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]";
        // NOTE(review): the trailing 3 is presumably the index of the field used to order rows
        // before comparison (all rows share uuid 'id1') - confirm against TestData.assertRowsEquals.
        TestData.assertRowsEquals(result, expected, 3);
    }

    /**
     * Runs a streaming select against a merge-on-read table both before and after
     * {@code StreamerUtil.initTableIfNotExists} is called for its path; an empty result is expected
     * in both cases.
     *
     * @throws Exception if the streaming select or the table initialisation fails
     */
    @Test
    void testStreamReadEmptyTablePath() throws Exception {
        String tableDdl = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath())
            .option(FlinkOptions.READ_AS_STREAMING, (Object)"true")
            .option(FlinkOptions.TABLE_TYPE, (Object)FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
            .end();
        this.streamTableEnv.executeSql(tableDdl);

        // First read: the table has only been declared in the catalog, nothing was initialised.
        List<Row> beforeInit = this.execSelectSql(this.streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(beforeInit, "[]");

        // Second read: the table has been initialised but no data was written.
        Configuration tableConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        StreamerUtil.initTableIfNotExists(tableConf);
        List<Row> afterInit = this.execSelectSql(this.streamTableEnv, "select * from t1", 10L);
        TestData.assertRowsEquals(afterInit, "[]");
    }

    /**
     * Checks batch reads of a merge-on-read table whose path holds no data: the query must throw
     * before {@code StreamerUtil.initTableIfNotExists} is called and return no rows afterwards.
     *
     * @throws Exception if table initialisation or the second query fails
     */
    @Test
    void testBatchReadEmptyTablePath() throws Exception {
        String createHoodieTable = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)FlinkOptions.TABLE_TYPE_MERGE_ON_READ).end();
        this.batchTableEnv.executeSql(createHoodieTable);

        // Before initialisation the batch query is expected to fail.
        Assertions.assertThrows(Exception.class, () -> this.execSelectSql(this.batchTableEnv, "select * from t1", 10L), "Exception should throw when querying non-exists table in batch mode");

        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        StreamerUtil.initTableIfNotExists(conf);

        // After initialisation the query succeeds and yields an empty result.
        List<Row> rows2 = CollectionUtil.iteratorToList(this.batchTableEnv.executeSql("select * from t1").collect());
        TestData.assertRowsEquals(rows2, "[]");
    }

    /**
     * Reads the {@code debezium_json.data} resource through a {@code debezium-json} filesystem source,
     * writes it into a non-partitioned Hudi table with {@code PRE_COMBINE} enabled and verifies the
     * rows read back.
     *
     * @param execMode read mode; {@code READ_AS_STREAMING} is enabled only for {@code STREAM}
     * @throws Exception if the select fails
     */
    @ParameterizedTest
    @EnumSource(value=ExecMode.class)
    void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
        String sourcePath = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("debezium_json.data")).toString();
        String sourceDDL = "CREATE TABLE debezium_source(\n  id INT NOT NULL PRIMARY KEY NOT ENFORCED,\n  ts BIGINT,\n  name STRING,\n  description STRING,\n  weight DOUBLE\n) WITH (\n  'connector' = 'filesystem',\n  'path' = '" + sourcePath + "',\n  'format' = 'debezium-json'\n)";
        this.streamTableEnv.executeSql(sourceDDL);

        // READ_AS_STREAMING is a boolean flag (other tests pass true / "true"), so pass the boolean
        // comparison itself rather than an int 1/0, which would be boxed as an Integer.
        boolean readAsStreaming = execMode == ExecMode.STREAM;
        String hoodieTableDDL = TestConfigurations.sql("hoodie_sink").field("id INT NOT NULL").field("ts BIGINT").field("name STRING").field("weight DOUBLE").pkField("id").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object)readAsStreaming).option(FlinkOptions.PRE_COMBINE, (Object)true).noPartition().end();
        this.streamTableEnv.executeSql(hoodieTableDDL);

        String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source";
        this.execInsertSql(this.streamTableEnv, insertInto);

        String expected = "[+I[101, 1000, scooter, 3.140000104904175], +I[102, 2000, car battery, 8.100000381469727], +I[103, 3000, 12-pack drill bits, 0.800000011920929], +I[104, 4000, hammer, 0.75], +I[105, 5000, hammer, 0.875], +I[106, 10000, hammer, 1.0], +I[107, 11000, rocks, 5.099999904632568], +I[108, 8000, jacket, 0.10000000149011612], +I[109, 9000, spare tire, 22.200000762939453], +I[110, 14000, jacket, 0.5]]";
        List<Row> result = this.execSelectSql(this.streamTableEnv, "select * from hoodie_sink", execMode);
        TestData.assertRowsEquals(result, expected);
    }

    /**
     * Bulk-inserts the CSV source {@code test_source_5.data} with input shuffling enabled and checks
     * both a full scan and a scan filtered by {@code uuid > 'id5'}.
     *
     * @param indexType             value written to {@code FlinkOptions.INDEX_TYPE}
     * @param hiveStylePartitioning value written to {@code FlinkOptions.HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource(value={"indexAndPartitioningParams"})
    void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
        tableEnv.executeSql(csvSourceDDL);
        String hoodieTableDDL = TestConfigurations.sql("hoodie_sink").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)"bulk_insert").option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, (Object)true).option(FlinkOptions.INDEX_TYPE, (Object)indexType).option(FlinkOptions.HIVE_STYLE_PARTITIONING, (Object)hiveStylePartitioning).end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto = "insert into hoodie_sink select * from csv_source";
        this.execInsertSql(tableEnv, insertInto);

        // Full scan.
        List<Row> result1 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Scan with a filter on the key column.
        List<Row> result2 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect());
        TestData.assertRowsEquals(result2, "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    /**
     * Bulk-inserts five rows that share key {@code id1} into a non-partitioned table and expects all
     * five rows to be read back.
     */
    @Test
    void testBulkInsertNonPartitionedTable() {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)"bulk_insert").noPartition().end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto1 = "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
        this.execInsertSql(tableEnv, insertInto1);

        String insertInto2 = "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')";
        this.execInsertSql(tableEnv, insertInto2);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        // NOTE(review): the trailing 3 is presumably the index of the field used to order rows
        // before comparison (all rows share uuid 'id1') - confirm against TestData.assertRowsEquals.
        TestData.assertRowsEquals(result, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], +I[id1, Stephen, 33, 1970-01-01T00:00:02, par2], +I[id1, Julian, 53, 1970-01-01T00:00:03, par3], +I[id1, Fabian, 31, 1970-01-01T00:00:04, par4], +I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
    }

    /**
     * Writes the file source into a table using the {@code insert} operation and checks both a full
     * scan and a scan filtered by {@code uuid > 'id5'}.
     *
     * @param clustering value written to {@code FlinkOptions.INSERT_CLUSTER}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testAppendWrite(boolean clustering) {
        TableEnvironment tableEnv = this.streamTableEnv;
        String sourceDDL = TestConfigurations.getFileSourceDDL("source");
        tableEnv.executeSql(sourceDDL);
        String hoodieTableDDL = TestConfigurations.sql("hoodie_sink").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)"insert").option(FlinkOptions.INSERT_CLUSTER, (Object)clustering).end();
        tableEnv.executeSql(hoodieTableDDL);

        String insertInto = "insert into hoodie_sink select * from source";
        this.execInsertSql(tableEnv, insertInto);

        // Full scan.
        List<Row> result1 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Scan with a filter on the key column.
        List<Row> result2 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect());
        TestData.assertRowsEquals(result2, "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    /**
     * Writes eight rows into a table partitioned by the timestamp column {@code ts} and checks both a
     * full scan and a scan filtered by {@code uuid > 'id5'}.
     *
     * @param execMode              {@code BATCH} selects the batch table environment, any other value
     *                              the streaming one
     * @param hiveStylePartitioning value written to {@code FlinkOptions.HIVE_STYLE_PARTITIONING}
     */
    @ParameterizedTest
    @MethodSource(value={"executionModeAndPartitioningParams"})
    void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) {
        TableEnvironment tableEnv = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.HIVE_STYLE_PARTITIONING, (Object)hiveStylePartitioning).partitionField("ts").end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')");

        // Full scan.
        List<Row> result1 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);

        // Scan with a filter on the key column.
        List<Row> result2 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect());
        TestData.assertRowsEquals(result2, "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    /**
     * Writes eight rows into a merge-on-read table partitioned by {@code ts}, configured with
     * {@code COMPACTION_DELTA_COMMITS = 1} and {@code COMPACTION_TASKS = 1}, and expects
     * {@code TestData.DATA_SET_SOURCE_INSERT} to be read back.
     */
    @Test
    void testMergeOnReadCompactionWithTimestampPartitioning() {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.COMPACTION_DELTA_COMMITS, (Object)1).option(FlinkOptions.COMPACTION_TASKS, (Object)1).partitionField("ts").end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')");

        List<Row> rows = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
    }

    /**
     * Writes eight rows into a table partitioned by the {@code DATE} column {@code ts} and expects the
     * same rows to be read back.
     *
     * @param partitionFormat value written to {@code FlinkOptions.PARTITION_FORMAT}
     */
    @ParameterizedTest
    @ValueSource(strings={"yyyyMMdd", "yyyy-MM-dd"})
    void testWriteAndReadWithDatePartitioning(String partitionFormat) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("ts date").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.PARTITION_FORMAT, (Object)partitionFormat).partitionField("ts").end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n('id1','Danny',23,DATE '1970-01-01'),\n('id2','Stephen',33,DATE '1970-01-01'),\n('id3','Julian',53,DATE '1970-01-01'),\n('id4','Fabian',31,DATE '1970-01-01'),\n('id5','Sophia',18,DATE '1970-01-01'),\n('id6','Emma',20,DATE '1970-01-01'),\n('id7','Bob',44,DATE '1970-01-01'),\n('id8','Han',56,DATE '1970-01-01')");

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[id1, Danny, 23, 1970-01-01], +I[id2, Stephen, 33, 1970-01-01], +I[id3, Julian, 53, 1970-01-01], +I[id4, Fabian, 31, 1970-01-01], +I[id5, Sophia, 18, 1970-01-01], +I[id6, Emma, 20, 1970-01-01], +I[id7, Bob, 44, 1970-01-01], +I[id8, Han, 56, 1970-01-01]]";
        TestData.assertRowsEquals(result, expected);
    }

    /**
     * Writes one row of {@code DECIMAL} columns with precisions 3, 10, 20 and 38 and verifies the
     * values read back, including the last column padded to its declared scale of 18.
     *
     * @param operation value written to {@code FlinkOptions.OPERATION}
     */
    @ParameterizedTest
    @ValueSource(strings={"bulk_insert", "upsert"})
    void testWriteReadDecimals(String operation) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String createTable = TestConfigurations.sql("decimals").field("f0 decimal(3, 2)").field("f1 decimal(10, 2)").field("f2 decimal(20, 2)").field("f3 decimal(38, 18)").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)operation).option(FlinkOptions.PRECOMBINE_FIELD, (Object)"f1").pkField("f0").noPartition().end();
        tableEnv.executeSql(createTable);

        String insertInto = "insert into decimals values\n(1.23, 12345678.12, 12345.12, 123456789.12345)";
        this.execInsertSql(tableEnv, insertInto);

        List<Row> result1 = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from decimals").execute().collect());
        TestData.assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]");
    }

    /**
     * Writes three data sets in separate {@code TestData.writeData} calls, then reads the table with
     * {@code READ_START_COMMIT} set to the last completed instant and expects only the third data
     * set back.
     *
     * @param tableType Hudi table type used for both writing and reading
     * @throws Exception if writing the test data fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testIncrementalRead(HoodieTableType tableType) throws Exception {
        TableEnvironment tableEnv = this.batchTableEnv;
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());

        TestData.writeData(TestData.dataSetInsert(1, 2), conf);
        TestData.writeData(TestData.dataSetInsert(3, 4), conf);
        TestData.writeData(TestData.dataSetInsert(5, 6), conf);

        // Start reading from the most recent completed instant.
        String latestCommit = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)tableType).option(FlinkOptions.READ_START_COMMIT, (Object)latestCommit).end();
        tableEnv.executeSql(hoodieTableDDL);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, TestData.dataSetInsert(5, 6));
    }

    /**
     * Writes ten data sets (ids 1..20, two per write) with archiving configured, then reads with
     * {@code READ_START_COMMIT} set to the instant returned by
     * {@code TestUtils.getNthArchivedInstant(path, 1)} and expects ids 3..20 back.
     *
     * @param tableType Hudi table type used for both writing and reading
     * @throws Exception if writing the test data fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
        TableEnvironment tableEnv = this.batchTableEnv;
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
        conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 3);
        conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 4);
        conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 2);
        conf.setString("hoodie.commits.archival.batch", "1");

        // One write per pair of ids: (1, 2), (3, 4), ..., (19, 20).
        for (int i = 0; i < 20; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, conf);
        }

        String secondArchived = TestUtils.getNthArchivedInstant(this.tempFile.getAbsolutePath(), 1);
        String hoodieTableDDL = TestConfigurations.sql("t1").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)tableType).option(FlinkOptions.READ_START_COMMIT, (Object)secondArchived).end();
        tableEnv.executeSql(hoodieTableDDL);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(result, TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
    }

    /**
     * Writes {@code TestData.DATA_SET_INSERT} and reads it through a table definition that declares an
     * additional {@code salary double} column; that column is expected to read as {@code null} for
     * every row.
     *
     * @param tableType Hudi table type used for both writing and reading
     * @throws Exception if writing the test data fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
        TableEnvironment tableEnv = this.batchTableEnv;
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        // The reader schema adds 'salary' between 'age' and 'ts'.
        String hoodieTableDDL = TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("salary double").field("ts timestamp(3)").field("`partition` varchar(10)").pkField("uuid").option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, (Object)tableType).end();
        tableEnv.executeSql(hoodieTableDDL);

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[id1, Danny, 23, null, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, null, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 53, null, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, null, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, null, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, null, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, null, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, null, 1970-01-01T00:00:00.008, par4]]";
        TestData.assertRowsEquals(result, expected);
    }

    /**
     * Round-trips three rows containing {@code ARRAY}, {@code MAP} and {@code ROW} columns and
     * compares them with the expected rows irrespective of order.
     *
     * @param operation value written to {@code FlinkOptions.OPERATION}
     */
    @ParameterizedTest
    @ValueSource(strings={"insert", "upsert", "bulk_insert"})
    void testParquetComplexTypes(String operation) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").field("f_int int").field("f_array array<varchar(10)>").field("f_map map<varchar(20), int>").field("f_row row(f_row_f0 int, f_row_f1 varchar(10))").pkField("f_int").noPartition().option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)operation).end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(1, 'abc1')),\n(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2, 'abc2')),\n(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, 'abc3'))");

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        List<Row> expected = Arrays.asList(
            TestData.row(1, TestData.array("abc1", "def1"), TestData.map("abc1", 1, "def1", 3), TestData.row(1, "abc1")),
            TestData.row(2, TestData.array("abc2", "def2"), TestData.map("abc2", 1, "def2", 3), TestData.row(2, "abc2")),
            TestData.row(3, TestData.array("abc3", "def3"), TestData.map("abc3", 1, "def3", 3), TestData.row(3, "abc3")));
        TestData.assertRowsEqualsUnordered(result, expected);
    }

    /**
     * Round-trips three rows whose {@code f_row} column nests an array and a further row, and compares
     * them with the expected rows irrespective of order.
     *
     * @param operation value written to {@code FlinkOptions.OPERATION}
     */
    @ParameterizedTest
    @ValueSource(strings={"insert", "upsert", "bulk_insert"})
    void testParquetComplexNestedRowTypes(String operation) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").field("f_int int").field("f_array array<varchar(10)>").field("int_array array<int>").field("f_map map<varchar(20), int>").field("f_row row(f_nested_array array<varchar(10)>, f_nested_row row(f_row_f0 int, f_row_f1 varchar(10)))").pkField("f_int").noPartition().option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)operation).end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))");

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        List<Row> expected = Arrays.asList(
            TestData.row(1, TestData.array("abc1", "def1"), TestData.array(1, 1), TestData.map("abc1", 1, "def1", 3), TestData.row(TestData.array("abc1", "def1"), TestData.row(1, "abc1"))),
            TestData.row(2, TestData.array("abc2", "def2"), TestData.array(2, 2), TestData.map("abc2", 1, "def2", 3), TestData.row(TestData.array("abc2", "def2"), TestData.row(2, "abc2"))),
            TestData.row(3, TestData.array("abc3", "def3"), TestData.array(3, 3), TestData.map("abc3", 1, "def3", 3), TestData.row(TestData.array("abc3", "def3"), TestData.row(3, "abc3"))));
        TestData.assertRowsEqualsUnordered(result, expected);
    }

    /**
     * Round-trips {@code ROW} values whose child fields are partly or entirely {@code null}.
     *
     * <p>Per the expected string, a row whose children are all {@code null} (id 3) is read back as a
     * {@code null} row rather than a row of nulls.
     *
     * @param operation value written to {@code FlinkOptions.OPERATION}
     */
    @ParameterizedTest
    @ValueSource(strings={"insert", "upsert", "bulk_insert"})
    void testParquetNullChildColumnsRowTypes(String operation) {
        TableEnvironment tableEnv = this.batchTableEnv;
        String hoodieTableDDL = TestConfigurations.sql("t1").field("f_int int").field("f_row row(f_row_f0 int, f_row_f1 varchar(10))").pkField("f_int").noPartition().option(FlinkOptions.PATH, (Object)this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, (Object)operation).end();
        tableEnv.executeSql(hoodieTableDDL);
        this.execInsertSql(tableEnv, "insert into t1 values\n(1, row(cast(null as int), 'abc1')),\n(2, row(2, cast(null as varchar))),\n(3, row(cast(null as int), cast(null as varchar)))");

        List<Row> result = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[1, +I[null, abc1]], +I[2, +I[2, null]], +I[3, null]]";
        TestData.assertRowsEquals(result, expected);
    }

    /**
     * Creates a table inside a freshly registered catalog and database, inserts two rows whose date
     * column is produced by {@code TO_DATE(...)} and by a {@code DATE} literal, and verifies both a
     * full read and a read filtered on the partition field {@code f_int}.
     *
     * @param operation value configured for {@code FlinkOptions.OPERATION}; also used to build the
     *                  catalog name and the table path
     */
    @ParameterizedTest
    @ValueSource(strings={"insert", "upsert", "bulk_insert"})
    void testBuiltinFunctionWithCatalog(String operation) {
        final TableEnvironment env = this.batchTableEnv;

        // One catalog per operation value, rooted at the temp directory.
        final String catalogName = "hudi_" + operation;
        final String catalogDdl = TestConfigurations.catalog(catalogName)
            .catalogPath(this.tempFile.getAbsolutePath())
            .end();
        env.executeSql(catalogDdl);
        env.executeSql("use catalog " + catalogName);

        final String database = "hudi";
        env.executeSql("create database " + database);
        env.executeSql("use " + database);

        final String tableDdl = TestConfigurations.sql("t1")
            .field("f_int int")
            .field("f_date DATE")
            .pkField("f_int")
            .partitionField("f_int")
            .option(FlinkOptions.PATH, this.tempFile.getAbsolutePath() + "/" + database + "/" + operation)
            .option(FlinkOptions.OPERATION, operation)
            .end();
        env.executeSql(tableDdl);

        execInsertSql(env, "insert into t1 values (1, TO_DATE('2022-02-02')), (2, DATE '2022-02-02')");

        final List<Row> allRows = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1").execute().collect());
        final String expected = "[+I[1, 2022-02-02], +I[2, 2022-02-02]]";
        TestData.assertRowsEquals(allRows, expected);

        final List<Row> filteredRows = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1 where f_int = 1").execute().collect());
        TestData.assertRowsEquals(filteredRows, "[+I[1, 2022-02-02]]");
    }

    /**
     * Writes eight rows into a table created with the metadata, column-stats-index and
     * read-data-skipping options switched on, then verifies an unfiltered read and two filtered reads.
     *
     * @param tableType value configured for {@code FlinkOptions.TABLE_TYPE}
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testWriteAndReadWithDataSkipping(HoodieTableType tableType) {
        final TableEnvironment env = this.batchTableEnv;

        final String ddl = TestConfigurations.sql("t1")
            .option(FlinkOptions.PATH, this.tempFile.getAbsolutePath())
            .option(FlinkOptions.METADATA_ENABLED, true)
            .option("hoodie.metadata.index.column.stats.enable", true)
            .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .end();
        env.executeSql(ddl);

        final String insertRows = "insert into t1 values\n"
            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
        execInsertSql(env, insertRows);

        // Unfiltered read must return the full data set.
        final List<Row> everything = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(everything, TestData.DATA_SET_SOURCE_INSERT);

        // Conjunctive filter on a string column and an int column.
        final List<Row> byUuidAndAge = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1 where uuid > 'id5' and age > 20").execute().collect());
        TestData.assertRowsEquals(byUuidAndAge, "[+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");

        // Filter on the timestamp column.
        final List<Row> byTimestamp = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:05'").execute().collect());
        TestData.assertRowsEquals(byTimestamp, "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    /**
     * Same scenario as the catalog-based builtin-function test, but against a
     * {@link HoodieHiveCatalog} obtained from {@link HoodieCatalogTestUtils}: inserts two rows whose
     * date column comes from {@code TO_DATE(...)} and a {@code DATE} literal, then verifies a full
     * read and a read filtered on {@code f_int}.
     */
    @Test
    void testBuiltinFunctionWithHMSCatalog() {
        final TableEnvironment env = this.batchTableEnv;

        final HoodieHiveCatalog hiveCatalog = HoodieCatalogTestUtils.createHiveCatalog("hudi_catalog");
        env.registerCatalog("hudi_catalog", hiveCatalog);
        env.executeSql("use catalog hudi_catalog");

        final String database = "hudi";
        env.executeSql("create database " + database);
        env.executeSql("use " + database);

        // No explicit path option here; the table is partitioned by the string column f_par.
        final String tableDdl = TestConfigurations.sql("t1")
            .field("f_int int")
            .field("f_date DATE")
            .field("f_par string")
            .pkField("f_int")
            .partitionField("f_par")
            .option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
            .option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
            .end();
        env.executeSql(tableDdl);

        execInsertSql(env, "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '1')");

        final List<Row> allRows = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1").execute().collect());
        final String expected = "[+I[1, 2022-02-02, 1], +I[2, 2022-02-02, 1]]";
        TestData.assertRowsEquals(allRows, expected);

        final List<Row> filteredRows = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1 where f_int = 1").execute().collect());
        TestData.assertRowsEquals(filteredRows, "[+I[1, 2022-02-02, 1]]");
    }

    /**
     * Verifies that a computed column ({@code f3 as f0 + f2}) is evaluated on read, both when
     * selecting every column and when selecting only the computed one.
     */
    @Test
    void testWriteReadWithComputedColumns() {
        final TableEnvironment env = this.batchTableEnv;

        final String ddl = TestConfigurations.sql("t1")
            .field("f0 int")
            .field("f1 varchar(10)")
            .field("f2 bigint")
            .field("f3 as f0 + f2")
            .option(FlinkOptions.PATH, this.tempFile.getAbsolutePath())
            .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
            .pkField("f0")
            .noPartition()
            .end();
        env.executeSql(ddl);

        // Only the three physical columns are supplied; f3 is derived.
        execInsertSql(env, "insert into t1 values\n" + "(1, 'abc', 2)");

        final List<Row> allColumns = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1").execute().collect());
        TestData.assertRowsEquals(allColumns, "[+I[1, abc, 2, 3]]");

        final List<Row> computedOnly = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select f3 from t1").execute().collect());
        TestData.assertRowsEquals(computedOnly, "[+I[3]]");
    }

    /**
     * Writes and reads {@code TIMESTAMP_LTZ(3)} and {@code TIMESTAMP_LTZ(6)} columns with the session
     * time zone set to {@code Asia/Shanghai}.
     *
     * <p>The inserted literals are at 08:00:0x local time and the expected rows show them as
     * 00:00:0x with a {@code Z} suffix.
     *
     * @param tableType value configured for {@code FlinkOptions.TABLE_TYPE}
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testWriteReadWithLocalTimestamp(HoodieTableType tableType) {
        final TableEnvironment env = this.batchTableEnv;
        env.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        final String ddl = TestConfigurations.sql("t1")
            .field("f0 int")
            .field("f1 varchar(10)")
            .field("f2 TIMESTAMP_LTZ(3)")
            .field("f4 TIMESTAMP_LTZ(6)")
            .option(FlinkOptions.PATH, this.tempFile.getAbsolutePath())
            .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
            .option(FlinkOptions.TABLE_TYPE, tableType)
            .pkField("f0")
            .noPartition()
            .end();
        env.executeSql(ddl);

        final String insertRows = "insert into t1 values\n"
            + "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n"
            + "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')";
        execInsertSql(env, insertRows);

        final List<Row> rows = CollectionUtil.iterableToList(
            () -> env.sqlQuery("select * from t1").execute().collect());
        final String expected = "[+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], +I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]";
        TestData.assertRowsEquals(rows, expected);
    }

    /**
     * Argument source yielding every (execution mode, table type) pair, in the order:
     * batch/MOR, batch/COW, stream/MOR, stream/COW.
     *
     * @return a stream of two-element argument sets
     */
    private static Stream<Arguments> executionModeAndTableTypeParams() {
        return Stream.of(
            Arguments.of(ExecMode.BATCH, HoodieTableType.MERGE_ON_READ),
            Arguments.of(ExecMode.BATCH, HoodieTableType.COPY_ON_WRITE),
            Arguments.of(ExecMode.STREAM, HoodieTableType.MERGE_ON_READ),
            Arguments.of(ExecMode.STREAM, HoodieTableType.COPY_ON_WRITE));
    }

    /**
     * Argument source yielding every (execution mode, boolean flag) pair, with {@code false}
     * listed before {@code true} for each mode.
     *
     * @return a stream of two-element argument sets
     */
    private static Stream<Arguments> executionModeAndPartitioningParams() {
        return Stream.of(
            Arguments.of(ExecMode.BATCH, false),
            Arguments.of(ExecMode.BATCH, true),
            Arguments.of(ExecMode.STREAM, false),
            Arguments.of(ExecMode.STREAM, true));
    }

    /**
     * Argument source yielding every (table type, boolean flag) pair: COPY_ON_WRITE first, then
     * MERGE_ON_READ, each with {@code false} before {@code true}.
     *
     * @return a stream of two-element argument sets
     */
    private static Stream<Arguments> tableTypeAndPartitioningParams() {
        return Stream.of(
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true));
    }

    /**
     * Argument source yielding every (index type name, boolean flag) pair for the index names
     * {@code "FLINK_STATE"} and {@code "BUCKET"}, each with {@code false} before {@code true}.
     *
     * @return a stream of two-element argument sets
     */
    private static Stream<Arguments> indexAndPartitioningParams() {
        return Stream.of(
            Arguments.of("FLINK_STATE", false),
            Arguments.of("FLINK_STATE", true),
            Arguments.of("BUCKET", false),
            Arguments.of("BUCKET", true));
    }

    /**
     * Submits the given statement through {@code tEnv} and waits on the returned
     * {@link TableResult} before returning.
     *
     * <p>Failures while waiting are not propagated: this helper is best-effort, and callers verify
     * the outcome by reading the table back afterwards.
     *
     * @param tEnv   table environment used to execute the statement
     * @param insert the SQL statement to execute
     */
    private void execInsertSql(TableEnvironment tEnv, String insert) {
        TableResult tableResult = tEnv.executeSql(insert);
        try {
            tableResult.await();
        } catch (InterruptedException interrupted) {
            // Catching InterruptedException clears the thread's interrupt status; put it back so the
            // test runner (or any later blocking call) can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException ignored) {
            // Deliberately ignored: a failed job is left for the caller's subsequent read and
            // assertions to surface, rather than aborting here.
        }
    }

    /**
     * Runs a select in the requested execution mode and returns the collected rows.
     *
     * <p>The source table name is taken to be the last space-separated token of {@code select}.
     * In {@code STREAM} mode the statement is delegated to the timed collect-sink variant with a
     * timeout of 10 and that table name. In {@code BATCH} mode the passed statement text is not
     * executed as-is; {@code "select * from " + tableName} is queried instead.
     *
     * @param tEnv     table environment to run against
     * @param select   select statement whose last token is the table name
     * @param execMode execution mode selecting the read path
     * @return the rows produced by the query
     * @throws TableNotExistException declared by the delegated stream-mode overload
     * @throws InterruptedException   declared by the delegated stream-mode overload
     */
    private List<Row> execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode) throws TableNotExistException, InterruptedException {
        final String[] tokens = select.split(" ");
        final String sourceTable = tokens[tokens.length - 1];
        switch (execMode) {
            case BATCH:
                return CollectionUtil.iterableToList(
                    () -> tEnv.sqlQuery("select * from " + sourceTable).execute().collect());
            case STREAM:
                return execSelectSql(tEnv, select, 10L, sourceTable);
            default:
                // Not reachable with the two modes currently defined.
                throw new AssertionError();
        }
    }

    /**
     * Convenience overload of the timed select that passes no source table, so the sink DDL is
     * obtained from {@code TestConfigurations.getCollectSinkDDL("sink")} without a schema argument.
     *
     * @param tEnv    table environment to run against
     * @param select  select statement feeding the sink
     * @param timeout number of seconds to wait before the job is cancelled
     * @return the rows collected by the sink
     * @throws InterruptedException   declared by the delegated overload
     * @throws TableNotExistException declared by the delegated overload
     */
    private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException, TableNotExistException {
        final String noSourceTable = null;
        return execSelectSql(tEnv, select, timeout, noSourceTable);
    }

    /**
     * Builds the collect-sink DDL and runs the timed select against it.
     *
     * <p>When {@code sourceTable} is given, its schema is looked up in the current catalog and
     * database and passed to the sink DDL builder; otherwise the builder is called with the sink
     * name only.
     *
     * @param tEnv        table environment to run against
     * @param select      select statement feeding the sink
     * @param timeout     number of seconds to wait before the job is cancelled
     * @param sourceTable name of the table whose schema the sink should use, or {@code null}
     * @return the rows collected by the sink
     * @throws InterruptedException   declared by the delegated overload
     * @throws TableNotExistException declared because of the catalog table lookup
     */
    private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable) throws InterruptedException, TableNotExistException {
        if (sourceTable == null) {
            return execSelectSql(tEnv, select, TestConfigurations.getCollectSinkDDL("sink"), timeout);
        }
        final ObjectPath tablePath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
        final Catalog currentCatalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
        final TableSchema sourceSchema = currentCatalog.getTable(tablePath).getSchema();
        final String schemaAwareSinkDdl = TestConfigurations.getCollectSinkDDL("sink", sourceSchema);
        return execSelectSql(tEnv, select, schemaAwareSinkDdl, timeout);
    }

    /**
     * Pipes {@code select} into a freshly created table named {@code sink}, sleeps for
     * {@code timeout} seconds, cancels the job if a job client is present, drops the sink table and
     * returns everything held in {@code CollectSinkTableFactory.RESULT}, flattened into one list.
     *
     * @param tEnv    table environment to run against
     * @param select  select statement appended to {@code "insert into sink "}
     * @param sinkDDL DDL statement that creates the {@code sink} table
     * @param timeout number of seconds to sleep before cancelling the job
     * @return the flattened contents of {@code CollectSinkTableFactory.RESULT}
     * @throws InterruptedException if the sleep is interrupted
     */
    private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) throws InterruptedException {
        final String dropSink = "DROP TABLE IF EXISTS sink";

        // Start from a clean slate in case a previous run left the sink behind.
        tEnv.executeSql(dropSink);
        tEnv.executeSql(sinkDDL);

        final String pipeIntoSink = "insert into sink " + select;
        final TableResult submitted = tEnv.executeSql(pipeIntoSink);

        // Let the job run for the requested time, then stop it.
        TimeUnit.SECONDS.sleep(timeout);
        submitted.getJobClient().ifPresent(client -> client.cancel());

        tEnv.executeSql(dropSink);

        return CollectSinkTableFactory.RESULT.values()
            .stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }

    private static enum ExecMode {
        BATCH,
        STREAM;

    }
}

