package org.apache.flink.connectors.hive;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSinkITCase.class */
public class HiveTableSinkITCase {
    private static HiveCatalog hiveCatalog;

    /**
     * One-time class setup: obtains a Hive catalog from {@link HiveTestUtils} and opens it so
     * every test in this class can register the same catalog instance.
     *
     * @throws IOException declared by the signature and propagated to the test runner
     */
    @BeforeClass
    public static void createCatalog() throws IOException {
        // The static field is assigned before open() so that closeCatalog() can still see it.
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    /** One-time class teardown: closes the shared Hive catalog if it was ever created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog == null) {
            // Setup never assigned the catalog; nothing to release.
            return;
        }
        hiveCatalog.close();
    }

    /** Runs the sink-parallelism plan check with a batch-mode table environment. */
    @Test
    public void testHiveTableSinkWithParallelismInBatch() {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        String expectedPlanResource = "/explain/testHiveTableSinkWithParallelismInBatch.out";
        testHiveTableSinkWithParallelismBase(batchEnv, expectedPlanResource);
    }

    /** Runs the sink-parallelism plan check with a streaming-mode table environment. */
    @Test
    public void testHiveTableSinkWithParallelismInStreaming() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment streamingTableEnv =
                HiveTestUtils.createTableEnvInStreamingMode(streamEnv, SqlDialect.HIVE);
        String expectedPlanResource = "/explain/testHiveTableSinkWithParallelismInStreaming.out";
        testHiveTableSinkWithParallelismBase(streamingTableEnv, expectedPlanResource);
    }

    /**
     * Creates a Hive table whose {@code sink.parallelism} property is 8, explains an insert into
     * it with the JSON execution plan, and compares the result with a stored plan after node and
     * stage ids have been normalised on both sides.
     *
     * <p>The temporary database {@code db1} is dropped in a {@code finally} block so that a failed
     * assertion does not leave it behind for the other tests that also create {@code db1}.
     *
     * @param tableEnvironment table environment (batch or streaming) used to run the statements
     * @param str classpath resource holding the expected explain output
     */
    private void testHiveTableSinkWithParallelismBase(TableEnvironment tableEnvironment, String str) {
        tableEnvironment.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnvironment.useCatalog(hiveCatalog.getName());
        tableEnvironment.executeSql("create database db1");
        try {
            tableEnvironment.useDatabase("db1");
            tableEnvironment.executeSql(
                    "CREATE TABLE test_table ( id int, real_col int) TBLPROPERTIES ( 'sink.parallelism' = '8')");
            // The insert is explained under the default dialect, as in the original flow.
            tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);

            String actualPlan =
                    tableEnvironment.explainSql(
                            "insert into test_table select 1, 1", ExplainDetail.JSON_EXECUTION_PLAN);
            String normalizedActual =
                    TableTestUtil.replaceNodeIdInOperator(
                            TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(actualPlan)));

