package org.apache.flink.table.catalog.hive;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.PrintStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/table/catalog/hive/HiveCatalogITCase.class */
/**
 * Integration tests that register a shared {@link HiveCatalog} with a {@link TableEnvironment}
 * and read/write generic (non-Hive) tables through it.
 *
 * <p>The catalog is created once per class and shared by all tests, so every test is responsible
 * for dropping the persistent tables it creates.
 */
public class HiveCatalogITCase {
    /** Catalog shared by all tests; opened in {@link #createCatalog()}. */
    private static HiveCatalog hiveCatalog;

    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    private final String sourceTableName = "csv_source";
    private final String sinkTableName = "csv_sink";

    /** Creates and opens the shared Hive catalog. */
    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    /** Closes the shared Hive catalog if it was created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    /** Returns the rows the tests expect to read from the {@code /csv/test.csv} resource. */
    private static List<Row> expectedCsvRows() {
        return Arrays.asList(Row.of("1", 1), Row.of("2", 2), Row.of("3", 3));
    }

    /** Creates a CSV table via DDL, queries it, renames it, queries it again and drops it. */
    @Test
    public void testCsvTableViaSQL() {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String csvPath = getClass().getResource("/csv/test.csv").getPath();
        tableEnv.executeSql(
                "create table test2 (name String, age Int) with (\n"
                        + "   'connector.type' = 'filesystem',\n"
                        + "   'connector.path' = 'file://" + csvPath + "',\n"
                        + "   'format.type' = 'csv'\n"
                        + ")");

        // Compare as sets: the order in which rows are returned is not asserted here.
        List<Row> beforeRename = CollectionUtil.iteratorToList(
                tableEnv.sqlQuery("SELECT * FROM myhive.`default`.test2").execute().collect());
        Assert.assertEquals(new HashSet<>(expectedCsvRows()), new HashSet<>(beforeRename));

        tableEnv.executeSql("ALTER TABLE test2 RENAME TO newtable");

        List<Row> afterRename = CollectionUtil.iteratorToList(
                tableEnv.sqlQuery("SELECT * FROM myhive.`default`.newtable").execute().collect());
        Assert.assertEquals(new HashSet<>(expectedCsvRows()), new HashSet<>(afterRename));

        tableEnv.executeSql("DROP TABLE newtable");
    }

