package org.apache.flink.connectors.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveRunnerSetup;
import com.klarna.hiverunner.annotations.HiveSQL;
import com.klarna.hiverunner.config.HiveRunnerConfig;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.ArrayUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(FlinkEmbeddedHiveRunner.class)
/* loaded from: input_file:org/apache/flink/connectors/hive/HiveRunnerITCase.class */
public class HiveRunnerITCase {

    @HiveSQL(files = {})
    private static HiveShell hiveShell;

    @HiveRunnerSetup
    private static final HiveRunnerConfig CONFIG = new HiveRunnerConfig() { // from class: org.apache.flink.connectors.hive.HiveRunnerITCase.1
        {
            getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
            getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
            getHiveConfSystemOverride().put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
        }
    };
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void createCatalog() throws IOException {
        hiveCatalog = HiveTestUtils.createHiveCatalog(hiveShell.getHiveConf());
        hiveCatalog.open();
    }

    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    @Test
    public void testInsertIntoNonPartitionTable() throws Exception {
        List<Row> generateRecords = generateRecords(5);
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(generateRecords);
        TableEnvironment createTableEnvWithHiveCatalog = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        createTableEnvWithHiveCatalog.executeSql("create table default_catalog.default_database.src (i int,l bigint,d double,s string) with ('connector'='COLLECTION','is-bounded' = 'true')");
        createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.HIVE);
        createTableEnvWithHiveCatalog.executeSql("create table dest (i int,l bigint,d double,s string)");
        try {
            createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            createTableEnvWithHiveCatalog.executeSql("insert into dest select * from default_catalog.default_database.src").await();
            verifyWrittenData(generateRecords, hiveShell.executeQuery("select * from dest"));
        } finally {
            createTableEnvWithHiveCatalog.executeSql("drop table dest");
        }
    }

    @Test
    public void testWriteComplexType() throws Exception {
        TableEnvironment createTableEnvWithHiveCatalog = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        Row row = new Row(3);
        HashMap hashMap = new HashMap();
        hashMap.put(1, "a");
        hashMap.put(2, "b");
        Row row2 = new Row(2);
        row2.setField(0, 3);
        row2.setField(1, "c");
        row.setField(0, new Object[]{1, 2, 3});
        row.setField(1, hashMap);
        row.setField(2, row2);
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Collections.singletonList(row));
        createTableEnvWithHiveCatalog.executeSql("create table default_catalog.default_database.complexSrc (a array<int>,m map<int, string>,s row<f1 int,f2 string>) with ('connector'='COLLECTION','is-bounded' = 'true')");
        createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.HIVE);
        createTableEnvWithHiveCatalog.executeSql("create table dest (a array<int>,m map<int, string>,s struct<f1:int,f2:string>)");
        try {
            createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            createTableEnvWithHiveCatalog.executeSql("insert into dest select * from default_catalog.default_database.complexSrc").await();
            List executeQuery = hiveShell.executeQuery("select * from dest");
            Assert.assertEquals(1L, executeQuery.size());
            Assert.assertEquals("[1,2,3]\t{1:\"a\",2:\"b\"}\t{\"f1\":3,\"f2\":\"c\"}", executeQuery.get(0));
            createTableEnvWithHiveCatalog.executeSql("drop table dest");
        } catch (Throwable th) {
            createTableEnvWithHiveCatalog.executeSql("drop table dest");
            throw th;
        }
    }

    @Test
    public void testWriteNestedComplexType() throws Exception {
        TableEnvironment createTableEnvWithHiveCatalog = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
        Row row = new Row(1);
        Object[] objArr = new Object[3];
        row.setField(0, objArr);
        for (int i = 0; i < objArr.length; i++) {
            Row row2 = new Row(2);
            row2.setField(0, Integer.valueOf(1 + i));
            row2.setField(1, String.valueOf((char) (97 + i)));
            objArr[i] = row2;
        }
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Collections.singletonList(row));
        createTableEnvWithHiveCatalog.executeSql("create table default_catalog.default_database.nestedSrc (a array<row<f1 int,f2 string>>) with ('connector'='COLLECTION','is-bounded' = 'true')");
        createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.HIVE);
        createTableEnvWithHiveCatalog.executeSql("create table dest (a array<struct<f1:int,f2:string>>)");
        try {
            createTableEnvWithHiveCatalog.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            createTableEnvWithHiveCatalog.executeSql("insert into dest select * from default_catalog.default_database.nestedSrc").await();
            List executeQuery = hiveShell.executeQuery("select * from dest");
            Assert.assertEquals(1L, executeQuery.size());
            Assert.assertEquals("[{\"f1\":1,\"f2\":\"a\"},{\"f1\":2,\"f2\":\"b\"},{\"f1\":3,\"f2\":\"c\"}]", executeQuery.get(0));
            createTableEnvWithHiveCatalog.executeSql("drop table dest");
        } catch (Throwable th) {
            createTableEnvWithHiveCatalog.executeSql("drop table dest");
            throw th;
        }
    }

    @Test
    public void testWriteNullValues() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("create database db1");
        try {
            createTableEnvInBatchMode.executeSql("create table db1.src(t tinyint,s smallint,i int,b bigint,f float,d double,de decimal(10,5),ts timestamp,dt date,str string,ch char(5),vch varchar(8),bl boolean,bin binary,arr array<int>,mp map<int,string>,strt struct<f1:int,f2:string>)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null}).commit();
            hiveShell.execute("create table db1.dest like db1.src");
            createTableEnvInBatchMode.executeSql("insert into db1.dest select * from db1.src").await();
            List executeQuery = hiveShell.executeQuery("select * from db1.dest");
            Assert.assertEquals(1L, executeQuery.size());
            String[] split = ((String) executeQuery.get(0)).split("\t");
            Assert.assertEquals(17L, split.length);
            Assert.assertEquals("NULL", split[0]);
            Assert.assertEquals(1L, new HashSet(Arrays.asList(split)).size());
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    @Test
    public void testDifferentFormats() throws Exception {
        for (String str : new String[]{"orc", "parquet", "sequencefile", "csv", "avro"}) {
            if (!str.equals("avro") || HiveVersionTestUtil.HIVE_110_OR_LATER) {
                readWriteFormat(str);
            }
        }
    }

    @Test
    public void testDecimal() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src1 (x decimal(12,2))");
            tableEnvWithHiveCatalog.executeSql("create table db1.src2 (x decimal(12,2))");
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x decimal(12,2))");
            hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
            tableEnvWithHiveCatalog.executeSql("insert into db1.src2 values (1.0),(2.12),(5.123),(5.456),(123456789.12)").await();
            verifyHiveQueryResult("select * from db1.src2", hiveShell.executeQuery("select * from db1.src1"));
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select * from db1.src1").await();
            verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testInsertOverwrite() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x int, y string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "dest").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
            tableEnvWithHiveCatalog.executeSql("insert overwrite db1.dest values (3, 'c')").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
            tableEnvWithHiveCatalog.executeSql("create table db1.part(x int) partitioned by (y int)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{1}).commit("y=1");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "part").addRow(new Object[]{2}).commit("y=2");
            getTableEnvWithHiveCatalog().executeSql("insert overwrite db1.part partition (y=1) select 100").await();
            verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "2\t2"));
            tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
            tableEnvWithHiveCatalog.executeSql("insert overwrite db1.part values (200,2),(3,3)").await();
            verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    @Test
    public void testStaticPartition() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x int)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{1}).addRow(new Object[]{2}).commit();
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest partition (p1='1\\'1', p2=1.1) select x from db1.src").await();
            Assert.assertEquals(1L, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testDynamicPartition() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x int, y string, z double)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{1, "a", Double.valueOf(1.1d)}).addRow(new Object[]{2, "a", Double.valueOf(2.2d)}).addRow(new Object[]{3, "b", Double.valueOf(3.3d)}).commit();
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select * from db1.src").await();
            Assert.assertEquals(3L, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testPartialDynamicPartition() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x int, y string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest partition (p1=1.1,p2) select x,y from db1.src").await();
            Assert.assertEquals(2L, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
            verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testBatchCompressTextTable() throws Exception {
        testCompressTextTable(true);
    }

    @Test
    public void testStreamCompressTextTable() throws Exception {
        testCompressTextTable(false);
    }

    @Test
    public void testTimestamp() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (ts timestamp)");
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (ts timestamp)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{Timestamp.valueOf("2019-11-11 00:00:00")}).addRow(new Object[]{Timestamp.valueOf("2019-12-03 15:43:32.123456789")}).commit();
            List iteratorToList = CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(2L, iteratorToList.size());
            Assert.assertEquals(LocalDateTime.of(2019, 11, 11, 0, 0), ((Row) iteratorToList.get(0)).getField(0));
            Assert.assertEquals(LocalDateTime.of(2019, 12, 3, 15, 43, 32, 123456789), ((Row) iteratorToList.get(1)).getField(0));
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select max(ts) from db1.src").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-03 15:43:32.123456789"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testDate() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (dt date)");
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (dt date)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{Date.valueOf("2019-12-09")}).addRow(new Object[]{Date.valueOf("2019-12-12")}).commit();
            List iteratorToList = CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.src").execute().collect());
            Assert.assertEquals(2L, iteratorToList.size());
            Assert.assertEquals(LocalDate.of(2019, 12, 9), ((Row) iteratorToList.get(0)).getField(0));
            Assert.assertEquals(LocalDate.of(2019, 12, 12), ((Row) iteratorToList.get(1)).getField(0));
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select max(dt) from db1.src").await();
            verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-12"));
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testViews() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (key int,val string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{1, "aa"}).addRow(new Object[]{1, "aaa"}).addRow(new Object[]{2, "b"}).addRow(new Object[]{3, "c"}).addRow(new Object[]{3, "ccc"}).commit();
            tableEnvWithHiveCatalog.executeSql("create table db1.keys (key int,name string)");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "keys").addRow(new Object[]{1, "key1"}).addRow(new Object[]{2, "key2"}).addRow(new Object[]{3, "key3"}).addRow(new Object[]{4, "key4"}).commit();
            hiveShell.execute("create view db1.v1 as select key as k,val as v from db1.src limit 2");
            hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key");
            hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key");
            Assert.assertEquals("[+I[2]]", CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select count(v) from db1.v1").execute().collect()).toString());
            Assert.assertEquals("[+I[1, 3], +I[3, 2]]", CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.v2").execute().collect()).toString());
            Assert.assertEquals("[+I[1, key1, 3], +I[2, key2, 1], +I[3, key3, 2]]", CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.v3").execute().collect()).toString());
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testWhitespacePartValue() throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.dest (x int) partitioned by (p string)");
            StatementSet createStatementSet = tableEnvWithHiveCatalog.createStatementSet();
            createStatementSet.addInsertSql("insert into db1.dest select 1,'  '");
            createStatementSet.addInsertSql("insert into db1.dest select 2,'a \t'");
            createStatementSet.execute().await();
            Assert.assertEquals("[p=  , p=a %09]", hiveShell.executeQuery("show partitions db1.dest").toString());
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testBatchTransactionalTable() {
        testTransactionalTable(true);
    }

    @Test
    public void testStreamTransactionalTable() {
        testTransactionalTable(false);
    }

    @Test
    public void testOrcSchemaEvol() throws Exception {
        Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x smallint,y int) stored as orc");
            hiveShell.execute("insert into table db1.src values (1,100),(2,200)");
            tableEnvWithHiveCatalog.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
            tableEnvWithHiveCatalog.executeSql("alter table db1.src change x x int");
            Assert.assertEquals("[+I[1, 100], +I[2, 200]]", CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.src").execute().collect()).toString());
            tableEnvWithHiveCatalog.executeSql("alter table db1.src change y y string");
            Assert.assertEquals("[+I[1, 100], +I[2, 200]]", CollectionUtil.iteratorToList(tableEnvWithHiveCatalog.sqlQuery("select * from db1.src").execute().collect()).toString());
        } finally {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        }
    }

    private void testTransactionalTable(boolean z) {
        TableEnvironment tableEnvWithHiveCatalog = z ? getTableEnvWithHiveCatalog() : getStreamTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x string,y string)");
            hiveShell.execute("create table db1.dest (x string,y string) clustered by (x) into 3 buckets stored as orc tblproperties ('transactional'='true')");
            ArrayList arrayList = new ArrayList();
            try {
                tableEnvWithHiveCatalog.executeSql("insert into db1.src select * from db1.dest").await();
            } catch (Exception e) {
                arrayList.add(e);
            }
            try {
                tableEnvWithHiveCatalog.executeSql("insert into db1.dest select * from db1.src").await();
            } catch (Exception e2) {
                arrayList.add(e2);
            }
            Assert.assertEquals(2L, arrayList.size());
            arrayList.forEach(exc -> {
                Assert.assertTrue(exc instanceof FlinkHiveException);
                Assert.assertEquals("Reading or writing ACID table db1.dest is not supported.", exc.getMessage());
            });
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    private void testCompressTextTable(boolean z) throws Exception {
        TableEnvironment tableEnvWithHiveCatalog = z ? getTableEnvWithHiveCatalog() : getStreamTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        try {
            tableEnvWithHiveCatalog.executeSql("create table db1.src (x string,y string)");
            hiveShell.execute("create table db1.dest like db1.src");
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src").addRow(new Object[]{"a", "b"}).addRow(new Object[]{"c", "d"}).commit();
            hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select * from db1.src").await();
            List asList = Arrays.asList("a\tb", "c\td");
            verifyHiveQueryResult("select * from db1.dest", asList);
            verifyFlinkQueryResult(tableEnvWithHiveCatalog.sqlQuery("select * from db1.dest"), asList);
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    private static TableEnvironment getTableEnvWithHiveCatalog() {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        return createTableEnvInBatchMode;
    }

    private TableEnvironment getStreamTableEnvWithHiveCatalog() {
        StreamTableEnvironment createTableEnvInStreamingMode = HiveTestUtils.createTableEnvInStreamingMode(StreamExecutionEnvironment.getExecutionEnvironment(), SqlDialect.HIVE);
        createTableEnvInStreamingMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInStreamingMode.useCatalog(hiveCatalog.getName());
        return createTableEnvInStreamingMode;
    }

    private void readWriteFormat(String str) throws Exception {
        Object obj;
        TableEnvironment tableEnvWithHiveCatalog = getTableEnvWithHiveCatalog();
        tableEnvWithHiveCatalog.executeSql("create database db1");
        String str2 = str.equals("csv") ? "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'" : "stored as " + str;
        ArrayList arrayList = new ArrayList(Arrays.asList(1, "a", "2018-08-20 00:00:00.1"));
        ArrayList arrayList2 = new ArrayList(Arrays.asList(2, "b", "2019-08-26 00:00:00.1"));
        if (HiveVersionTestUtil.HIVE_120_OR_LATER || !str.equals("parquet")) {
            obj = "(i int,s string,ts timestamp,dt date)";
            arrayList.add("2018-08-20");
            arrayList2.add("2019-08-26");
        } else {
            obj = "(i int,s string,ts timestamp)";
        }
        tableEnvWithHiveCatalog.executeSql(String.format("create table db1.src %s partitioned by (p1 string, p2 timestamp) %s", obj, str2));
        tableEnvWithHiveCatalog.executeSql(String.format("create table db1.dest %s partitioned by (p1 string, p2 timestamp) %s", obj, str2));
        hiveShell.execute(String.format("insert into table db1.src partition(p1='first',p2='2018-08-20 00:00:00.1') values (%s)", toRowValue(arrayList)));
        hiveShell.execute(String.format("insert into table db1.src partition(p1='second',p2='2018-08-26 00:00:00.1') values (%s)", toRowValue(arrayList2)));
        List asList = Arrays.asList(String.join("\t", ArrayUtils.concat((String[]) arrayList.stream().map((v0) -> {
            return v0.toString();
        }).toArray(i -> {
            return new String[i];
        }), new String[]{"first", "2018-08-20 00:00:00.1"})), String.join("\t", ArrayUtils.concat((String[]) arrayList2.stream().map((v0) -> {
            return v0.toString();
        }).toArray(i2 -> {
            return new String[i2];
        }), new String[]{"second", "2018-08-26 00:00:00.1"})));
        verifyFlinkQueryResult(tableEnvWithHiveCatalog.sqlQuery("select * from db1.src"), asList);
        if (!str.equals("orc") || !HiveShimLoader.getHiveVersion().startsWith("2.0")) {
            tableEnvWithHiveCatalog.executeSql("insert into db1.dest select * from db1.src").await();
            verifyHiveQueryResult("select * from db1.dest", asList);
        }
        tableEnvWithHiveCatalog.executeSql("drop database db1 cascade");
    }

    private static void verifyWrittenData(List<Row> list, List<String> list2) throws Exception {
        Assert.assertEquals(list.size(), list2.size());
        HashSet hashSet = new HashSet();
        for (int i = 0; i < list2.size(); i++) {
            String row = list.get(i).toString();
            hashSet.add(row.substring(3, row.length() - 1).replaceAll(", ", "\t"));
        }
        Assert.assertEquals(hashSet, new HashSet(list2));
    }

    private static List<Row> generateRecords(int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            Row row = new Row(4);
            row.setField(0, Integer.valueOf(i2));
            row.setField(1, Long.valueOf(i2));
            row.setField(2, Double.valueOf(String.valueOf(String.format("%d.%d", Integer.valueOf(i2), Integer.valueOf(i2)))));
            row.setField(3, String.valueOf((char) (97 + i2)));
            arrayList.add(row);
        }
        return arrayList;
    }

    private static void verifyHiveQueryResult(String str, List<String> list) {
        List executeQuery = hiveShell.executeQuery(str);
        Assert.assertEquals(list.size(), executeQuery.size());
        Assert.assertEquals(new HashSet(list), new HashSet(executeQuery));
    }

    private static void verifyFlinkQueryResult(Table table, List<String> list) throws Exception {
        List list2 = (List) CollectionUtil.iteratorToList(table.execute().collect()).stream().map(row -> {
            IntStream range = IntStream.range(0, row.getArity());
            row.getClass();
            return (String) range.mapToObj(row::getField).map(obj -> {
                return obj instanceof LocalDateTime ? Timestamp.valueOf((LocalDateTime) obj) : obj;
            }).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining("\t"));
        }).collect(Collectors.toList());
        Assert.assertEquals(list.size(), list2.size());
        Assert.assertEquals(new HashSet(list), new HashSet(list2));
    }

    private static String toRowValue(List<Object> list) {
        return (String) list.stream().map(obj -> {
            String obj = obj.toString();
            if (obj instanceof String) {
                obj = "'" + obj + "'";
            }
            return obj;
        }).collect(Collectors.joining(","));
    }

    @Test
    public void testCatalogLock() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("create database db1");
        try {
            createTableEnvInBatchMode.useDatabase("db1");
            createTableEnvInBatchMode.executeSql("create table src (x int) with ('connector'='datagen','number-of-rows'='2')");
            createTableEnvInBatchMode.executeSql("create table lock_t (x int) with ('connector'='test-lock')");
            createTableEnvInBatchMode.executeSql("insert into lock_t select * from src").await();
        } finally {
            createTableEnvInBatchMode.executeSql("drop database db1 cascade");
        }
    }
}
