package org.apache.flink.connectors.hive;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSourceITCase.class */
public class HiveTableSourceITCase extends BatchAbstractTestBase {
    private static HiveCatalog hiveCatalog;
    private static TableEnvironment batchTableEnv;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSourceITCase$TestConfigSource.class */
    public static class TestConfigSource extends HiveTableSource {
        private final boolean inferParallelism;

        TestConfigSource(JobConf jobConf, ReadableConfig readableConfig, ObjectPath objectPath, ResolvedCatalogTable resolvedCatalogTable, boolean z) {
            super(jobConf, readableConfig, objectPath, resolvedCatalogTable);
            this.inferParallelism = z;
        }

        public DataStream<RowData> getDataStream(ProviderContext providerContext, StreamExecutionEnvironment streamExecutionEnvironment) {
            DataStreamSource dataStream = super.getDataStream(providerContext, streamExecutionEnvironment);
            Assertions.assertThat(dataStream.getTransformation().getParallelism()).isEqualTo(this.inferParallelism ? 1 : 2);
            return dataStream;
        }
    }

    /* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSourceITCase$TestPartitionFilterCatalog.class */
    private static class TestPartitionFilterCatalog extends HiveCatalog {
        private boolean fallback;

        TestPartitionFilterCatalog(String str, String str2, @Nullable HiveConf hiveConf, String str3) {
            super(str, str2, hiveConf, str3, true);
            this.fallback = false;
        }