    /**
     * Registers a CSV source and a CSV sink directly through the catalog API, reads the source,
     * copies it into the sink and verifies the written file line by line.
     *
     * @throws Exception if the temporary folder cannot be created, the insert job fails, or the
     *     result file cannot be read
     */
    @Test
    public void testCsvTableViaAPI() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        tableEnv.getConfig().addConfiguration(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1));
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        TableSchema schema = TableSchema.builder()
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .build();
        OldCsv format = new OldCsv()
                .field("name", Types.STRING())
                .field("age", Types.INT());

        CatalogTable sourceTable = new CatalogTableBuilder(
                        new FileSystem().path(getClass().getResource("/csv/test.csv").getPath()),
                        schema)
                .withFormat(format)
                .inAppendMode()
                .withComment("Comment.")
                .build();

        Path sinkPath = Paths.get(this.tempFolder.newFolder().getAbsolutePath(), "test.csv");
        CatalogTable sinkTable = new CatalogTableBuilder(
                        new FileSystem().path(sinkPath.toAbsolutePath().toString()),
                        schema)
                .withFormat(format)
                .inAppendMode()
                .withComment("Comment.")
                .build();

        hiveCatalog.createTable(new ObjectPath("default", this.sourceTableName), sourceTable, false);
        hiveCatalog.createTable(new ObjectPath("default", this.sinkTableName), sinkTable, false);

        List<Row> rows = CollectionUtil.iteratorToList(
                tableEnv.sqlQuery(
                                String.format(
                                        "select * from myhive.`default`.%s",
                                        this.sourceTableName))
                        .execute()
                        .collect());
        // Sort by string form so the comparison against the ordered expectation is stable.
        rows.sort(Comparator.comparing(String::valueOf));
        Assert.assertEquals(expectedCsvRows(), rows);

        tableEnv.executeSql(
                        String.format(
                                "insert into myhive.`default`.%s select * from myhive.`default`.%s",
                                this.sinkTableName,
                                this.sourceTableName))
                .await();

        // try-with-resources so the reader is closed even when an assertion fails.
        File sinkFile = new File(sinkPath.toAbsolutePath().toString());
        try (BufferedReader reader = new BufferedReader(new FileReader(sinkFile))) {
            for (int i = 0; i < 3; i++) {
                Assert.assertEquals(String.format("%d,%d", i + 1, i + 1), reader.readLine());
            }
            // No further lines are expected after the three rows.
            Assert.assertNull(reader.readLine());
        }

        tableEnv.executeSql(String.format("DROP TABLE %s", this.sourceTableName));
        tableEnv.executeSql(String.format("DROP TABLE %s", this.sinkTableName));
    }

    /**
     * Runs a streaming tumbling-window aggregation from a CSV source (with a computed column and
     * a watermark) into a CSV sink and compares the written file content.
     *
     * @throws Exception if the temporary folder cannot be created, the insert job fails, or the
     *     result file cannot be read
     */
    @Test
    public void testReadWriteCsv() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String srcPath = getClass().getResource("/csv/test3.csv").getPath();
        tableEnv.executeSql(
                "CREATE TABLE src ("
                        + "price DECIMAL(10, 2),"
                        + "currency STRING,"
                        + "ts6 TIMESTAMP(6),"
                        + "ts AS CAST(ts6 AS TIMESTAMP(3)),"
                        + "WATERMARK FOR ts AS ts) "
                        + String.format(
                                "WITH ("
                                        + "'connector.type' = 'filesystem',"
                                        + "'connector.path' = 'file://%s',"
                                        + "'format.type' = 'csv')",
                                srcPath));

        String sinkUri = new File(this.tempFolder.newFolder(), "csv-order-sink").toURI().toString();
        tableEnv.executeSql(
                "CREATE TABLE sink ("
                        + "window_end TIMESTAMP(3),"
                        + "max_ts TIMESTAMP(6),"
                        + "counter BIGINT,"
                        + "total_price DECIMAL(10, 2)) "
                        + String.format(
                                "WITH ("
                                        + "'connector.type' = 'filesystem',"
                                        + "'connector.path' = '%s',"
                                        + "'format.type' = 'csv')",
                                sinkUri));

        tableEnv.executeSql(
                        "INSERT INTO sink "
                                + "SELECT TUMBLE_END(ts, INTERVAL '5' SECOND),"
                                + "MAX(ts6),"
                                + "COUNT(*),"
                                + "MAX(price) "
                                + "FROM src "
                                + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)")
                .await();

        String expected =
                "2019-12-12 00:00:05.0,2019-12-12 00:00:04.004001,3,50.00\n"
                        + "2019-12-12 00:00:10.0,2019-12-12 00:00:06.006001,2,5.33\n";
        Assert.assertEquals(expected, FileUtils.readFileUtf8(new File(new URI(sinkUri))));
    }

    /** Reads a table with a processing-time column via SQL in batch mode. */
    @Test
    public void testBatchReadWriteCsvWithProctime() {
        testReadWriteCsvWithProctime(false);
    }

    /** Reads a table with a processing-time column via SQL in streaming mode. */
    @Test
    public void testStreamReadWriteCsvWithProctime() {
        testReadWriteCsvWithProctime(true);
    }

    /**
     * Creates {@code proctime_src}, selects all rows via SQL and expects five of them.
     *
     * @param isStreaming {@code true} to run in streaming mode, {@code false} for batch mode
     */
    private void testReadWriteCsvWithProctime(boolean isStreaming) {
        TableEnvironment tableEnv = prepareTable(isStreaming);
        try {
            List<Row> rows = CollectionUtil.iteratorToList(
                    tableEnv.executeSql("SELECT * FROM proctime_src").collect());
            Assert.assertEquals(5L, rows.size());
        } finally {
            // The table name is shared by several tests on the same static catalog, so it must
            // be dropped even when the assertion above fails.
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    /** Reads a table with a processing-time column via the Table API in batch mode. */
    @Test
    public void testTableApiWithProctimeForBatch() {
        testTableApiWithProctime(false);
    }

    /** Reads a table with a processing-time column via the Table API in streaming mode. */
    @Test
    public void testTableApiWithProctimeForStreaming() {
        testTableApiWithProctime(true);
    }

    /**
     * Creates {@code proctime_src}, projects three columns via the Table API and expects five rows.
     *
     * @param isStreaming {@code true} to run in streaming mode, {@code false} for batch mode
     */
    private void testTableApiWithProctime(boolean isStreaming) {
        TableEnvironment tableEnv = prepareTable(isStreaming);
        try {
            List<Row> rows = CollectionUtil.iteratorToList(
                    tableEnv.from("proctime_src")
                            .select(
                                    Expressions.$("price"),
                                    Expressions.$("ts"),
                                    Expressions.$("l_proctime"))
                            .execute()
                            .collect());
            Assert.assertEquals(5L, rows.size());
        } finally {
            // See testReadWriteCsvWithProctime: always drop the shared table name.
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    /**
     * Builds a table environment on the shared catalog and creates the {@code proctime_src}
     * table backed by the {@code /csv/test3.csv} resource.
     *
     * @param isStreaming {@code true} for streaming mode, {@code false} for batch mode
     * @return the table environment with {@code myhive} as current catalog
     */
    private TableEnvironment prepareTable(boolean isStreaming) {
        EnvironmentSettings.Builder settings = EnvironmentSettings.newInstance().useBlinkPlanner();
        if (isStreaming) {
            settings = settings.inStreamingMode();
        } else {
            settings = settings.inBatchMode();
        }
        TableEnvironment tableEnv = TableEnvironment.create(settings.build());
        tableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String srcPath = getClass().getResource("/csv/test3.csv").getPath();
        tableEnv.executeSql(
                "CREATE TABLE proctime_src ("
                        + "price DECIMAL(10, 2),"
                        + "currency STRING,"
                        + "ts6 TIMESTAMP(6),"
                        + "ts AS CAST(ts6 AS TIMESTAMP(3)),"
                        + "WATERMARK FOR ts AS ts,"
                        + "l_proctime AS PROCTIME( )) "
                        + String.format(
                                "WITH ("
                                        + "'connector.type' = 'filesystem',"
                                        + "'connector.path' = 'file://%s',"
                                        + "'format.type' = 'csv')",
                                srcPath));
        return tableEnv;
    }

    /** Verifies that a primary-key constraint declared in DDL is returned by the catalog. */
    @Test
    public void testTableWithPrimaryKey() {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().build());
        tableEnv.getConfig().getConfiguration()
                .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("catalog1", hiveCatalog);
        tableEnv.useCatalog("catalog1");

        tableEnv.executeSql(
                "CREATE TABLE pk_src (\n"
                        + "  uuid varchar(40) not null,\n"
                        + "  price DECIMAL(10, 2),\n"
                        + "  currency STRING,\n"
                        + "  ts6 TIMESTAMP(6),\n"
                        + "  ts AS CAST(ts6 AS TIMESTAMP(3)),\n"
                        + "  WATERMARK FOR ts AS ts,\n"
                        + "  constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n"
                        + "  WITH (\n"
                        + "    'connector.type' = 'filesystem',"
                        + "    'connector.path' = 'file://fakePath',"
                        + "    'format.type' = 'csv')");

        TableSchema tableSchema = tableEnv.getCatalog(tableEnv.getCurrentCatalog())
                .map(catalog -> {
                    try {
                        ObjectPath tablePath =
                                ObjectPath.fromString(catalog.getDefaultDatabase() + ".pk_src");
                        return catalog.getTable(tablePath).getSchema();
                    } catch (TableNotExistException e) {
                        // A missing table is reported by the assertNotNull below.
                        return null;
                    }
                })
                .orElse(null);

        Assert.assertNotNull(tableSchema);
        // Expected value first, so a failure message labels the two sides correctly.
        Assert.assertEquals(
                Optional.of(UniqueConstraint.primaryKey("ct1", Collections.singletonList("uuid"))),
                tableSchema.getPrimaryKey());
        tableEnv.executeSql("DROP TABLE pk_src");
    }

    /**
     * Copies a CSV table into a {@code print} connector table and checks what was written to
     * {@code System.out}.
     *
     * @throws Exception if the insert job fails
     */
    @Test
    public void testNewTableFactory() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);

        String csvPath = getClass().getResource("/csv/test.csv").getPath();
        PrintStream originalOut = System.out;
        try {
            // Redirect stdout so the output of the print sink can be asserted.
            ByteArrayOutputStream captured = new ByteArrayOutputStream();
            System.setOut(new PrintStream(captured));

            tableEnv.executeSql(
                    "create table csv_table (name String, age Int) with ("
                            + "'connector.type' = 'filesystem',"
                            + "'connector.path' = 'file://" + csvPath + "',"
                            + "'format.type' = 'csv')");
            tableEnv.executeSql(
                    "create table print_table (name String, age Int) with ('connector' = 'print')");
            tableEnv.executeSql("insert into print_table select * from csv_table").await();

            Assert.assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", captured.toString());
        } finally {
            // Restore stdout first so later output (including failures) is visible again.
            if (System.out != originalOut) {
                System.out.close();
            }
            System.setOut(originalOut);
            // IF EXISTS: a failure while creating the tables must not be masked by a failing drop.
            tableEnv.executeSql("DROP TABLE IF EXISTS csv_table");
            tableEnv.executeSql("DROP TABLE IF EXISTS print_table");
        }
    }

    /**
     * Lists databases from five threads at once and requires every call to finish within five
     * seconds.
     *
     * @throws Exception if a call fails or does not complete in time
     */
    @Test
    public void testConcurrentAccessHiveCatalog() throws Exception {
        final int numThreads = 5;
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            Callable<List<String>> listDatabases = () -> hiveCatalog.listDatabases();
            List<Future<List<String>>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                futures.add(executor.submit(listDatabases));
            }
            executor.shutdown();
            for (Future<List<String>> future : futures) {
                future.get(5L, TimeUnit.SECONDS);
            }
        } finally {
            // Interrupt any worker still running, e.g. after a timeout above.
            executor.shutdownNow();
        }
    }

    /**
     * Creates temporary generic tables (collection, filesystem, datagen, blackhole) while the
     * Hive catalog is the current catalog and runs insert jobs between them.
     *
     * @throws Exception if the temporary directory cannot be created or a job fails
     */
    @Test
    public void testTemporaryGenericTable() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Arrays.asList(Row.of(1), Row.of(2)));
        tableEnv.executeSql(
                "create temporary table src(x int) with ('connector'='COLLECTION','is-bounded' = 'false')");

        File destDir = Files.createTempDirectory("dest-").toFile();
        // Remove the directory when the JVM exits.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> org.apache.commons.io.FileUtils.deleteQuietly(destDir)));

        tableEnv.executeSql(
                "create temporary table dest(x int) with ('connector' = 'filesystem',"
                        + String.format("'path' = 'file://%s/1.csv',", destDir.getAbsolutePath())
                        + "'format' = 'csv')");
        tableEnv.executeSql("insert into dest select * from src").await();

        tableEnv.executeSql(
                "create temporary table datagen(i int) with ("
                        + "'connector'='datagen',"
                        + "'rows-per-second'='5',"
                        + "'fields.i.kind'='sequence',"
                        + "'fields.i.start'='1',"
                        + "'fields.i.end'='10')");
        tableEnv.executeSql("create temporary table blackhole(i int) with ('connector'='blackhole')");
        tableEnv.executeSql("insert into blackhole select * from datagen").await();
    }
}
