package org.apache.flink.table.catalog.hive;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.PrintStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.connectors.hive.TableEnvExecutorUtil;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TestSchemaResolver;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests for {@link HiveCatalog}: tables and views are created, queried and dropped
 * through a {@link TableEnvironment} that has the catalog registered.
 */
public class HiveCatalogITCase {
    /** Catalog under test; created and opened once per class, shared by every test method. */
    private static HiveCatalog hiveCatalog;

    /** Scratch directory rule used by tests that write sink files. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    /** Name of the CSV source table registered by {@link #testCsvTableViaAPI()}. */
    private final String sourceTableName = "csv_source";

    /** Name of the CSV sink table registered by {@link #testCsvTableViaAPI()}. */
    private final String sinkTableName = "csv_sink";

    /**
     * Creates the shared {@link HiveCatalog} through {@link HiveTestUtils#createHiveCatalog()} and
     * opens it before any test of this class runs.
     */
    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    /** Closes the shared catalog after the last test; a no-op when it was never created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog == null) {
            return;
        }
        hiveCatalog.close();
    }

    /**
     * Creates a CSV-backed table with SQL DDL, reads it, renames it, reads it again under the new
     * name and finally drops it.
     */
    @Test
    public void testCsvTableViaSQL() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String csvPath = getClass().getResource("/csv/test.csv").getPath();
        String createTableDdl =
                "create table test2 (name String, age Int) with (\n"
                        + "   'connector.type' = 'filesystem',\n"
                        + "   'connector.path' = 'file://"
                        + csvPath
                        + "',\n"
                        + "   'format.type' = 'csv'\n"
                        + ")";
        tableEnv.executeSql(createTableDdl);

        // The same three rows are expected before and after the rename.
        Row[] expectedRows = {Row.of("1", 1), Row.of("2", 2), Row.of("3", 3)};

        List<Row> rowsBeforeRename =
                CollectionUtil.iteratorToList(
                        tableEnv.sqlQuery("SELECT * FROM myhive.`default`.test2")
                                .execute()
                                .collect());
        Assertions.assertThat(rowsBeforeRename).containsExactlyInAnyOrder(expectedRows);

        tableEnv.executeSql("ALTER TABLE test2 RENAME TO newtable");

        List<Row> rowsAfterRename =
                CollectionUtil.iteratorToList(
                        tableEnv.sqlQuery("SELECT * FROM myhive.`default`.newtable")
                                .execute()
                                .collect());
        Assertions.assertThat(rowsAfterRename).containsExactlyInAnyOrder(expectedRows);