        public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
            this.fallback = true;
            return super.listPartitions(objectPath);
        }
    }

    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
        batchTableEnv = createTableEnv();
    }

    @AfterClass
    public static void closeCatalog() {
        if (null != hiveCatalog) {
            hiveCatalog.close();
        }
    }

    @Before
    public void setupSourceDatabaseAndData() {
        batchTableEnv.executeSql("CREATE DATABASE IF NOT EXISTS source_db");
    }

    @Test
    public void testReadNonPartitionedTable() throws Exception {
        batchTableEnv.executeSql("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test").addRow(new Object[]{1, 1, "a", 1000L, Double.valueOf(1.11d)}).addRow(new Object[]{2, 2, "b", 2000L, Double.valueOf(2.22d)}).addRow(new Object[]{3, 3, "c", 3000L, Double.valueOf(3.33d)}).addRow(new Object[]{4, 4, "d", 4000L, Double.valueOf(4.44d)}).commit();
        List iteratorToList = CollectionUtil.iteratorToList(batchTableEnv.sqlQuery("select * from hive.source_db.test").execute().collect());
        Assertions.assertThat(iteratorToList).hasSize(4);
        Assertions.assertThat(((Row) iteratorToList.get(0)).toString()).isEqualTo("+I[1, 1, a, 1000, 1.11]");
        Assertions.assertThat(((Row) iteratorToList.get(1)).toString()).isEqualTo("+I[2, 2, b, 2000, 2.22]");
        Assertions.assertThat(((Row) iteratorToList.get(2)).toString()).isEqualTo("+I[3, 3, c, 3000, 3.33]");
        Assertions.assertThat(((Row) iteratorToList.get(3)).toString()).isEqualTo("+I[4, 4, d, 4000, 4.44]");
    }

    @Test
    public void testReadComplexDataType() throws Exception {
        batchTableEnv.executeSql("create table source_db.complex_test(a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
        Integer[] numArr = {1, 2, 3};
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(1, "a");
        linkedHashMap.put(2, "b");
        Object[] objArr = {3, 3L};
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "complex_test").addRow(new Object[]{numArr, linkedHashMap, objArr}).commit();
        List iteratorToList = CollectionUtil.iteratorToList(batchTableEnv.sqlQuery("select * from hive.source_db.complex_test").execute().collect());
        Assertions.assertThat(iteratorToList).hasSize(1);
        Assertions.assertThat((Integer[]) ((Row) iteratorToList.get(0)).getField(0)).isEqualTo(numArr);
        Assertions.assertThat(((Row) iteratorToList.get(0)).getField(1)).isEqualTo(linkedHashMap);
        Assertions.assertThat(((Row) iteratorToList.get(0)).getField(2)).isEqualTo(Row.of(new Object[]{objArr[0], objArr[1]}));
    }

    @Test
    public void testReadParquetComplexDataType() throws Exception {
        batchTableEnv.executeSql("create table parquet_complex_type_test(a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
        batchTableEnv.listModules();
        batchTableEnv.loadModule("hive", new HiveModule());
        batchTableEnv.useModules(new String[]{"hive", "core"});
        batchTableEnv.executeSql("insert into parquet_complex_type_test select array(1, 2), map(1, 'val1', 2, 'val2'), named_struct('f1', 1,  'f2', 2)").await();
        Assertions.assertThat(CollectionUtil.iteratorToList(batchTableEnv.sqlQuery("select * from parquet_complex_type_test").execute().collect()).toString()).isEqualTo("[+I[[1, 2], {1=val1, 2=val2}, +I[1, 2]]]");
        batchTableEnv.unloadModule("hive");
    }

    @Test
    public void testReadPartitionTable() throws Exception {
        batchTableEnv.executeSql("CREATE TABLE source_db.test_table_pt (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_table_pt").addRow(new Object[]{"2014", 3}).addRow(new Object[]{"2014", 4}).commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_table_pt").addRow(new Object[]{"2015", 2}).addRow(new Object[]{"2015", 5}).commit("pt=1");
        List iteratorToList = CollectionUtil.iteratorToList(batchTableEnv.sqlQuery("select * from hive.source_db.test_table_pt").execute().collect());
        Assertions.assertThat(iteratorToList).hasSize(4);
        Assertions.assertThat(iteratorToList.stream().map((v0) -> {
            return v0.toString();
        }).sorted().toArray()).isEqualTo(new String[]{"+I[2014, 3, 0]", "+I[2014, 4, 0]", "+I[2015, 2, 1]", "+I[2015, 5, 1]"});
    }

    @Test
    public void testPartitionPrunning() throws Exception {
        batchTableEnv.executeSql("CREATE TABLE source_db.test_table_pt_1 (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_table_pt_1").addRow(new Object[]{"2014", 3}).addRow(new Object[]{"2014", 4}).commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_table_pt_1").addRow(new Object[]{"2015", 2}).addRow(new Object[]{"2015", 5}).commit("pt=1");
        Table sqlQuery = batchTableEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
        String[] split = sqlQuery.explain(new ExplainDetail[0]).split("==.*==\n");
        Assertions.assertThat(split).hasSize(4);
        String str = split[2];
        ((AbstractStringAssert) Assertions.assertThat(str).as(str, new Object[0])).contains(new CharSequence[]{"table=[[hive, source_db, test_table_pt_1, partitions=[{pt=0}], project=[year, value]]]"});
        List iteratorToList = CollectionUtil.iteratorToList(sqlQuery.execute().collect());
        Assertions.assertThat(iteratorToList).hasSize(2);
        Assertions.assertThat(iteratorToList.stream().map((v0) -> {
            return v0.toString();
        }).sorted().toArray()).isEqualTo(new String[]{"+I[2014, 3, 0]", "+I[2014, 4, 0]"});
        batchTableEnv.executeSql("insert into source_db.test_table_pt_1 values ('2014', 1, null), ('2015', 2, null)").await();
        Assertions.assertThat(CollectionUtil.iteratorToList(batchTableEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt is null").execute().collect()).toString()).isEqualTo("[+I[2014, 1, null], +I[2015, 2, null]]");
    }

    @Test
    public void testPartitionFilter() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        TestPartitionFilterCatalog testPartitionFilterCatalog = new TestPartitionFilterCatalog(hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
        createTableEnvInBatchMode.registerCatalog(testPartitionFilterCatalog.getName(), testPartitionFilterCatalog);
        createTableEnvInBatchMode.useCatalog(testPartitionFilterCatalog.getName());
        createTableEnvInBatchMode.executeSql("create database db1");
        try {
            createTableEnvInBatchMode.executeSql("create table db1.part(x int) partitioned by (p1 int,p2 string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{1}).commit("p1=1,p2='a'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{2}).commit("p1=2,p2='b'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{3}).commit("p1=3,p2='c'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{4}).commit("p1=4,p2='c:2'");
            Table sqlQuery = createTableEnvInBatchMode.sqlQuery("select x from db1.part where p1>1 or p2<>'a' order by x");
            String[] split = sqlQuery.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str = split[2];
            ((AbstractStringAssert) Assertions.assertThat(str).as(str, new Object[0])).contains(new CharSequence[]{"table=[[test-catalog, db1, part, partitions=[{p1=2, p2=b}, {p1=3, p2=c}, {p1=4, p2=c:2}]"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery.execute().collect()).toString()).isEqualTo("[+I[2], +I[3], +I[4]]");
            Table sqlQuery2 = createTableEnvInBatchMode.sqlQuery("select x from db1.part where p1>2 and p2<='a' order by x");
            String[] split2 = sqlQuery2.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str2 = split2[2];
            ((AbstractStringAssert) Assertions.assertThat(str2).as(str2, new Object[0])).contains(new CharSequence[]{"Values(tuples=[[]], values=[x])"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery2.execute().collect()).toString()).isEqualTo("[]");
            Table sqlQuery3 = createTableEnvInBatchMode.sqlQuery("select x from db1.part where p1 in (1,3,5) order by x");
            String[] split3 = sqlQuery3.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str3 = split3[2];
            ((AbstractStringAssert) Assertions.assertThat(str3).as(str3, new Object[0])).contains(new CharSequence[]{"table=[[test-catalog, db1, part, partitions=[{p1=1, p2=a}, {p1=3, p2=c}], project=[x]]]"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery3.execute().collect()).toString()).isEqualTo("[+I[1], +I[3]]");
            Table sqlQuery4 = createTableEnvInBatchMode.sqlQuery("select x from db1.part where (p1=1 and p2='a') or ((p1=2 and p2='b') or p2='d') order by x");
            String[] split4 = sqlQuery4.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str4 = split4[2];
            ((AbstractStringAssert) Assertions.assertThat(str4).as(str4, new Object[0])).contains(new CharSequence[]{"table=[[test-catalog, db1, part, partitions=[{p1=1, p2=a}, {p1=2, p2=b}], project=[x]]]"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery4.execute().collect()).toString()).isEqualTo("[+I[1], +I[2]]");
            Table sqlQuery5 = createTableEnvInBatchMode.sqlQuery("select x from db1.part where p2 = 'c:2' order by x");
            String[] split5 = sqlQuery5.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str5 = split5[2];
            ((AbstractStringAssert) Assertions.assertThat(str5).as(str5, new Object[0])).contains(new CharSequence[]{"table=[[test-catalog, db1, part, partitions=[{p1=4, p2=c:2}], project=[x]]]"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery5.execute().collect()).toString()).isEqualTo("[+I[4]]");
            Table sqlQuery6 = createTableEnvInBatchMode.sqlQuery("select x from db1.part where '' = p2");
            String[] split6 = sqlQuery6.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isFalse();
            String str6 = split6[2];
            ((AbstractStringAssert) Assertions.assertThat(str6).as(str6, new Object[0])).contains(new CharSequence[]{"Values(tuples=[[]], values=[x])"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery6.execute().collect()).toString()).isEqualTo("[]");
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    @Test
    public void testPartitionFilterDateTimestamp() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        TestPartitionFilterCatalog testPartitionFilterCatalog = new TestPartitionFilterCatalog(hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
        createTableEnvInBatchMode.registerCatalog(testPartitionFilterCatalog.getName(), testPartitionFilterCatalog);
        createTableEnvInBatchMode.useCatalog(testPartitionFilterCatalog.getName());
        createTableEnvInBatchMode.executeSql("create database db1");
        try {
            createTableEnvInBatchMode.executeSql("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{1}).commit("p1='2018-08-08',p2='2018-08-08 08:08:08.1'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{2}).commit("p1='2018-08-09',p2='2018-08-08 08:08:09.1'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{3}).commit("p1='2018-08-10',p2='2018-08-08 08:08:10.1'");
            Table sqlQuery = createTableEnvInBatchMode.sqlQuery("select x from db1.part where p1>cast('2018-08-09' as date) and p2<>cast('2018-08-08 08:08:09.1' as timestamp)");
            String[] split = sqlQuery.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(testPartitionFilterCatalog.fallback).isTrue();
            String str = split[2];
            ((AbstractStringAssert) Assertions.assertThat(str).as(str, new Object[0])).contains(new CharSequence[]{"table=[[test-catalog, db1, part, partitions=[{p1=2018-08-10, p2=2018-08-08 08:08:10.1}]"});
            Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery.execute().collect()).toString()).isEqualTo("[+I[3]]");
            Assertions.assertThat(CollectionUtil.iteratorToList(createTableEnvInBatchMode.sqlQuery("select x from db1.part where timestamp '2018-08-08 08:08:09.1' = p2").execute().collect()).toString()).isEqualTo("[+I[2]]");
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    @Test
    public void testProjectionPushDown() throws Exception {
        batchTableEnv.executeSql("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit("p1=2013, p2='2013'");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src").addRow(new Object[]{3, "c"}).commit("p1=2014, p2='2014'");
            Table sqlQuery = batchTableEnv.sqlQuery("select p1, count(y) from hive.`default`.src group by p1");
            String[] split = sqlQuery.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(split).hasSize(4);
            String str = split[2];
            ((AbstractStringAssert) Assertions.assertThat(str).as(str, new Object[0])).contains(new CharSequence[]{"table=[[hive, default, src, project=[p1, y]]]"});
            List iteratorToList = CollectionUtil.iteratorToList(sqlQuery.execute().collect());
            Assertions.assertThat(iteratorToList).hasSize(2);
            Assertions.assertThat(iteratorToList.stream().map((v0) -> {
                return v0.toString();
            }).sorted().toArray()).isEqualTo(new String[]{"+I[2013, 2]", "+I[2014, 1]"});
            batchTableEnv.executeSql("drop table src");
        } catch (Throwable th) {
            batchTableEnv.executeSql("drop table src");
            throw th;
        }
    }

    @Test
    public void testLimitPushDown() throws Exception {
        batchTableEnv.executeSql("create table src (a string)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src").addRow(new Object[]{"a"}).addRow(new Object[]{"b"}).addRow(new Object[]{"c"}).addRow(new Object[]{"d"}).commit();
            Table sqlQuery = batchTableEnv.sqlQuery("select * from hive.`default`.src limit 1");
            String[] split = sqlQuery.explain(new ExplainDetail[0]).split("==.*==\n");
            Assertions.assertThat(split).hasSize(4);
            String str = split[2];
            ((AbstractStringAssert) Assertions.assertThat(str).as(str, new Object[0])).contains(new CharSequence[]{"table=[[hive, default, src, limit=[1]]]"});
            List iteratorToList = CollectionUtil.iteratorToList(sqlQuery.execute().collect());
            Assertions.assertThat(iteratorToList).hasSize(1);
            Assertions.assertThat(iteratorToList.stream().map((v0) -> {
                return v0.toString();
            }).sorted().toArray()).isEqualTo(new String[]{"+I[a]"});
            batchTableEnv.executeSql("drop table src");
        } catch (Throwable th) {
            batchTableEnv.executeSql("drop table src");
            throw th;
        }
    }

    @Test
    public void testParallelismSetting() throws Exception {
        batchTableEnv.executeSql("CREATE TABLE source_db.test_parallelism (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism").addRow(new Object[]{"2014", 3}).addRow(new Object[]{"2014", 4}).commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism").addRow(new Object[]{"2015", 2}).addRow(new Object[]{"2015", 5}).commit("pt=1");
        testParallelismSettingTranslateAndAssert(2, batchTableEnv.sqlQuery("select * from hive.source_db.test_parallelism"), batchTableEnv);
    }

    @Test
    public void testParallelismSettingWithFileNum() throws IOException {
        File file = Files.createTempDirectory("testParallelismSettingWithFileNum", new FileAttribute[0]).toFile();
        file.deleteOnExit();
        for (int i = 0; i < 3; i++) {
            File file2 = new File(file, "data" + i + ".csv");
            file2.createNewFile();
            FileUtils.writeFileUtf8(file2, "1|100\n2|200\n");
        }
        TableEnvironment createTableEnv = createTableEnv();
        createTableEnv.executeSql("CREATE EXTERNAL TABLE source_db.test_parallelism_setting_with_file_num (a INT, b INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '" + file.toString() + "'");
        Table sqlQuery = createTableEnv.sqlQuery("select * from hive.source_db.test_parallelism_setting_with_file_num");
        testParallelismSettingTranslateAndAssert(3, sqlQuery, createTableEnv);
        createTableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2);
        testParallelismSettingTranslateAndAssert(2, sqlQuery, createTableEnv);
    }

    private void testParallelismSettingTranslateAndAssert(int i, Table table, TableEnvironment tableEnvironment) {
        PlannerBase planner = ((TableEnvironmentImpl) tableEnvironment).getPlanner();
        Assertions.assertThat(((ExecNode) planner.translateToExecNodeGraph(JavaScalaConversionUtil.toScala(Collections.singletonList(planner.optimize(TableTestUtil.toRelNode(table)))), false).getRootNodes().get(0)).translateToPlan(planner).getParallelism()).isEqualTo(i);
    }

    @Test
    public void testParallelismOnLimitPushDown() throws Exception {
        TableEnvironmentImpl createTableEnv = createTableEnv();
        createTableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
        createTableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        createTableEnv.executeSql("CREATE TABLE source_db.test_parallelism_limit_pushdown (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism_limit_pushdown").addRow(new Object[]{"2014", 3}).addRow(new Object[]{"2014", 4}).commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism_limit_pushdown").addRow(new Object[]{"2015", 2}).addRow(new Object[]{"2015", 5}).commit("pt=1");
        Table sqlQuery = createTableEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
        PlannerBase planner = createTableEnv.getPlanner();
        Assertions.assertThat(((Transformation) ((Transformation) ((ExecNode) planner.translateToExecNodeGraph(JavaScalaConversionUtil.toScala(Collections.singletonList(planner.optimize(TableTestUtil.toRelNode(sqlQuery)))), false).getRootNodes().get(0)).translateToPlan(planner).getInputs().get(0)).getInputs().get(0)).getParallelism()).isEqualTo(2);
    }

    @Test
    public void testParallelismWithoutParallelismInfer() throws Exception {
        TableEnvironmentImpl create = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        create.getConfig().setSqlDialect(SqlDialect.HIVE);
        create.registerCatalog("hive", hiveCatalog);
        create.useCatalog("hive");
        create.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
        create.executeSql("CREATE TABLE source_db.test_parallelism_no_infer (`year` STRING, `value` INT) partitioned by (pt int)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism_no_infer").addRow(new Object[]{"2014", 3}).addRow(new Object[]{"2014", 4}).commit("pt=0");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "test_parallelism_no_infer").addRow(new Object[]{"2015", 2}).addRow(new Object[]{"2015", 5}).commit("pt=1");
        Table sqlQuery = create.sqlQuery("select * from hive.source_db.test_parallelism_no_infer limit 1");
        PlannerBase planner = create.getPlanner();
        Assertions.assertThat(((Transformation) ((Transformation) ((ExecNode) planner.translateToExecNodeGraph(JavaScalaConversionUtil.toScala(Collections.singletonList(planner.optimize(TableTestUtil.toRelNode(sqlQuery)))), false).getRootNodes().get(0)).translateToPlan(planner).getInputs().get(0)).getInputs().get(0)).getParallelism()).isEqualTo(((Integer) ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue()).intValue());
    }

    @Test
    public void testSourceConfig() throws Exception {
        Assume.assumeTrue(HiveVersionTestUtil.HIVE_230_OR_LATER);
        Map<String, String> map = System.getenv();
        batchTableEnv.executeSql("create database db1");
        try {
            batchTableEnv.executeSql("create table db1.src (x int,y string) stored as orc");
            batchTableEnv.executeSql("insert into db1.src values (1,'a'),(2,'b')").await();
            testSourceConfig(true, true);
            testSourceConfig(false, false);
        } finally {
            CommonTestUtils.setEnv(map);
            batchTableEnv.executeSql("drop database db1 cascade");
        }
    }

    @Test(timeout = 120000)
    public void testStreamPartitionReadByPartitionName() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(executionEnvironment, SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        createTableEnvInStreamingMode.executeSql("CREATE TABLE source_db.stream_partition_name_test (x int, y string, z int) PARTITIONED BY ( pt_year int, pt_mon string, pt_day string) TBLPROPERTIES('streaming-source.enable'='true','streaming-source.monitor-interval'='1s','streaming-source.consume-start-offset'='pt_year=2019/pt_month=09/pt_day=02')");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_partition_name_test").addRow(new Object[]{0, "a", 11}).commit("pt_year='2019',pt_mon='09',pt_day='01'");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_partition_name_test").addRow(new Object[]{1, "b", 12}).commit("pt_year='2020',pt_mon='09',pt_day='03'");
        TableResult executeSql = createTableEnvInStreamingMode.executeSql("select * from hive.source_db.stream_partition_name_test");
        CloseableIterator collect = executeSql.collect();
        Assertions.assertThat(fetchRows(collect, 1).get(0)).isEqualTo(Row.of(new Object[]{1, "b", "12", "2020", "09", "03"}).toString());
        for (int i = 2; i < 6; i++) {
            try {
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_partition_name_test").addRow(new Object[]{Integer.valueOf(i), "new_add", Integer.valueOf(11 + i)}).addRow(new Object[]{Integer.valueOf(i), "new_add_1", Integer.valueOf(11 + i)}).commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");
                Assertions.assertThat(fetchRows(collect, 2)).isEqualTo(Arrays.asList(Row.of(new Object[]{Integer.valueOf(i), "new_add", Integer.valueOf(11 + i), "2020", "10", "0" + i}).toString(), Row.of(new Object[]{Integer.valueOf(i), "new_add_1", Integer.valueOf(11 + i), "2020", "10", "0" + i}).toString()));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ((JobClient) executeSql.getJobClient().get()).cancel();
    }

    @Test(timeout = 120000)
    public void testStreamPartitionReadByCreateTime() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(executionEnvironment, SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        createTableEnvInStreamingMode.executeSql("CREATE TABLE source_db.stream_create_time_test (x int, y string, z int) PARTITIONED BY ( p1 string, p2 string, p3 string) TBLPROPERTIES('streaming-source.enable'='true','streaming-source.partition-include'='all','streaming-source.consume-order'='create-time','streaming-source.monitor-interval'='1s','streaming-source.consume-start-offset'='2020-10-02 00:00:00')");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_create_time_test").addRow(new Object[]{0, "a", 11}).commit("p1='A1',p2='B1',p3='C1'");
        TableResult executeSql = createTableEnvInStreamingMode.executeSql("select * from hive.source_db.stream_create_time_test");
        CloseableIterator collect = executeSql.collect();
        Assertions.assertThat(fetchRows(collect, 1).get(0)).isEqualTo(Row.of(new Object[]{0, "a", "11", "A1", "B1", "C1"}).toString());
        for (int i = 1; i < 6; i++) {
            try {
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_create_time_test").addRow(new Object[]{Integer.valueOf(i), "new_add", Integer.valueOf(11 + i)}).addRow(new Object[]{Integer.valueOf(i), "new_add_1", Integer.valueOf(11 + i)}).commit("p1='A',p2='B',p3='" + i + "'");
                Assertions.assertThat(fetchRows(collect, 2)).isEqualTo(Arrays.asList(Row.of(new Object[]{Integer.valueOf(i), "new_add", Integer.valueOf(11 + i), "A", "B", Integer.valueOf(i)}).toString(), Row.of(new Object[]{Integer.valueOf(i), "new_add_1", Integer.valueOf(11 + i), "A", "B", Integer.valueOf(i)}).toString()));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ((JobClient) executeSql.getJobClient().get()).cancel();
    }

    @Test(timeout = 120000)
    public void testStreamPartitionReadByPartitionTime() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(executionEnvironment, SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        createTableEnvInStreamingMode.executeSql("CREATE TABLE source_db.stream_test ( a INT, b STRING) PARTITIONED BY (ts STRING) TBLPROPERTIES ('streaming-source.enable'='true','streaming-source.monitor-interval'='1s','streaming-source.consume-order'='partition-time')");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_test").addRow(new Object[]{0, "0"}).commit("ts='2020-05-06 00:00:00'");
        TableResult executeSql = createTableEnvInStreamingMode.executeSql("select * from hive.source_db.stream_test");
        CloseableIterator collect = executeSql.collect();
        Assertions.assertThat(fetchRows(collect, 1).get(0)).isEqualTo(Row.of(new Object[]{0, "0", "2020-05-06 00:00:00"}).toString());
        for (int i = 1; i < 6; i++) {
            try {
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_test").addRow(new Object[]{Integer.valueOf(i), String.valueOf(i)}).addRow(new Object[]{Integer.valueOf(i), i + "_copy"}).commit("ts='2020-05-06 00:" + i + "0:00'");
                Assertions.assertThat(fetchRows(collect, 2)).isEqualTo(Arrays.asList(Row.of(new Object[]{Integer.valueOf(i), String.valueOf(i), "2020-05-06 00:" + i + "0:00"}).toString(), Row.of(new Object[]{Integer.valueOf(i), i + "_copy", "2020-05-06 00:" + i + "0:00"}).toString()));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ((JobClient) executeSql.getJobClient().get()).cancel();
    }

    private static List<String> fetchRows(Iterator<Row> it, int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            Assertions.assertThat(it.hasNext()).isTrue();
            arrayList.add(it.next().toString());
        }
        arrayList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        return arrayList;
    }

    @Test(timeout = 30000)
    public void testNonPartitionStreamingSourceWithMapredReader() throws Exception {
        testNonPartitionStreamingSource(true, "test_mapred_reader");
    }

    @Test(timeout = 30000)
    public void testNonPartitionStreamingSourceWithVectorizedReader() throws Exception {
        testNonPartitionStreamingSource(false, "test_vectorized_reader");
    }

    private void testNonPartitionStreamingSource(Boolean bool, String str) throws Exception {
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(StreamExecutionEnvironment.getExecutionEnvironment(), SqlDialect.HIVE);
        createTableEnvInStreamingMode.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, bool);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        createTableEnvInStreamingMode.executeSql("CREATE TABLE source_db." + str + " (  a INT,  b CHAR(1) ) stored as parquet TBLPROPERTIES (  'streaming-source.enable'='true',  'streaming-source.partition-order'='create-time',  'streaming-source.monitor-interval'='100ms')");
        TableResult executeSql = createTableEnvInStreamingMode.executeSql("select * from hive.source_db." + str);
        CloseableIterator collect = executeSql.collect();
        for (int i = 1; i < 3; i++) {
            try {
                Thread.sleep(1000L);
                batchTableEnv.executeSql("insert into table source_db." + str + " values (1,'a'), (2,'b')").await();
                Assertions.assertThat(fetchRows(collect, 2)).isEqualTo(Arrays.asList(Row.of(new Object[]{1, "a"}).toString(), Row.of(new Object[]{2, "b"}).toString()));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ((JobClient) executeSql.getJobClient().get()).cancel();
    }

    private void testSourceConfig(boolean z, boolean z2) throws Exception {
        HiveDynamicTableFactory hiveDynamicTableFactory = (HiveDynamicTableFactory) Mockito.spy((HiveDynamicTableFactory) hiveCatalog.getFactory().get());
        ((HiveDynamicTableFactory) Mockito.doAnswer(invocationOnMock -> {
            DynamicTableFactory.Context context = (DynamicTableFactory.Context) invocationOnMock.getArgument(0);
            Assertions.assertThat((Boolean) context.getConfiguration().get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)).isEqualTo(z);
            return new TestConfigSource(new JobConf(hiveCatalog.getHiveConf()), context.getConfiguration(), context.getObjectIdentifier().toObjectPath(), context.getCatalogTable(), z2);
        }).when(hiveDynamicTableFactory)).createDynamicTableSource((DynamicTableFactory.Context) ArgumentMatchers.any(DynamicTableFactory.Context.class));
        HiveCatalog hiveCatalog2 = (HiveCatalog) Mockito.spy(hiveCatalog);
        ((HiveCatalog) Mockito.doReturn(Optional.of(hiveDynamicTableFactory)).when(hiveCatalog2)).getTableFactory();
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode();
        createTableEnvInBatchMode.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, Boolean.valueOf(z));
        createTableEnvInBatchMode.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, Boolean.valueOf(z2));
        createTableEnvInBatchMode.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog2.getName(), hiveCatalog2);
        createTableEnvInBatchMode.useCatalog(hiveCatalog2.getName());
        Assertions.assertThat(CollectionUtil.iteratorToList(createTableEnvInBatchMode.sqlQuery("select * from db1.src order by x").execute().collect()).toString()).isEqualTo("[+I[1, a], +I[2, b]]");
    }

    @Test
    public void testParquetCaseInsensitive() throws Exception {
        testCaseInsensitive("parquet");
    }

    private void testCaseInsensitive(String str) throws Exception {
        TableEnvironment createTableEnvWithHiveCatalog = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        String uri = createTempFolder().toURI().toString();
        createTableEnvWithHiveCatalog.executeSql(String.format("create table parquet_t (I int, J int) with ('connector'='filesystem','format'='%s','path'='%s')", str, uri));
        createTableEnvWithHiveCatalog.executeSql("insert into parquet_t select 1, 2").await();
        createTableEnvWithHiveCatalog.executeSql("drop table parquet_t");
        createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.HIVE);
        createTableEnvWithHiveCatalog.executeSql(String.format("create external table parquet_t (i int, j int) stored as %s location '%s'", str, uri));
        Assertions.assertThat(createTableEnvWithHiveCatalog.executeSql("select * from parquet_t").collect().next()).isEqualTo(Row.of(new Object[]{1, 2}));
    }

    @Test(timeout = 120000)
    public void testStreamReadWithProjectPushDown() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(executionEnvironment, SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        createTableEnvInStreamingMode.executeSql("CREATE TABLE source_db.stream_project_pushdown_test (x int, y string, z int) PARTITIONED BY ( pt_year int, pt_mon string, pt_day string) TBLPROPERTIES('streaming-source.enable'='true','streaming-source.monitor-interval'='1s','streaming-source.consume-start-offset'='pt_year=2019/pt_month=09/pt_day=02')");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_project_pushdown_test").addRow(new Object[]{0, "a", 11}).commit("pt_year='2019',pt_mon='09',pt_day='01'");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_project_pushdown_test").addRow(new Object[]{1, "b", 12}).commit("pt_year='2020',pt_mon='09',pt_day='03'");
        TableResult executeSql = createTableEnvInStreamingMode.executeSql("select x, y from hive.source_db.stream_project_pushdown_test where pt_year = '2020'");
        CloseableIterator collect = executeSql.collect();
        Assertions.assertThat(fetchRows(collect, 1).get(0)).isEqualTo(Row.of(new Object[]{1, "b"}).toString());
        for (int i = 2; i < 6; i++) {
            try {
                Thread.sleep(1000L);
                HiveTestUtils.createTextTableInserter(hiveCatalog, "source_db", "stream_project_pushdown_test").addRow(new Object[]{Integer.valueOf(i), "new_add", Integer.valueOf(11 + i)}).addRow(new Object[]{Integer.valueOf(i), "new_add_1", Integer.valueOf(11 + i)}).commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");
                Assertions.assertThat(fetchRows(collect, 2)).isEqualTo(Arrays.asList(Row.of(new Object[]{Integer.valueOf(i), "new_add"}).toString(), Row.of(new Object[]{Integer.valueOf(i), "new_add_1"}).toString()));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ((JobClient) executeSql.getJobClient().get()).cancel();
    }

    @Test(timeout = 120000)
    public void testReadParquetWithNullableComplexType() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(executionEnvironment, SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInStreamingMode.useCatalog("hive");
        List<Row> generateRows = generateRows();
        List<Row> generateExpectedRows = generateExpectedRows(generateRows);
        createTableEnvInStreamingMode.createTemporaryView("my_table", executionEnvironment.addSource(new FiniteTestSource(generateRows), new RowTypeInfo(new TypeInformation[]{Types.INT, Types.STRING, new RowTypeInfo(new TypeInformation[]{Types.STRING, Types.INT, Types.INT}, new String[]{"c1", "c2", "c3"}), new MapTypeInfo(Types.STRING, Types.STRING), Types.OBJECT_ARRAY(Types.STRING), Types.STRING}, new String[]{"a", "b", "c", "d", "e", "f"})).filter(row -> {
            return true;
        }).setParallelism(3));
        assertResults(executeAndGetResult(createTableEnvInStreamingMode), generateExpectedRows);
    }

    private static List<Row> generateRows() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10000; i++) {
            HashMap hashMap = new HashMap();
            hashMap.put(i + "", i % 2 == 0 ? null : i + "");
            String[] strArr = new String[2];
            strArr[0] = i % 3 == 0 ? null : i + "";
            strArr[1] = i % 3 == 2 ? null : i + "";
            Object[] objArr = new Object[6];
            objArr[0] = Integer.valueOf(i);
            objArr[1] = String.valueOf(i % 10);
            Object[] objArr2 = new Object[3];
            objArr2[0] = i % 2 == 0 ? null : String.valueOf(i % 10);
            objArr2[1] = i % 3 == 0 ? null : Integer.valueOf(i % 10);
            objArr2[2] = i % 5 == 0 ? null : Integer.valueOf(i % 10);
            objArr[2] = Row.of(objArr2);
            objArr[3] = hashMap;
            objArr[4] = strArr;
            objArr[5] = String.valueOf(i % 10);
            arrayList.add(Row.of(objArr));
        }
        return arrayList;
    }

    private static List<Row> generateExpectedRows(List<Row> list) {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(list);
        arrayList.addAll(list);
        arrayList.sort(Comparator.comparingInt(row -> {
            return ((Integer) row.getField(0)).intValue();
        }));
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < arrayList.size(); i++) {
            Row copy = Row.copy((Row) arrayList.get(i));
            Row row2 = (Row) copy.getField(2);
            if (row2.getField(0) == null && row2.getField(1) == null && row2.getField(2) == null) {
                copy.setField(2, (Object) null);
            }
            arrayList2.add(copy);
        }
        return arrayList2;
    }

    private static CloseableIterator<Row> executeAndGetResult(StreamTableEnvironment streamTableEnvironment) throws Exception {
        streamTableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        streamTableEnvironment.executeSql("CREATE TABLE sink_table (a int, b string,c struct<c1:string, c2:int, c3:int>,d map<string, string>, e array<string>, f string )  stored as parquet TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore,success-file','auto-compaction'='true','compaction.file-size' = '128MB','sink.rolling-policy.file-size' = '1b')");
        streamTableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        streamTableEnvironment.executeSql("insert into sink_table /*+ OPTIONS('sink.parallelism' = '3') */ select * from my_table").await();
        return streamTableEnvironment.executeSql("select * from sink_table").collect();
    }

    private static void assertResults(CloseableIterator<Row> closeableIterator, List<Row> list) throws Exception {
        List iteratorToList = CollectionUtil.iteratorToList(closeableIterator);
        closeableIterator.close();
        iteratorToList.sort(Comparator.comparingInt(row -> {
            return ((Integer) row.getField(0)).intValue();
        }));
        Assertions.assertThat(iteratorToList).isEqualTo(list);
    }

    private static TableEnvironment createTableEnv() {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog("hive", hiveCatalog);
        createTableEnvInBatchMode.useCatalog("hive");
        return createTableEnvInBatchMode;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1015814001:
                if (implMethodName.equals("lambda$testReadParquetWithNullableComplexType$6ecb2930$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/flink/connectors/hive/HiveTableSourceITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/Row;)Z")) {
                    return row -> {
                        return true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
