package org.apache.flink.connectors.hive;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSinkITCase.class */
public class HiveTableSinkITCase {
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void createCatalog() throws IOException {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    @Test
    public void testHiveTableSinkWithParallelismInBatch() {
        testHiveTableSinkWithParallelismBase(HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE), "/explain/testHiveTableSinkWithParallelismInBatch.out");
    }

    @Test
    public void testHiveTableSinkWithParallelismInStreaming() {
        testHiveTableSinkWithParallelismBase(HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(StreamExecutionEnvironment.getExecutionEnvironment(), SqlDialect.HIVE), "/explain/testHiveTableSinkWithParallelismInStreaming.out");
    }

    private void testHiveTableSinkWithParallelismBase(TableEnvironment tableEnvironment, String str) {
        tableEnvironment.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnvironment.useCatalog(hiveCatalog.getName());
        tableEnvironment.executeSql("create database db1");
        tableEnvironment.useDatabase("db1");
        tableEnvironment.executeSql(String.format("CREATE TABLE test_table ( id int, real_col int) TBLPROPERTIES ( 'sink.parallelism' = '8')", new Object[0]));
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        Assert.assertEquals(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(TableTestUtil.readFromResource(str))), TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(tableEnvironment.explainSql("insert into test_table select 1, 1", new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN}))));
        tableEnvironment.executeSql("drop database db1 cascade");
    }

    @Test
    public void testBatchAppend() throws Exception {
        TableEnvironment createTableEnvWithBlinkPlannerBatchMode = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
        createTableEnvWithBlinkPlannerBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvWithBlinkPlannerBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvWithBlinkPlannerBatchMode.executeSql("create database db1");
        createTableEnvWithBlinkPlannerBatchMode.useDatabase("db1");
        try {
            createTableEnvWithBlinkPlannerBatchMode.executeSql("create table append_table (i int, j int)");
            createTableEnvWithBlinkPlannerBatchMode.executeSql("insert into append_table select 1, 1").await();
            createTableEnvWithBlinkPlannerBatchMode.executeSql("insert into append_table select 2, 2").await();
            List iteratorToList = CollectionUtil.iteratorToList(createTableEnvWithBlinkPlannerBatchMode.executeSql("select * from append_table").collect());
            iteratorToList.sort(Comparator.comparingInt(row -> {
                return ((Integer) row.getField(0)).intValue();
            }));
            Assert.assertEquals(Arrays.asList(Row.of(new Object[]{1, 1}), Row.of(new Object[]{2, 2})), iteratorToList);
        } finally {
            createTableEnvWithBlinkPlannerBatchMode.executeSql("drop database db1 cascade");
        }
    }

    @Test(timeout = 120000)
    public void testDefaultSerPartStreamingWrite() throws Exception {
        testStreamingWrite(true, false, "textfile", this::checkSuccessFiles);
    }

    @Test(timeout = 120000)
    public void testPartStreamingWrite() throws Exception {
        testStreamingWrite(true, false, "parquet", this::checkSuccessFiles);
        if (hiveCatalog.getHiveVersion().startsWith("2.")) {
            return;
        }
        testStreamingWrite(true, false, "orc", this::checkSuccessFiles);
    }

    @Test(timeout = 120000)
    public void testNonPartStreamingWrite() throws Exception {
        testStreamingWrite(false, false, "parquet", str -> {
        });
        if (hiveCatalog.getHiveVersion().startsWith("2.")) {
            return;
        }
        testStreamingWrite(false, false, "orc", str2 -> {
        });
    }

    @Test(timeout = 120000)
    public void testPartStreamingMrWrite() throws Exception {
        testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
        if (hiveCatalog.getHiveVersion().startsWith("2.0")) {
            return;
        }
        testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
    }

    @Test(timeout = 120000)
    public void testNonPartStreamingMrWrite() throws Exception {
        testStreamingWrite(false, true, "parquet", str -> {
        });
        if (hiveCatalog.getHiveVersion().startsWith("2.0")) {
            return;
        }
        testStreamingWrite(false, true, "orc", str2 -> {
        });
    }

    @Test(timeout = 120000)
    public void testStreamingAppend() throws Exception {
        testStreamingWrite(false, false, "parquet", str -> {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(1);
            StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(executionEnvironment);
            createTableEnvWithBlinkPlannerStreamMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            createTableEnvWithBlinkPlannerStreamMode.useCatalog(hiveCatalog.getName());
            try {
                createTableEnvWithBlinkPlannerStreamMode.executeSql("insert into db1.sink_table select 6,'a','b','2020-05-03','12'").await();
            } catch (Exception e) {
                Assert.fail("Failed to execute sql: " + e.getMessage());
            }
            assertBatch("db1.sink_table", Arrays.asList("+I[1, a, b, 2020-05-03, 7]", "+I[1, a, b, 2020-05-03, 7]", "+I[2, p, q, 2020-05-03, 8]", "+I[2, p, q, 2020-05-03, 8]", "+I[3, x, y, 2020-05-03, 9]", "+I[3, x, y, 2020-05-03, 9]", "+I[4, x, y, 2020-05-03, 10]", "+I[4, x, y, 2020-05-03, 10]", "+I[5, x, y, 2020-05-03, 11]", "+I[5, x, y, 2020-05-03, 11]", "+I[6, a, b, 2020-05-03, 12]"));
        });
    }

    private void checkSuccessFiles(String str) {
        File file = new File(str, "d=2020-05-03");
        Assert.assertEquals(5L, file.list().length);
        Assert.assertTrue(new File(new File(file, "e=7"), "_MY_SUCCESS").exists());
        Assert.assertTrue(new File(new File(file, "e=8"), "_MY_SUCCESS").exists());
        Assert.assertTrue(new File(new File(file, "e=9"), "_MY_SUCCESS").exists());
        Assert.assertTrue(new File(new File(file, "e=10"), "_MY_SUCCESS").exists());
        Assert.assertTrue(new File(new File(file, "e=11"), "_MY_SUCCESS").exists());
    }

    private void testStreamingWrite(boolean z, boolean z2, String str, Consumer<String> consumer) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(100L);
        StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(executionEnvironment);
        createTableEnvWithBlinkPlannerStreamMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvWithBlinkPlannerStreamMode.useCatalog(hiveCatalog.getName());
        createTableEnvWithBlinkPlannerStreamMode.getConfig().setSqlDialect(SqlDialect.HIVE);
        if (z2) {
            createTableEnvWithBlinkPlannerStreamMode.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, true);
        } else {
            createTableEnvWithBlinkPlannerStreamMode.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
        }
        try {
            createTableEnvWithBlinkPlannerStreamMode.executeSql("create database db1");
            createTableEnvWithBlinkPlannerStreamMode.useDatabase("db1");
            createTableEnvWithBlinkPlannerStreamMode.createTemporaryView("my_table", executionEnvironment.addSource(new FiniteTestSource(Arrays.asList(Row.of(new Object[]{1, "a", "b", "2020-05-03", "7"}), Row.of(new Object[]{2, "p", "q", "2020-05-03", "8"}), Row.of(new Object[]{3, "x", "y", "2020-05-03", "9"}), Row.of(new Object[]{4, "x", "y", "2020-05-03", "10"}), Row.of(new Object[]{5, "x", "y", "2020-05-03", "11"}))), new RowTypeInfo(new TypeInformation[]{Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING})), new Expression[]{Expressions.$("a"), Expressions.$("b"), Expressions.$("c"), Expressions.$("d"), Expressions.$("e")});
            createTableEnvWithBlinkPlannerStreamMode.executeSql("create external table sink_table (a int,b string,c string" + (z ? "" : ",d string,e string") + ") " + (z ? "partitioned by (d string,e string) " : "") + " stored as " + str + " TBLPROPERTIES ('" + FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00','" + FileSystemOptions.SINK_PARTITION_COMMIT_DELAY.key() + "'='1h','" + FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key() + "'='metastore,success-file','" + FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key() + "'='_MY_SUCCESS')");
            createTableEnvWithBlinkPlannerStreamMode.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            createTableEnvWithBlinkPlannerStreamMode.sqlQuery("select * from my_table").executeInsert("sink_table").await();
            assertBatch("db1.sink_table", Arrays.asList("+I[1, a, b, 2020-05-03, 7]", "+I[1, a, b, 2020-05-03, 7]", "+I[2, p, q, 2020-05-03, 8]", "+I[2, p, q, 2020-05-03, 8]", "+I[3, x, y, 2020-05-03, 9]", "+I[3, x, y, 2020-05-03, 9]", "+I[4, x, y, 2020-05-03, 10]", "+I[4, x, y, 2020-05-03, 10]", "+I[5, x, y, 2020-05-03, 11]", "+I[5, x, y, 2020-05-03, 11]"));
            ArrayList arrayList = new ArrayList();
            TableEnvironment createTableEnvWithBlinkPlannerBatchMode = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
            createTableEnvWithBlinkPlannerBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            createTableEnvWithBlinkPlannerBatchMode.useCatalog(hiveCatalog.getName());
            createTableEnvWithBlinkPlannerBatchMode.executeSql("select * from db1.sink_table").collect().forEachRemaining(row -> {
                arrayList.add(row.toString());
            });
            arrayList.sort((v0, v1) -> {
                return v0.compareTo(v1);
            });
            Assert.assertEquals(Arrays.asList("+I[1, a, b, 2020-05-03, 7]", "+I[1, a, b, 2020-05-03, 7]", "+I[2, p, q, 2020-05-03, 8]", "+I[2, p, q, 2020-05-03, 8]", "+I[3, x, y, 2020-05-03, 9]", "+I[3, x, y, 2020-05-03, 9]", "+I[4, x, y, 2020-05-03, 10]", "+I[4, x, y, 2020-05-03, 10]", "+I[5, x, y, 2020-05-03, 11]", "+I[5, x, y, 2020-05-03, 11]"), arrayList);
            consumer.accept(URI.create(hiveCatalog.getHiveTable(ObjectPath.fromString("db1.sink_table")).getSd().getLocation()).getPath());
            createTableEnvWithBlinkPlannerStreamMode.executeSql("drop database db1 cascade");
        } catch (Throwable th) {
            createTableEnvWithBlinkPlannerStreamMode.executeSql("drop database db1 cascade");
            throw th;
        }
    }

    private void assertBatch(String str, List<String> list) {
        ArrayList arrayList = new ArrayList();
        TableEnvironment createTableEnvWithBlinkPlannerBatchMode = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
        createTableEnvWithBlinkPlannerBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvWithBlinkPlannerBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvWithBlinkPlannerBatchMode.executeSql("select * from " + str).collect().forEachRemaining(row -> {
            arrayList.add(row.toString());
        });
        arrayList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        list.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assert.assertEquals(list, arrayList);
    }
}
