package org.apache.flink.table.catalog.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTableBuilder;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.util.JavaScalaConversionUtil;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.hive.ql.udf.UDFMonth;
import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

@RunWith(FlinkStandaloneHiveRunner.class)
/* loaded from: input_file:org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.class */
public class HiveCatalogUseBlinkITCase extends AbstractTestBase {

    @HiveSQL(files = {})
    private static HiveShell hiveShell;
    private static HiveCatalog hiveCatalog;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private String sourceTableName = "csv_source";
    private String sinkTableName = "csv_sink";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase$JavaToScala.class */
    public static class JavaToScala implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {
        private JavaToScala() {
        }

        public scala.Tuple2<Boolean, Row> map(Tuple2<Boolean, Row> tuple2) throws Exception {
            return new scala.Tuple2<>(tuple2.f0, tuple2.f1);
        }
    }

    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog(hiveShell.getHiveConf());
        hiveCatalog.open();
    }

    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    @Test
    public void testBlinkUdf() throws Exception {
        TableSchema build = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build();
        hiveCatalog.createTable(new ObjectPath("default", this.sourceTableName), new CatalogTableBuilder(new FileSystem().path(getClass().getResource("/csv/test.csv").getPath()), build).withFormat(new OldCsv().field("name", Types.STRING()).field("age", Types.INT())).inAppendMode().withComment("Comment.").build(), false);
        hiveCatalog.createFunction(new ObjectPath("default", "myudf"), new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName()), false);
        hiveCatalog.createFunction(new ObjectPath("default", "mygenericudf"), new CatalogFunctionImpl(TestHiveGenericUDF.class.getCanonicalName()), false);
        hiveCatalog.createFunction(new ObjectPath("default", "myudtf"), new CatalogFunctionImpl(TestHiveUDTF.class.getCanonicalName()), false);
        hiveCatalog.createFunction(new ObjectPath("default", "myudaf"), new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName()), false);
        testUdf(true);
        testUdf(false);
    }

    private void testUdf(boolean z) throws Exception {
        List java;
        EnvironmentSettings.Builder useBlinkPlanner = EnvironmentSettings.newInstance().useBlinkPlanner();
        if (z) {
            useBlinkPlanner.inBatchMode();
        } else {
            useBlinkPlanner.inStreamingMode();
        }
        TableEnvironment create = z ? TableEnvironment.create(useBlinkPlanner.build()) : StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), useBlinkPlanner.build());
        BatchTestBase.configForMiniCluster(create.getConfig());
        create.registerCatalog("myhive", hiveCatalog);
        create.useCatalog("myhive");
        String format = String.format("select a, s, sum(b), myudaf(b) from (%s) group by a, s", String.format("select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from %s, lateral table(myudtf(name, 1)) as T(s)", this.sourceTableName));
        if (z) {
            Path path = Paths.get(this.tempFolder.newFolder().getAbsolutePath(), "test.csv");
            hiveCatalog.createTable(new ObjectPath("default", this.sinkTableName), new CatalogTableBuilder(new FileSystem().path(path.toAbsolutePath().toString()), TableSchema.builder().field("name1", Types.STRING()).field("name2", Types.STRING()).field("sum1", Types.INT()).field("sum2", Types.LONG()).build()).withFormat(new OldCsv().field("name1", Types.STRING()).field("name2", Types.STRING()).field("sum1", Types.INT()).field("sum2", Types.LONG())).inAppendMode().withComment("Comment.").build(), false);
            create.sqlUpdate(String.format("insert into %s " + format, this.sinkTableName));
            create.execute("myjob");
            StringBuilder sb = new StringBuilder();
            Stream<Path> walk = Files.walk(Paths.get(path.toAbsolutePath().toString(), new String[0]), new FileVisitOption[0]);
            Throwable th = null;
            try {
                try {
                    walk.filter(path2 -> {
                        return Files.isRegularFile(path2, new LinkOption[0]);
                    }).forEach(path3 -> {
                        try {
                            String readFileUtf8 = FileUtils.readFileUtf8(path3.toFile());
                            if (readFileUtf8.isEmpty()) {
                                return;
                            }
                            sb.append(readFileUtf8);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    if (walk != null) {
                        if (0 != 0) {
                            try {
                                walk.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            walk.close();
                        }
                    }
                    java = (List) Arrays.stream(sb.toString().split("\n")).filter(str -> {
                        return !str.isEmpty();
                    }).collect(Collectors.toList());
                } finally {
                }
            } catch (Throwable th3) {
                if (walk != null) {
                    if (th != null) {
                        try {
                            walk.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        walk.close();
                    }
                }
                throw th3;
            }
        } else {
            StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) create;
            TestingRetractSink testingRetractSink = new TestingRetractSink();
            streamTableEnvironment.toRetractStream(create.sqlQuery(format), Row.class).map(new JavaToScala()).addSink(testingRetractSink);
            streamTableEnvironment.execute("");
            java = JavaScalaConversionUtil.toJava(testingRetractSink.getRetractResults());
        }
        ArrayList arrayList = new ArrayList(java);
        arrayList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), arrayList);
    }

    @Test
    public void testTimestampUDF() throws Exception {
        hiveCatalog.createFunction(new ObjectPath("default", "myyear"), new CatalogFunctionImpl(UDFYear.class.getCanonicalName()), false);
        hiveShell.execute("create table src(ts timestamp)");
        try {
            HiveTestUtils.createTextTableInserter(hiveShell, "default", "src").addRow(new Object[]{Timestamp.valueOf("2013-07-15 10:00:00")}).addRow(new Object[]{Timestamp.valueOf("2019-05-23 17:32:55")}).commit();
            TableEnvironment createTableEnvWithBlinkPlannerBatchMode = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
            createTableEnvWithBlinkPlannerBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            createTableEnvWithBlinkPlannerBatchMode.useCatalog(hiveCatalog.getName());
            List collectToList = TableUtils.collectToList(createTableEnvWithBlinkPlannerBatchMode.sqlQuery("select myyear(ts) as y from src"));
            Assert.assertEquals(2L, collectToList.size());
            Assert.assertEquals("[2013, 2019]", collectToList.toString());
            hiveShell.execute("drop table src");
        } catch (Throwable th) {
            hiveShell.execute("drop table src");
            throw th;
        }
    }

    @Test
    public void testDateUDF() throws Exception {
        hiveCatalog.createFunction(new ObjectPath("default", "mymonth"), new CatalogFunctionImpl(UDFMonth.class.getCanonicalName()), false);
        hiveShell.execute("create table src(dt date)");
        try {
            HiveTestUtils.createTextTableInserter(hiveShell, "default", "src").addRow(new Object[]{Date.valueOf("2019-01-19")}).addRow(new Object[]{Date.valueOf("2019-03-02")}).commit();
            TableEnvironment createTableEnvWithBlinkPlannerBatchMode = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
            createTableEnvWithBlinkPlannerBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            createTableEnvWithBlinkPlannerBatchMode.useCatalog(hiveCatalog.getName());
            List collectToList = TableUtils.collectToList(createTableEnvWithBlinkPlannerBatchMode.sqlQuery("select mymonth(dt) as m from src order by m"));
            Assert.assertEquals(2L, collectToList.size());
            Assert.assertEquals("[1, 3]", collectToList.toString());
            hiveShell.execute("drop table src");
        } catch (Throwable th) {
            hiveShell.execute("drop table src");
            throw th;
        }
    }
}