        tableEnv.executeSql("DROP TABLE newtable");
    }

    /**
     * Registers a CSV source table and a CSV sink table directly through the catalog API, reads
     * the source with SQL, copies it into the sink and checks the written file line by line.
     *
     * @throws Exception if table creation, job execution or file access fails
     */
    @Test
    public void testCsvTableViaAPI() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tableEnv.getConfig()
                .addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1));
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        ResolvedSchema resolvedSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("name", DataTypes.STRING()),
                                Column.physical("age", DataTypes.INT())),
                        new ArrayList<>(),
                        null);

        ResolvedCatalogTable sourceTable =
                newCsvTable(resolvedSchema, getClass().getResource("/csv/test.csv").getPath());

        Path sinkPath = Paths.get(this.tempFolder.newFolder().getAbsolutePath(), "test.csv");
        ResolvedCatalogTable sinkTable =
                newCsvTable(resolvedSchema, sinkPath.toAbsolutePath().toString());

        hiveCatalog.createTable(new ObjectPath("default", this.sourceTableName), sourceTable, false);
        hiveCatalog.createTable(new ObjectPath("default", this.sinkTableName), sinkTable, false);

        List<Row> sourceRows =
                CollectionUtil.iteratorToList(
                        tableEnv.sqlQuery(
                                        String.format(
                                                "select * from myhive.`default`.%s",
                                                this.sourceTableName))
                                .execute()
                                .collect());
        // Sort by string form so the exact-order assertion does not depend on read order.
        sourceRows.sort(Comparator.comparing(row -> String.valueOf(row)));
        Assertions.assertThat(sourceRows)
                .containsExactly(Row.of("1", 1), Row.of("2", 2), Row.of("3", 3));

        tableEnv.executeSql(
                        String.format(
                                "insert into myhive.`default`.%s select * from myhive.`default`.%s",
                                this.sinkTableName, this.sourceTableName))
                .await();

        // try-with-resources: the reader was previously left open after the assertions.
        try (BufferedReader reader =
                new BufferedReader(new FileReader(sinkPath.toAbsolutePath().toFile()))) {
            for (int line = 1; line <= 3; line++) {
                Assertions.assertThat(reader.readLine())
                        .isEqualTo(String.format("%d,%d", line, line));
            }
            // No further lines may follow the three copied rows.
            Assertions.assertThat(reader.readLine()).isNull();
        }

        tableEnv.executeSql(String.format("DROP TABLE %s", this.sourceTableName));
        tableEnv.executeSql(String.format("DROP TABLE %s", this.sinkTableName));
    }

    /** Builds a resolved catalog table with filesystem/CSV options pointing at {@code path}. */
    private static ResolvedCatalogTable newCsvTable(ResolvedSchema schema, String path) {
        Map<String, String> options = new HashMap<>();
        options.put("connector.type", "filesystem");
        options.put("connector.path", path);
        options.put("format.type", "csv");
        return new ResolvedCatalogTable(
                CatalogTable.of(
                        Schema.newBuilder().fromResolvedSchema(schema).build(),
                        "Comment.",
                        new ArrayList<>(),
                        options),
                schema);
    }

    /**
     * Reads a CSV source in streaming mode, aggregates it over 5-second tumbling windows into a
     * CSV sink and compares the sink file content with the expected two result lines.
     *
     * @throws Exception if job execution or reading the sink file fails
     */
    @Test
    public void testReadWriteCsv() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String srcPath = getClass().getResource("/csv/test3.csv").getPath();
        String srcColumns =
                "CREATE TABLE src ("
                        + "price DECIMAL(10, 2),"
                        + "currency STRING,"
                        + "ts6 TIMESTAMP(6),"
                        + "ts AS CAST(ts6 AS TIMESTAMP(3)),"
                        + "WATERMARK FOR ts AS ts) ";
        String srcOptions =
                String.format(
                        "WITH ("
                                + "'connector.type' = 'filesystem',"
                                + "'connector.path' = 'file://%s',"
                                + "'format.type' = 'csv')",
                        srcPath);
        tableEnv.executeSql(srcColumns + srcOptions);

        String sinkUri = new File(this.tempFolder.newFolder(), "csv-order-sink").toURI().toString();
        String sinkColumns =
                "CREATE TABLE sink ("
                        + "window_end TIMESTAMP(3),"
                        + "max_ts TIMESTAMP(6),"
                        + "counter BIGINT,"
                        + "total_price DECIMAL(10, 2)) ";
        String sinkOptions =
                String.format(
                        "WITH ("
                                + "'connector.type' = 'filesystem',"
                                + "'connector.path' = '%s',"
                                + "'format.type' = 'csv')",
                        sinkUri);
        tableEnv.executeSql(sinkColumns + sinkOptions);

        String insertSql =
                "INSERT INTO sink "
                        + "SELECT TUMBLE_END(ts, INTERVAL '5' SECOND),"
                        + "MAX(ts6),"
                        + "COUNT(*),"
                        + "MAX(price) "
                        + "FROM src "
                        + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
        tableEnv.executeSql(insertSql).await();

        String expectedContent =
                "2019-12-12 00:00:05.0,2019-12-12 00:00:04.004001,3,50.00\n"
                        + "2019-12-12 00:00:10.0,2019-12-12 00:00:06.006001,2,5.33\n";
        String actualContent = FileUtils.readFileUtf8(new File(new URI(sinkUri)));
        Assertions.assertThat(actualContent).isEqualTo(expectedContent);
    }

    /** Runs the SQL proctime read check against a batch-mode table environment. */
    @Test
    public void testBatchReadWriteCsvWithProctime() {
        final boolean streamingMode = false;
        testReadWriteCsvWithProctime(streamingMode);
    }

    /** Runs the SQL proctime read check against a streaming-mode table environment. */
    @Test
    public void testStreamReadWriteCsvWithProctime() {
        final boolean streamingMode = true;
        testReadWriteCsvWithProctime(streamingMode);
    }

    /**
     * Creates {@code proctime_src}, selects all of its rows with SQL and expects five of them.
     *
     * @param z {@code true} to run in streaming mode, {@code false} for batch mode
     */
    private void testReadWriteCsvWithProctime(boolean z) {
        TableEnvironment tableEnv = prepareTable(z);
        try {
            List<Row> rows =
                    CollectionUtil.iteratorToList(
                            tableEnv.executeSql("SELECT * FROM proctime_src").collect());
            Assertions.assertThat(rows).hasSize(5);
        } finally {
            // The catalog is shared by all tests: drop the table even when the assertion fails,
            // otherwise the other proctime tests cannot create it again.
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    /** Runs the Table API proctime projection check against a batch-mode table environment. */
    @Test
    public void testTableApiWithProctimeForBatch() {
        final boolean streamingMode = false;
        testTableApiWithProctime(streamingMode);
    }

    /** Runs the Table API proctime projection check against a streaming-mode table environment. */
    @Test
    public void testTableApiWithProctimeForStreaming() {
        final boolean streamingMode = true;
        testTableApiWithProctime(streamingMode);
    }

    /**
     * Creates {@code proctime_src}, projects {@code price}, {@code ts} and {@code l_proctime}
     * through the Table API and expects five rows.
     *
     * @param z {@code true} to run in streaming mode, {@code false} for batch mode
     */
    private void testTableApiWithProctime(boolean z) {
        TableEnvironment tableEnv = prepareTable(z);
        try {
            Expression[] projection = {
                Expressions.$("price"), Expressions.$("ts"), Expressions.$("l_proctime")
            };
            List<Row> rows =
                    CollectionUtil.iteratorToList(
                            tableEnv.from("proctime_src").select(projection).execute().collect());
            Assertions.assertThat(rows).hasSize(5);
        } finally {
            // The catalog is shared by all tests: drop the table even when the assertion fails,
            // otherwise the other proctime tests cannot create it again.
            tableEnv.executeSql("DROP TABLE proctime_src");
        }
    }

    /**
     * Creates a table environment with parallelism 1, registers the shared catalog as
     * {@code myhive} and creates the CSV-backed table {@code proctime_src} in it.
     *
     * @param z {@code true} for a streaming-mode environment, {@code false} for batch mode
     * @return the environment with {@code myhive} as current catalog
     */
    private TableEnvironment prepareTable(boolean z) {
        EnvironmentSettings settings;
        if (z) {
            settings = EnvironmentSettings.inStreamingMode();
        } else {
            settings = EnvironmentSettings.inBatchMode();
        }
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        String csvPath = getClass().getResource("/csv/test3.csv").getPath();
        String columns =
                "CREATE TABLE proctime_src ("
                        + "price DECIMAL(10, 2),"
                        + "currency STRING,"
                        + "ts6 TIMESTAMP(6),"
                        + "ts AS CAST(ts6 AS TIMESTAMP(3)),"
                        + "WATERMARK FOR ts AS ts,"
                        + "l_proctime AS PROCTIME( )) ";
        String options =
                String.format(
                        "WITH ("
                                + "'connector.type' = 'filesystem',"
                                + "'connector.path' = 'file://%s',"
                                + "'format.type' = 'csv')",
                        csvPath);
        tableEnv.executeSql(columns + options);
        return tableEnv;
    }

    /**
     * Creates a table with a NOT ENFORCED primary key and checks that the key {@code ct1(uuid)} is
     * present in the unresolved schema read back from the current catalog.
     */
    @Test
    public void testTableWithPrimaryKey() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tableEnv.registerCatalog("catalog1", hiveCatalog);
        tableEnv.useCatalog("catalog1");

        String createTableDdl =
                "CREATE TABLE pk_src (\n"
                        + "  uuid varchar(40) not null,\n"
                        + "  price DECIMAL(10, 2),\n"
                        + "  currency STRING,\n"
                        + "  ts6 TIMESTAMP(6),\n"
                        + "  ts AS CAST(ts6 AS TIMESTAMP(3)),\n"
                        + "  WATERMARK FOR ts AS ts,\n"
                        + "  constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n"
                        + "  WITH (\n"
                        + "    'connector.type' = 'filesystem',"
                        + "    'connector.path' = 'file://fakePath',"
                        + "    'format.type' = 'csv')";
        tableEnv.executeSql(createTableDdl);

        // A missing catalog or a missing table both leave the schema null; the assertion below
        // then reports the failure.
        Schema tableSchema = null;
        Catalog currentCatalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).orElse(null);
        if (currentCatalog != null) {
            try {
                ObjectPath tablePath =
                        ObjectPath.fromString(currentCatalog.getDefaultDatabase() + ".pk_src");
                tableSchema = currentCatalog.getTable(tablePath).getUnresolvedSchema();
            } catch (TableNotExistException ignored) {
                // Keep the schema null so that isNotNull() fails.
            }
        }

        Assertions.assertThat(tableSchema).isNotNull();
        Assertions.assertThat(tableSchema.getPrimaryKey())
                .hasValue(new Schema.UnresolvedPrimaryKey("ct1", Collections.singletonList("uuid")));
        tableEnv.executeSql("DROP TABLE pk_src");
    }

    /**
     * Copies a CSV table into a {@code print} table and compares what the sink wrote to
     * {@code System.out} with the three expected change-log lines.
     *
     * <p>{@code System.out} is redirected for the duration of the job and is always restored, and
     * both tables are always dropped, in a single {@code finally} block.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testNewTableFactory() throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");
        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        String path = getClass().getResource("/csv/test.csv").getPath();

        PrintStream originalOut = System.out;
        try {
            ByteArrayOutputStream captured = new ByteArrayOutputStream();
            System.setOut(new PrintStream(captured));
            tableEnv.executeSql(
                    "create table csv_table (name String, age Int) with ('connector.type' = 'filesystem','connector.path' = 'file://"
                            + path
                            + "','format.type' = 'csv')");
            tableEnv.executeSql(
                    "create table print_table (name String, age Int) with ('connector' = 'print')");
            tableEnv.executeSql("insert into print_table select * from csv_table").await();
            Assertions.assertThat(captured.toString()).isEqualTo("+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n");
        } finally {
            // Close only the replacement stream, never the original System.out.
            if (System.out != originalOut) {
                System.out.close();
            }
            System.setOut(originalOut);
            tableEnv.executeSql("DROP TABLE csv_table");
            tableEnv.executeSql("DROP TABLE print_table");
        }
    }

    /**
     * Submits five concurrent {@code listDatabases()} calls against the shared catalog and waits
     * up to five seconds for each of them.
     *
     * @throws Exception if a call fails or does not finish within the timeout
     */
    @Test
    public void testConcurrentAccessHiveCatalog() throws Exception {
        final int threadCount = 5;
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            Callable<List<String>> listDatabases = () -> hiveCatalog.listDatabases();
            List<Future<List<String>>> pending = new ArrayList<>();
            for (int submitted = 0; submitted < threadCount; submitted++) {
                pending.add(pool.submit(listDatabases));
            }
            for (Future<List<String>> result : pending) {
                result.get(5L, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Runs two insert jobs that only involve temporary tables while the Hive catalog is the
     * current catalog: collection source to filesystem sink, and datagen to blackhole.
     *
     * @throws Exception if a job fails or the scratch directory cannot be created
     */
    @Test
    public void testTemporaryGenericTable() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(Arrays.asList(Row.of(1), Row.of(2)));
        tableEnv.executeSql(
                "create temporary table src(x int) with ('connector'='COLLECTION','is-bounded' = 'false')");

        // Use the class's TemporaryFolder rule, like the other tests, instead of an ad-hoc temp
        // directory that is only removed by a JVM shutdown hook.
        File destDir = this.tempFolder.newFolder();
        tableEnv.executeSql(
                "create temporary table dest(x int) with ('connector' = 'filesystem',"
                        + String.format("'path' = 'file://%s/1.csv',", destDir.getAbsolutePath())
                        + "'format' = 'csv')");
        tableEnv.executeSql("insert into dest select * from src").await();

        tableEnv.executeSql(
                "create temporary table datagen(i int) with ('connector'='datagen','rows-per-second'='5','fields.i.kind'='sequence','fields.i.start'='1','fields.i.end'='10')");
        tableEnv.executeSql("create temporary table blackhole(i int) with ('connector'='blackhole')");
        tableEnv.executeSql("insert into blackhole select * from datagen").await();
    }

    /**
     * Creates a table in the Hive catalog, copies it with {@code CREATE TABLE ... LIKE} into the
     * default catalog and checks the options and schema of the copy.
     *
     * @throws Exception if the copied table cannot be looked up
     */
    @Test
    public void testCreateTableLike() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
        String hiveCatalogName = hiveCatalog.getName();
        tableEnv.registerCatalog(hiveCatalogName, hiveCatalog);
        tableEnv.useCatalog(hiveCatalogName);
        tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')");

        tableEnv.useCatalog(CatalogManagerMocks.DEFAULT_CATALOG);
        tableEnv.executeSql(
                String.format(
                        "create table copy like `%s`.`default`.generic_table", hiveCatalogName));

        Catalog defaultCatalog = tableEnv.getCatalog(CatalogManagerMocks.DEFAULT_CATALOG).get();
        CatalogBaseTable copiedTable =
                defaultCatalog.getTable(
                        new ObjectPath(CatalogManagerMocks.DEFAULT_DATABASE, "copy"));

        Map<String, String> copiedOptions = copiedTable.getOptions();
        Assertions.assertThat(copiedOptions).hasSize(1);
        Assertions.assertThat(copiedOptions)
                .containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION");

        Assertions.assertThat(copiedTable.getSchema().getFieldCount()).isEqualTo(1);
        Assertions.assertThat(copiedTable.getSchema().getFieldNames())
                .hasSameElementsAs(Collections.singletonList("x"));
        Assertions.assertThat(copiedTable.getSchema().getFieldDataTypes())
                .hasSameElementsAs(Collections.singletonList(DataTypes.INT()));
    }

    /**
     * Creates two views on top of a datagen table and checks, for each view, the schema stored in
     * the catalog and the number of rows it returns; for {@code v2} the comment is checked too.
     *
     * @throws Exception if catalog access or query execution fails
     */
    @Test
    public void testViewSchema() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        // NOTE(review): the lookups below use database "db1"; presumably
        // executeInSeparateDatabase creates and switches to it - confirm against the helper.
        TableEnvExecutorUtil.executeInSeparateDatabase(
                tableEnv,
                true,
                () -> {
                    tableEnv.executeSql(
                            "create table src(x int,ts timestamp(3)) with ('connector'='datagen','number-of-rows'='10')");
                    tableEnv.executeSql("create view v1 as select x,ts from src order by x limit 3");

                    CatalogBaseTable v1 = hiveCatalog.getTable(new ObjectPath("db1", "v1"));
                    Assertions.assertThat(v1.getUnresolvedSchema().resolve(new TestSchemaResolver()))
                            .isEqualTo(
                                    new ResolvedSchema(
                                            Lists.newArrayList(
                                                    Column.physical("x", DataTypes.INT()),
                                                    Column.physical("ts", DataTypes.TIMESTAMP(3))),
                                            new ArrayList<>(),
                                            null));
                    List<Row> v1Rows =
                            CollectionUtil.iteratorToList(
                                    tableEnv.executeSql("select x from v1").collect());
                    Assertions.assertThat(v1Rows).hasSize(3);

                    tableEnv.executeSql(
                            "create view v2 (v2_x,v2_ts) comment 'v2 comment' as select x,cast(ts as timestamp_ltz(3)) from v1");
                    // getTable returns the base type; the explicit cast also verifies that the
                    // catalog stored v2 as a view.
                    CatalogView v2 = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v2"));
                    Assertions.assertThat(v2.getUnresolvedSchema().resolve(new TestSchemaResolver()))
                            .isEqualTo(
                                    new ResolvedSchema(
                                            Lists.newArrayList(
                                                    Column.physical("v2_x", DataTypes.INT()),
                                                    Column.physical(
                                                            "v2_ts", DataTypes.TIMESTAMP_LTZ(3))),
                                            new ArrayList<>(),
                                            null));
                    Assertions.assertThat(v2.getComment()).isEqualTo("v2 comment");
                    List<Row> v2Rows =
                            CollectionUtil.iteratorToList(
                                    tableEnv.executeSql("select * from v2").collect());
                    Assertions.assertThat(v2Rows).hasSize(3);
                });
    }

    /**
     * Creates a table without connector options (a managed table), then checks the options
     * recorded by {@link TestManagedTableFactory}, the parameters stored on the Hive table and the
     * options returned by the catalog. The table is always dropped afterwards, and the recorded
     * options are expected to be cleared by the drop.
     *
     * @throws Exception if catalog access fails
     */
    @Test
    public void testCreateAndGetManagedTable() throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("myhive", "default", "managed_table");
        try {
            TestManagedTableFactory.MANAGED_TABLES.put(tableIdentifier, new AtomicReference<>());
            tableEnv.registerCatalog("myhive", hiveCatalog);
            tableEnv.useCatalog("myhive");
            tableEnv.executeSql(
                    String.format(
                            "CREATE TABLE %s (\n"
                                    + "  uuid varchar(40) not null,\n"
                                    + "  price DECIMAL(10, 2),\n"
                                    + "  currency STRING,\n"
                                    + "  ts6 TIMESTAMP(6),\n"
                                    + "  ts AS CAST(ts6 AS TIMESTAMP(3)),\n"
                                    + "  WATERMARK FOR ts AS ts,\n"
                                    + "  constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n",
                            "managed_table"));

            Map<String, String> expectedOptions = new HashMap<>();
            expectedOptions.put("ENRICHED_KEY", "ENRICHED_VALUE");
            Assertions.assertThat(TestManagedTableFactory.MANAGED_TABLES.get(tableIdentifier).get())
                    .containsExactlyInAnyOrderEntriesOf(expectedOptions);

            Map<String, String> expectedParameters = new HashMap<>();
            // NOTE(review): this loop previously had an empty body, so the enriched option was
            // never checked in the Hive parameters. It is mirrored here under the same "flink."
            // prefix as the connector key below - confirm against HiveCatalog's property prefix.
            expectedOptions.forEach((key, value) -> expectedParameters.put("flink." + key, value));
            expectedParameters.put("flink." + FactoryUtil.CONNECTOR.key(), "default");
            Assertions.assertThat(
                            hiveCatalog.getHiveTable(tableIdentifier.toObjectPath()).getParameters())
                    .containsAllEntriesOf(expectedParameters);

            Assertions.assertThat(hiveCatalog.getTable(tableIdentifier.toObjectPath()).getOptions())
                    .containsExactlyEntriesOf(
                            Collections.singletonMap("ENRICHED_KEY", "ENRICHED_VALUE"));
        } finally {
            // Single cleanup path for success and failure.
            tableEnv.executeSql(String.format("DROP TABLE %s", "managed_table"));
            Assertions.assertThat(TestManagedTableFactory.MANAGED_TABLES.get(tableIdentifier).get())
                    .isNull();
        }
    }
}