            String expectedPlan = TableTestUtil.readFromResource(str);
            String normalizedExpected =
                    TableTestUtil.replaceNodeIdInOperator(
                            TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(expectedPlan)));

            Assertions.assertThat(normalizedActual).isEqualTo(normalizedExpected);
        } finally {
            tableEnvironment.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Inserts two single-row batches into a fresh Hive table and verifies that both rows are
     * present afterwards, i.e. the second insert appended instead of replacing the first.
     *
     * @throws Exception if any of the SQL jobs fails
     */
    @Test
    public void testBatchAppend() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            // Inside the try so that db1 is still dropped if switching to it fails.
            tableEnv.useDatabase("db1");
            tableEnv.executeSql("create table append_table (i int, j int)");
            tableEnv.executeSql("insert into append_table select 1, 1").await();
            tableEnv.executeSql("insert into append_table select 2, 2").await();

            List<Row> rows =
                    CollectionUtil.iteratorToList(
                            tableEnv.executeSql("select * from append_table").collect());
            // Sort by the first column so the comparison does not depend on scan order.
            rows.sort(Comparator.comparingInt(row -> (Integer) row.getField(0)));
            Assertions.assertThat(rows).isEqualTo(Arrays.asList(Row.of(1, 1), Row.of(2, 2)));
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Streaming write into a partitioned {@code textfile} table without the mapred-writer flag,
     * followed by the success-file check.
     */
    @Test
    public void testDefaultSerPartStreamingWrite() throws Exception {
        final boolean partitioned = true;
        final boolean useMapredWriter = false;
        testStreamingWrite(partitioned, useMapredWriter, "textfile", this::checkSuccessFiles);
    }

    /**
     * Streaming write into a partitioned table without the mapred-writer flag: always for parquet,
     * and additionally for ORC unless the catalog reports a Hive version starting with "2.".
     */
    @Test
    public void testPartStreamingWrite() throws Exception {
        testStreamingWrite(true, false, "parquet", this::checkSuccessFiles);
        boolean orcRunEnabled = !hiveCatalog.getHiveVersion().startsWith("2.");
        if (orcRunEnabled) {
            testStreamingWrite(true, false, "orc", this::checkSuccessFiles);
        }
    }

    /**
     * Streaming write into a non-partitioned table without the mapred-writer flag: always for
     * parquet, and additionally for ORC unless the catalog reports a Hive version starting with
     * "2.". No extra path check is performed afterwards.
     */
    @Test
    public void testNonPartStreamingWrite() throws Exception {
        testStreamingWrite(false, false, "parquet", ignoredPath -> { });
        boolean orcRunEnabled = !hiveCatalog.getHiveVersion().startsWith("2.");
        if (orcRunEnabled) {
            testStreamingWrite(false, false, "orc", ignoredPath -> { });
        }
    }

    /**
     * Streaming write into a partitioned table with the mapred-writer flag set: always for
     * parquet, and additionally for ORC unless the catalog reports a Hive version starting with
     * "2.0".
     */
    @Test
    public void testPartStreamingMrWrite() throws Exception {
        testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
        boolean orcRunEnabled = !hiveCatalog.getHiveVersion().startsWith("2.0");
        if (orcRunEnabled) {
            testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
        }
    }

    /**
     * Streaming write into a non-partitioned table with the mapred-writer flag set: always for
     * parquet, and additionally for ORC unless the catalog reports a Hive version starting with
     * "2.0". No extra path check is performed afterwards.
     */
    @Test
    public void testNonPartStreamingMrWrite() throws Exception {
        testStreamingWrite(false, true, "parquet", ignoredPath -> { });
        boolean orcRunEnabled = !hiveCatalog.getHiveVersion().startsWith("2.0");
        if (orcRunEnabled) {
            testStreamingWrite(false, true, "orc", ignoredPath -> { });
        }
    }

    /**
     * After the common streaming write into a non-partitioned parquet table, inserts one more row
     * from a second streaming environment and checks that the table holds the earlier rows plus
     * the new one.
     *
     * @throws Exception if the underlying streaming write fails
     */
    @Test
    public void testStreamingAppend() throws Exception {
        testStreamingWrite(false, false, "parquet", ignoredPath -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
            tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            tableEnv.useCatalog(hiveCatalog.getName());
            try {
                tableEnv.executeSql("insert into db1.sink_table select 6,'a','b','2020-05-03','12'").await();
            } catch (Exception e) {
                // Pass the exception along so the failure report keeps the original stack trace.
                Assertions.fail("Failed to execute sql: " + e.getMessage(), e);
            }
            assertBatch(
                    "db1.sink_table",
                    Arrays.asList(
                            "+I[1, a, b, 2020-05-03, 7]",
                            "+I[1, a, b, 2020-05-03, 7]",
                            "+I[2, p, q, 2020-05-03, 8]",
                            "+I[2, p, q, 2020-05-03, 8]",
                            "+I[3, x, y, 2020-05-03, 9]",
                            "+I[3, x, y, 2020-05-03, 9]",
                            "+I[4, x, y, 2020-05-03, 10]",
                            "+I[4, x, y, 2020-05-03, 10]",
                            "+I[5, x, y, 2020-05-03, 11]",
                            "+I[5, x, y, 2020-05-03, 11]",
                            "+I[6, a, b, 2020-05-03, 12]"));
        });
    }

    /**
     * Streams rows from a partitioned Hive source table into a partitioned Hive sink table, using
     * a watermark on a {@code TIMESTAMP_LTZ} column computed from {@code epoch_ts}.
     *
     * <p>Source partitions are committed one at a time with two rows each. After each commit the
     * test reads two rows from a streaming select over the sink and expects them to be the sink
     * row of the previously committed partition, rendered twice. At the end the {@code _MY_SUCCESS}
     * marker files are checked under the sink table location.
     *
     * <p>The database is dropped exactly once in a {@code finally} block, whether the body
     * succeeds or fails.
     *
     * @throws Exception if any SQL statement, source-table insert or assertion helper fails
     */
    @Test
    public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);
        StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        try {
            tableEnv.executeSql("create database db1");
            tableEnv.useDatabase("db1");

            // Source table, read as a streaming source in partition-time order.
            tableEnv.executeSql(
                    "create external table source_table ( a int, b string, c string, epoch_ts bigint) "
                            + "partitioned by ( pt_day string, pt_hour string) "
                            + "TBLPROPERTIES("
                            + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
                            + "'streaming-source.enable'='true',"
                            + "'streaming-source.monitor-interval'='1s',"
                            + "'streaming-source.consume-order'='partition-time')");

            // Sink table: partition-time commit trigger, metastore + success-file commit policies.
            tableEnv.executeSql(
                    "create external table sink_table ( a int, b string, c string) "
                            + "partitioned by ( d string, e string) "
                            + "TBLPROPERTIES( "
                            + "'partition.time-extractor.timestamp-pattern' = '$d $e:00:00', "
                            + "'auto-compaction'='true', "
                            + "'compaction.file-size' = '128MB', "
                            + "'sink.partition-commit.trigger'='partition-time', "
                            + "'sink.partition-commit.delay'='30min', "
                            + "'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', "
                            + "'sink.partition-commit.policy.kind'='metastore,success-file', "
                            + "'sink.partition-commit.success-file.name'='_MY_SUCCESS', "
                            + "'streaming-source.enable'='true', "
                            + "'streaming-source.monitor-interval'='1s', "
                            + "'streaming-source.consume-order'='partition-time')");

            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

            // Re-declare the source as a view with a computed TIMESTAMP_LTZ column and a watermark.
            tableEnv.createTemporaryView(
                    "my_table",
                    tableEnv.fromDataStream(
                            tableEnv.toDataStream(
                                    tableEnv.sqlQuery(
                                            "select a, b, c, epoch_ts, pt_day, pt_hour from source_table")),
                            Schema.newBuilder()
                                    .column("a", DataTypes.INT())
                                    .column("b", DataTypes.STRING())
                                    .column("c", DataTypes.STRING())
                                    .column("epoch_ts", DataTypes.BIGINT())
                                    .column("pt_day", DataTypes.STRING())
                                    .column("pt_hour", DataTypes.STRING())
                                    .columnByExpression(
                                            "ts_ltz", Expressions.callSql("TO_TIMESTAMP_LTZ(epoch_ts, 3)"))
                                    .watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND")
                                    .build()));

            // Source rows, keyed 1..10; rows (2k-1, 2k) go into source partition k.
            HashMap<Integer, Object[]> sourceRows = new HashMap<>();
            sourceRows.put(1, new Object[] {1, "a", "b", 1588461300000L});
            sourceRows.put(2, new Object[] {1, "a", "b", 1588463100000L});
            sourceRows.put(3, new Object[] {2, "p", "q", 1588464300000L});
            sourceRows.put(4, new Object[] {2, "p", "q", 1588466400000L});
            sourceRows.put(5, new Object[] {3, "x", "y", 1588468800000L});
            sourceRows.put(6, new Object[] {3, "x", "y", 1588470900000L});
            sourceRows.put(7, new Object[] {4, "x", "y", 1588471800000L});
            sourceRows.put(8, new Object[] {4, "x", "y", 1588473300000L});
            sourceRows.put(9, new Object[] {5, "x", "y", 1588476300000L});
            sourceRows.put(10, new Object[] {5, "x", "y", 1588477800000L});

            // Partition spec for each source partition k.
            HashMap<Integer, String> sourcePartitions = new HashMap<>();
            sourcePartitions.put(1, "pt_day='2020-05-03',pt_hour='7'");
            sourcePartitions.put(2, "pt_day='2020-05-03',pt_hour='8'");
            sourcePartitions.put(3, "pt_day='2020-05-03',pt_hour='9'");
            sourcePartitions.put(4, "pt_day='2020-05-03',pt_hour='10'");
            sourcePartitions.put(5, "pt_day='2020-05-03',pt_hour='11'");

            // Sink row expected for each source partition k.
            HashMap<Integer, Object[]> expectedSinkRows = new HashMap<>();
            expectedSinkRows.put(1, new Object[] {1, "a", "b", "2020-05-03", "7"});
            expectedSinkRows.put(2, new Object[] {2, "p", "q", "2020-05-03", "8"});
            expectedSinkRows.put(3, new Object[] {3, "x", "y", "2020-05-03", "9"});
            expectedSinkRows.put(4, new Object[] {4, "x", "y", "2020-05-03", "10"});
            expectedSinkRows.put(5, new Object[] {5, "x", "y", "2020-05-03", "11"});

            // Start the streaming insert (not awaited) and a streaming read over the sink.
            tableEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
            CloseableIterator<Row> sinkRows = tableEnv.executeSql("select * from sink_table").collect();

            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
                    .addRow(sourceRows.get(1))
                    .addRow(sourceRows.get(2))
                    .commit(sourcePartitions.get(1));

            for (int i = 2; i < 7; i++) {
                try {
                    Thread.sleep(1000L);
                    String expectedRow = Row.of(expectedSinkRows.get(i - 1)).toString();
                    Assertions.assertThat(fetchRows(sinkRows, 2))
                            .isEqualTo(Arrays.asList(expectedRow, expectedRow));
                    if (i < 6) {
                        // Commit the next source partition; after the fifth there is none left.
                        HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
                                .addRow(sourceRows.get((2 * i) - 1))
                                .addRow(sourceRows.get(2 * i))
                                .commit(sourcePartitions.get(i));
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            checkSuccessFiles(
                    URI.create(
                                    hiveCatalog
                                            .getHiveTable(ObjectPath.fromString("db1.sink_table"))
                                            .getSd()
                                            .getLocation())
                            .getPath());
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Verifies that a streaming insert into a partitioned Hive table with no commit policy fails
     * with a {@code FlinkHiveException} whose message names the target table.
     *
     * <p>The database is dropped exactly once in a {@code finally} block on every path.
     *
     * @throws Exception if an unexpected error occurs while running the statements
     */
    @Test
    public void testStreamingSinkWithoutCommitPolicy() throws Exception {
        StreamTableEnvironment tableEnv =
                HiveTestUtils.createTableEnvInStreamingMode(
                        StreamExecutionEnvironment.getExecutionEnvironment());
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.useDatabase("db1");
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tableEnv.executeSql("create table dest(x int) partitioned by (p string)");
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.executeSql(
                    "create table src (i int, p string) with ('connector'='datagen','number-of-rows'='5')");
            tableEnv.executeSql("insert into dest select * from src").await();
            Assertions.fail("Streaming write partitioned table without commit policy should fail");
        } catch (FlinkHiveException e) {
            // Expected failure: the message must identify catalog, database and table.
            Assertions.assertThat(e.getMessage())
                    .contains(
                            String.format(
                                    "Streaming write to partitioned hive table `%s`.`%s`.`%s` without providing a commit policy",
                                    hiveCatalog.getName(), "db1", "dest"));
        } finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    /**
     * Passes a commit-policy class name that does not exist and expects the streaming write to
     * throw with a stack trace mentioning that the custom class could not be instantiated.
     */
    @Test
    public void testCustomPartitionCommitPolicyNotFound() {
        final String missingPolicyClass = "NotExistPartitionCommitPolicyClass";
        final String expectedFragment =
                "Can not create new instance for custom class from " + missingPolicyClass;
        Assertions.assertThatThrownBy(
                        () -> testStreamingWriteWithCustomPartitionCommitPolicy(missingPolicyClass))
                .hasStackTraceContaining(expectedFragment);
    }

    /** Runs the custom commit-policy streaming write with {@code TestCustomCommitPolicy}. */
    @Test
    public void testCustomPartitionCommitPolicy() throws Exception {
        String policyClassName = TestCustomCommitPolicy.class.getName();
        testStreamingWriteWithCustomPartitionCommitPolicy(policyClassName);
    }

    /**
     * Exercises inserts whose select produces no rows (the source table is never populated) and
     * checks which partitions exist on the target afterwards, for INSERT INTO, INSERT OVERWRITE,
     * an already populated partition, and a dynamic-partition overwrite.
     *
     * @throws Exception if any SQL job fails
     */
    @Test
    public void testWritingNoDataToPartition() throws Exception {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.executeSql("CREATE TABLE src_table (name string) PARTITIONED BY (`dt` string)");
        tEnv.executeSql("CREATE TABLE target_table (name string) PARTITIONED BY (`dt` string)");

        // INSERT INTO a static partition with an empty select: one partition is listed.
        tEnv.executeSql("INSERT INTO target_table partition (dt='2022-07-27') SELECT name FROM src_table where dt = '2022-07-27'").await();
        List<Row> afterInsertInto =
                CollectionUtil.iteratorToList(tEnv.executeSql("show partitions target_table").collect());
        Assertions.assertThat(afterInsertInto).hasSize(1);
        Assertions.assertThat(afterInsertInto.toString()).contains("dt=2022-07-27");

        // INSERT OVERWRITE a second static partition with an empty select: two partitions.
        tEnv.executeSql("INSERT OVERWRITE target_table partition (dt='2022-07-28') SELECT name FROM src_table where dt = '2022-07-28'").await();
        List<Row> afterOverwrite =
                CollectionUtil.iteratorToList(tEnv.executeSql("show partitions target_table").collect());
        Assertions.assertThat(afterOverwrite).hasSize(2);
        Assertions.assertThat(afterOverwrite.toString()).contains("dt=2022-07-28");

        // Put one real row into a third partition, then INSERT INTO it with an empty select.
        tEnv.executeSql("INSERT INTO target_table partition (dt='2022-07-29') VALUES ('zm')").await();
        assertBatch("target_table", Arrays.asList("+I[zm, 2022-07-29]"));
        tEnv.executeSql("INSERT INTO target_table partition (dt='2022-07-29') SELECT name FROM src_table where dt = '2022-07-29'").await();
        List<Row> afterEmptyAppend =
                CollectionUtil.iteratorToList(tEnv.executeSql("show partitions target_table").collect());
        Assertions.assertThat(afterEmptyAppend).hasSize(3);
        Assertions.assertThat(afterEmptyAppend.toString()).contains("dt=2022-07-29");
        assertBatch("target_table", Arrays.asList("+I[zm, 2022-07-29]"));

        // INSERT OVERWRITE the populated partition with an empty select: the row is gone.
        tEnv.executeSql("INSERT OVERWRITE target_table partition (dt='2022-07-29') SELECT name FROM src_table where dt = '2022-07-29'").await();
        List<Row> afterEmptyOverwrite =
                CollectionUtil.iteratorToList(tEnv.executeSql("show partitions target_table").collect());
        Assertions.assertThat(afterEmptyOverwrite).hasSize(3);
        Assertions.assertThat(afterEmptyOverwrite.toString()).contains("dt=2022-07-29");
        assertBatch("target_table", Arrays.<String>asList());

        // Overwrite with one static and one dynamic partition column and an empty select:
        // no partition is listed.
        tEnv.executeSql("create table partition_table(`name` string) partitioned by (`p_date` string, `p_hour` string)");
        tEnv.executeSql("create table test_src_table(`name` string, `hour` string, age int)");
        tEnv.executeSql("insert overwrite table partition_table partition(`p_date`='20220816', `p_hour`) select `name`, `hour` from test_src_table").await();
        List<Row> dynamicPartitions =
                CollectionUtil.iteratorToList(tEnv.executeSql("show partitions partition_table").collect());
        Assertions.assertThat(dynamicPartitions).hasSize(0);
    }

    /**
     * Explains the same dynamic-partition insert twice, before and after setting
     * {@code HiveOptions.TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED} to {@code false}, and compares
     * each plan with its stored resource. The table is dropped afterwards in all cases.
     */
    @Test
    public void testSortByDynamicPartitionEnableConfigurationInBatchMode() {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode();
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        final String insertSql = "insert into dynamic_partition_t select 1, 1, 'd'";
        try {
            String commitPolicyKey = FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key();
            tEnv.executeSql(
                    String.format(
                            "create table dynamic_partition_t(a int, b int, d string) partitioned by (d) with ('connector' = 'hive', '%s' = 'metastore')",
                            commitPolicyKey));

            // Plan with the option left at its current value.
            String planBefore = tEnv.explainSql(insertSql, new ExplainDetail[0]);
            Assertions.assertThat(planBefore)
                    .isEqualTo(TableTestUtil.readFromResource("/explain/testDynamicPartitionSortEnabled.out"));

            // Plan after switching the option off.
            tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED, false);
            String planAfter = tEnv.explainSql(insertSql, new ExplainDetail[0]);
            Assertions.assertThat(planAfter)
                    .isEqualTo(TableTestUtil.readFromResource("/explain/testDynamicPartitionSortDisabled.out"));
        } finally {
            tEnv.executeSql("drop table dynamic_partition_t");
        }
    }

    /**
     * Checks on the local file system, after batch inserts, whether the success file exists:
     * with the configured default name for a partitioned and a non-partitioned table, absent
     * when the commit policy kind is only {@code metastore}, and under the name {@code _ZM} when
     * that name is set as a table property.
     *
     * @throws Exception if any SQL job fails
     */
    @Test
    public void testWriteSuccessFile() throws Exception {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        String successFileName =
                (String) tEnv.getConfig().get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
        String warehouseDir = hiveCatalog.getHiveConf().get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);

        tEnv.executeSql("CREATE TABLE zm_test_non_partition_table (name string)");
        tEnv.executeSql("CREATE TABLE zm_test_partition_table (name string) PARTITIONED BY (`dt` string)");

        // Partitioned table: the marker is expected inside the partition directory.
        tEnv.executeSql("INSERT INTO zm_test_partition_table partition (dt='2022-07-27') values ('zm')").await();
        File partitionedMarker =
                new File(warehouseDir + "/zm_test_partition_table", "dt=2022-07-27/" + successFileName);
        Assertions.assertThat(partitionedMarker).exists();

        // Non-partitioned table: the marker is expected directly in the table directory.
        tEnv.executeSql("INSERT INTO zm_test_non_partition_table values ('zm')").await();
        File nonPartitionedMarker = new File(warehouseDir + "/zm_test_non_partition_table", successFileName);
        Assertions.assertThat(nonPartitionedMarker).exists();

        // Commit policy kind 'metastore' only: no marker is expected.
        tEnv.executeSql("CREATE TABLE zm_test_partition_table_only_meta (name string) PARTITIONED BY (`dt` string) TBLPROPERTIES ('sink.partition-commit.policy.kind' = 'metastore')");
        tEnv.executeSql("INSERT INTO zm_test_partition_table_only_meta partition (dt='2022-08-15') values ('zm')").await();
        File metaOnlyMarker =
                new File(warehouseDir + "/zm_test_partition_table_only_meta", "dt=2022-08-15/" + successFileName);
        Assertions.assertThat(metaOnlyMarker).doesNotExist();

        // Custom marker name '_ZM': the default name must be absent and '_ZM' present.
        tEnv.executeSql("CREATE TABLE zm_test_partition_table_success_file (name string) PARTITIONED BY (`dt` string) TBLPROPERTIES ('sink.partition-commit.success-file.name' = '_ZM')");
        String customNameTableDir = warehouseDir + "/zm_test_partition_table_success_file";
        tEnv.executeSql("INSERT INTO zm_test_partition_table_success_file partition (dt='2022-08-15') values ('zm')").await();
        Assertions.assertThat(new File(customNameTableDir, "dt=2022-08-15/" + successFileName)).doesNotExist();
        Assertions.assertThat(new File(customNameTableDir, "dt=2022-08-15/_ZM")).exists();
    }

    /**
     * Checks the table and partition statistics recorded in the catalog after batch inserts,
     * with {@code HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE} first set to
     * {@code false} and then to {@code true}, for default-format, ORC and parquet tables.
     *
     * <p>Expected total sizes are computed from the files on disk via {@code getPathSize}; the
     * remaining expected numbers are the fixed values asserted below.
     *
     * @throws Exception if any SQL job or catalog call fails
     */
    @Test
    public void testAutoGatherStatisticForBatchWriting() throws Exception {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        String warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);

        // Phase 1: option set to false, statistics are expected to stay UNKNOWN.
        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, false);
        tEnv.executeSql("create table t1(x int)");
        tEnv.executeSql("create table t2(x int) stored as orc");
        tEnv.executeSql("create table t3(x int) stored as parquet");
        tEnv.executeSql("insert into t1 values (1)").await();
        tEnv.executeSql("insert into t2 values (1)").await();
        tEnv.executeSql("insert into t3 values (1)").await();
        for (String tableName : new String[] {"t1", "t2", "t3"}) {
            Assertions.assertThat(hiveCatalog.getTableStatistics(new ObjectPath("default", tableName)))
                    .isEqualTo(CatalogTableStatistics.UNKNOWN);
        }

        // Phase 2: option set to true, non-partitioned tables after a second insert.
        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
        tEnv.executeSql("insert into t1 values (1)").await();
        tEnv.executeSql("insert into t2 values (1)").await();
        tEnv.executeSql("insert into t3 values (1)").await();
        Assertions.assertThat(hiveCatalog.getTableStatistics(new ObjectPath("default", "t1")))
                .isEqualTo(new CatalogTableStatistics(-1L, 2, getPathSize(Paths.get(warehouse, "t1")), -1L));
        Assertions.assertThat(hiveCatalog.getTableStatistics(new ObjectPath("default", "t2")))
                .isEqualTo(new CatalogTableStatistics(2L, 2, getPathSize(Paths.get(warehouse, "t2")), 8L));
        Assertions.assertThat(hiveCatalog.getTableStatistics(new ObjectPath("default", "t3")))
                .isEqualTo(new CatalogTableStatistics(2L, 2, getPathSize(Paths.get(warehouse, "t3")), 66L));

        // Phase 3: partitioned tables, one insert per partition.
        CatalogPartitionSpec y1 = new CatalogPartitionSpec(Collections.singletonMap("y", "1"));
        CatalogPartitionSpec y2 = new CatalogPartitionSpec(Collections.singletonMap("y", "2"));
        CatalogPartitionSpec y3 = new CatalogPartitionSpec(Collections.singletonMap("y", "3"));
        tEnv.executeSql("create table pt1(x int) partitioned by (y int)");
        tEnv.executeSql("create table pt2(x int) partitioned by (y int) stored as orc");
        tEnv.executeSql("create table pt3(x int) partitioned by (y int) stored as parquet");
        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt1"), y1))
                .isEqualTo(new CatalogTableStatistics(-1L, 1, getPathSize(Paths.get(warehouse, "pt1", "y=1")), -1L));
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt2"), y2))
                .isEqualTo(new CatalogTableStatistics(1L, 1, getPathSize(Paths.get(warehouse, "pt2", "y=2")), 4L));
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt3"), y3))
                .isEqualTo(new CatalogTableStatistics(1L, 1, getPathSize(Paths.get(warehouse, "pt3", "y=3")), 33L));

        // Phase 4: a second insert into each existing partition.
        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt1"), y1))
                .isEqualTo(new CatalogTableStatistics(-1L, 2, getPathSize(Paths.get(warehouse, "pt1", "y=1")), -1L));
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt2"), y2))
                .isEqualTo(new CatalogTableStatistics(2L, 2, getPathSize(Paths.get(warehouse, "pt2", "y=2")), 8L));
        Assertions.assertThat(hiveCatalog.getPartitionStatistics(new ObjectPath("default", "pt3"), y3))
                .isEqualTo(new CatalogTableStatistics(2L, 2, getPathSize(Paths.get(warehouse, "pt3", "y=3")), 66L));

        // Phase 5: overwrite each partition from an empty table; statistics are expected UNKNOWN.
        tEnv.executeSql("create table src(x int)");
        tEnv.executeSql("insert overwrite table pt1 partition(y=1) select * from src").await();
        tEnv.executeSql("insert overwrite table pt2 partition(y=2) select * from src").await();
        tEnv.executeSql("insert overwrite table pt3 partition(y=3) select * from src").await();
        for (int n = 1; n <= 3; n++) {
            ObjectPath table = new ObjectPath("default", "pt" + n);
            CatalogPartitionSpec spec =
                    new CatalogPartitionSpec(Collections.singletonMap("y", String.valueOf(n)));
            Assertions.assertThat(hiveCatalog.getPartitionStatistics(table, spec))
                    .isEqualTo(CatalogTableStatistics.UNKNOWN);
        }
    }

    /**
     * Sums the sizes of the direct children of {@code path}, skipping hidden files and any file
     * named like the default success-file name from
     * {@code HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME}.
     *
     * <p>The success-file check compares the child's file name, not its full path; a full path
     * never equals a bare file name, so comparing paths would exclude nothing.
     *
     * @param path directory whose immediate entries are measured
     * @return total length in bytes of the counted entries, or 0 if there are none
     * @throws IOException if the directory cannot be listed
     */
    private long getPathSize(Path path) throws IOException {
        String successFileName = (String) HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue();
        // Files.list holds an open directory handle until the stream is closed.
        try (java.util.stream.Stream<Path> children = Files.list(path)) {
            return children
                    .filter(child -> !child.toFile().isHidden())
                    .filter(child -> !child.getFileName().toString().equals(successFileName))
                    .mapToLong(child -> child.toFile().length())
                    .sum();
        }
    }

    /**
     * Pulls exactly {@code i} rows from the iterator, asserting before each pull that another row
     * is available, and returns their string forms in ascending natural order.
     *
     * @param it iterator to read rows from
     * @param i number of rows to read
     * @return the sorted string renderings of the rows read
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        List<String> rendered = new ArrayList<>(i);
        int taken = 0;
        while (taken < i) {
            Assertions.assertThat(it.hasNext()).isTrue();
            Row next = it.next();
            rendered.add(next.toString());
            taken++;
        }
        rendered.sort(Comparator.naturalOrder());
        return rendered;
    }

    /**
     * Asserts that the {@code d=2020-05-03} directory under the given table location has exactly
     * five entries and that each of the partition directories {@code e=7} to {@code e=11}
     * contains a {@code _MY_SUCCESS} file.
     *
     * @param str file-system path of the table location
     */
    private void checkSuccessFiles(String str) {
        File dayDir = new File(str, "d=2020-05-03");
        Assertions.assertThat(dayDir.list()).hasSize(5);
        for (String hourDirName : new String[] {"e=7", "e=8", "e=9", "e=10", "e=11"}) {
            File marker = new File(new File(dayDir, hourDirName), "_MY_SUCCESS");
            Assertions.assertThat(marker.exists()).isTrue();
        }
    }

    /**
     * Streams five rows from a finite source into a partitioned Hive sink table configured with
     * the commit policies {@code metastore,custom}, then checks that every expected partition
     * path was reported by {@code TestCustomCommitPolicy}.
     *
     * <p>The database is dropped exactly once in a {@code finally} block on every path.
     *
     * @param str fully qualified class name to configure as the custom commit policy
     * @throws Exception if the job or any statement fails, e.g. when the class cannot be created
     */
    private void testStreamingWriteWithCustomPartitionCommitPolicy(String str) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
        StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        try {
            tableEnv.executeSql("create database db1");
            tableEnv.useDatabase("db1");

            // Finite source rows; the last two columns become the partition columns d and e.
            List<Row> sourceRows =
                    Arrays.asList(
                            Row.of(1, "a", "b", "2020-05-03", "7"),
                            Row.of(2, "p", "q", "2020-05-03", "8"),
                            Row.of(3, "x", "y", "2020-05-03", "9"),
                            Row.of(4, "x", "y", "2020-05-03", "10"),
                            Row.of(5, "x", "y", "2020-05-03", "11"));
            tableEnv.createTemporaryView(
                    "my_table",
                    env.addSource(
                            new FiniteTestSource<>(sourceRows),
                            new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)),
                    Expressions.$("a"),
                    Expressions.$("b"),
                    Expressions.$("c"),
                    Expressions.$("d"),
                    Expressions.$("e"));

            tableEnv.executeSql(
                    "create external table sink_table (a int,b string,c string) partitioned by (d string,e string)  stored as textfile TBLPROPERTIES ('"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY.key()
                            + "'='1h','"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()
                            + "'='metastore,custom','"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS.key()
                            + "'='"
                            + str
                            + "')");
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await();

            Set<String> committedPaths = TestCustomCommitPolicy.getCommittedPartitionPathsAndReset();
            String tableLocation =
                    URI.create(
                                    hiveCatalog
                                            .getHiveTable(ObjectPath.fromString("db1.sink_table"))
                                            .getSd()
                                            .getLocation())
                            .getPath();
            for (String partitionKv : Arrays.asList("e=7", "e=8", "e=9", "e=10", "e=11")) {
                String expectedPath =
                        new org.apache.flink.core.fs.Path(
                                        new org.apache.flink.core.fs.Path(tableLocation, "d=2020-05-03"),
                                        partitionKv)
                                .toString();
                Assertions.assertThat(committedPaths)
                        .as("Partition(d=2020-05-03, " + partitionKv + ") is not committed successfully")
                        .contains(expectedPath);
            }
        } finally {
            tableEnv.executeSql("drop database if exists db1 cascade");
        }
    }

    /**
     * Streams five rows into an external Hive table, checks the table content with a batch query
     * and then hands the table's storage path to {@code consumer} for further checks.
     *
     * <p>The sink table uses the {@code metastore,success-file} commit policy with the success
     * file name {@code _MY_SUCCESS}. The database {@code db1} is created for the test and always
     * dropped afterwards.
     *
     * @param z whether {@code d} and {@code e} are partition columns ({@code true}) or regular
     *     columns ({@code false}) of the sink table
     * @param z2 value assigned to {@code HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER}
     * @param str storage format placed after {@code stored as} in the sink table DDL
     * @param consumer callback receiving the path component of the sink table's location
     * @throws Exception if setting up the job, running it, or an assertion fails
     */
    private void testStreamingWrite(boolean z, boolean z2, String str, Consumer<String> consumer) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);

        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, z2);
        try {
            tEnv.executeSql("create database db1");
            tEnv.useDatabase("db1");

            // Source rows: (a, b, c, d, e).
            List<Row> rows =
                    Arrays.asList(
                            Row.of(1, "a", "b", "2020-05-03", "7"),
                            Row.of(2, "p", "q", "2020-05-03", "8"),
                            Row.of(3, "x", "y", "2020-05-03", "9"),
                            Row.of(4, "x", "y", "2020-05-03", "10"),
                            Row.of(5, "x", "y", "2020-05-03", "11"));
            tEnv.createTemporaryView(
                    "my_table",
                    env.addSource(
                            new FiniteTestSource<>(rows),
                            new RowTypeInfo(
                                    Types.INT,
                                    Types.STRING,
                                    Types.STRING,
                                    Types.STRING,
                                    Types.STRING)),
                    Expressions.$("a"),
                    Expressions.$("b"),
                    Expressions.$("c"),
                    Expressions.$("d"),
                    Expressions.$("e"));

            // d and e are declared either as partition columns or as ordinary columns.
            String extraColumns = z ? "" : ",d string,e string";
            String partitionClause = z ? "partitioned by (d string,e string) " : "";
            tEnv.executeSql(
                    "create external table sink_table (a int,b string,c string"
                            + extraColumns
                            + ") "
                            + partitionClause
                            + " stored as "
                            + str
                            + " TBLPROPERTIES ('"
                            + FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()
                            + "'='$d $e:00:00','"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY.key()
                            + "'='1h','"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()
                            + "'='metastore,success-file','"
                            + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()
                            + "'='_MY_SUCCESS')");

            // The insert itself runs under the default dialect.
            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await();

            // Every row is expected twice (presumably because FiniteTestSource emits its
            // elements two times -- TODO confirm against FiniteTestSource).
            assertBatch(
                    "db1.sink_table",
                    Arrays.asList(
                            "+I[1, a, b, 2020-05-03, 7]",
                            "+I[1, a, b, 2020-05-03, 7]",
                            "+I[2, p, q, 2020-05-03, 8]",
                            "+I[2, p, q, 2020-05-03, 8]",
                            "+I[3, x, y, 2020-05-03, 9]",
                            "+I[3, x, y, 2020-05-03, 9]",
                            "+I[4, x, y, 2020-05-03, 10]",
                            "+I[4, x, y, 2020-05-03, 10]",
                            "+I[5, x, y, 2020-05-03, 11]",
                            "+I[5, x, y, 2020-05-03, 11]"));

            String tableLocation =
                    URI.create(
                                    hiveCatalog
                                            .getHiveTable(ObjectPath.fromString("db1.sink_table"))
                                            .getSd()
                                            .getLocation())
                            .getPath();
            consumer.accept(tableLocation);
        } finally {
            // "if exists" keeps the cleanup from failing (and hiding the real error) when the
            // database was never created; this also matches the custom-commit-policy test.
            tEnv.executeSql("drop database if exists db1 cascade");
        }
    }

    /**
     * Reads all rows of a table with a batch-mode table environment and asserts that their
     * string representations match the expected ones, ignoring order.
     *
     * @param str name of the table to query; it is appended to {@code select * from}
     * @param list expected {@code Row#toString()} values in any order; this list is not modified
     */
    private void assertBatch(String str, List<String> list) {
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode();
        batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());

        List<String> actual = new ArrayList<>();
        // NOTE(review): the iterator returned by collect() is never closed here; if it holds
        // resources, consider wrapping it in try-with-resources -- confirm.
        batchEnv.executeSql("select * from " + str)
                .collect()
                .forEachRemaining(row -> actual.add(row.toString()));

        // Sort a copy so the caller's list is left untouched and may be unmodifiable.
        List<String> expected = new ArrayList<>(list);
        Collections.sort(actual);
        Collections.sort(expected);
        Assertions.assertThat(actual).isEqualTo(expected);
    }
}
