package org.apache.paimon.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

@RunWith(PaimonEmbeddedHiveRunner.class)
/* loaded from: input_file:org/apache/paimon/hive/HiveCatalogITCaseBase.class */
public abstract class HiveCatalogITCaseBase {

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    protected String path;
    protected TableEnvironment tEnv;

    @HiveSQL(files = {})
    protected static HiveShell hiveShell;

    @Before
    public void before() throws Exception {
        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
        hiveShell.execute("USE test_db");
        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
        hiveShell.executeQuery("SHOW TABLES");
        this.path = this.folder.newFolder().toURI().toString();
        this.tEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
        this.tEnv.executeSql(String.join("\n", "CREATE CATALOG my_hive WITH (", "  'type' = 'paimon',", "  'metastore' = 'hive',", "  'uri' = '',", "  'warehouse' = '" + this.path + "',", "  'lock.enabled' = 'true'", ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive").await();
        this.tEnv.executeSql("USE test_db").await();
    }

    @After
    public void after() {
        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
    }

    @Test
    public void testDatabaseOperations() throws Exception {
        this.tEnv.executeSql("CREATE DATABASE test_db2").await();
        Assert.assertEquals(Arrays.asList(Row.of(new Object[]{"default"}), Row.of(new Object[]{"test_db"}), Row.of(new Object[]{"test_db2"})), collect("SHOW DATABASES"));
        this.tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db2").await();
        try {
            this.tEnv.executeSql("CREATE DATABASE test_db2").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th) {
            ExceptionUtils.assertThrowableWithMessage(th, "Database test_db2 already exists in Catalog my_hive");
        }
        this.tEnv.executeSql("DROP DATABASE test_db2").await();
        Assert.assertEquals(Arrays.asList(Row.of(new Object[]{"default"}), Row.of(new Object[]{"test_db"})), collect("SHOW DATABASES"));
        this.tEnv.executeSql("DROP DATABASE IF EXISTS test_db2").await();
        try {
            this.tEnv.executeSql("DROP DATABASE test_db2").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th2) {
            ExceptionUtils.assertThrowableWithMessage(th2, "Database test_db2 does not exist in Catalog my_hive");
        }
        this.tEnv.executeSql("CREATE DATABASE test_db2").await();
        this.tEnv.executeSql("USE test_db2").await();
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
        Path path = new Path(this.path, "test_db2.db/t");
        Assert.assertTrue(path.getFileSystem().exists(path));
        try {
            this.tEnv.executeSql("DROP DATABASE test_db2").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th3) {
            ExceptionUtils.assertThrowableWithMessage(th3, "Database test_db2 in catalog my_hive is not empty");
        }
        this.tEnv.executeSql("DROP DATABASE test_db2 CASCADE").await();
        Assert.assertEquals(Arrays.asList(Row.of(new Object[]{"default"}), Row.of(new Object[]{"test_db"})), collect("SHOW DATABASES"));
        Assert.assertFalse(path.getFileSystem().exists(path));
    }

    @Test
    public void testTableOperations() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        Assert.assertEquals(Arrays.asList(Row.of(new Object[]{"s"}), Row.of(new Object[]{"t"})), collect("SHOW TABLES"));
        this.tEnv.executeSql("CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        try {
            this.tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th) {
            ExceptionUtils.assertThrowableWithMessage(th, "Table (or view) test_db.s already exists in Catalog my_hive");
        }
        this.tEnv.executeSql("INSERT INTO s VALUES (1, 'Hi'), (2, 'Hello')").await();
        Path path = new Path(this.path, "test_db.db/s");
        Assert.assertTrue(path.getFileSystem().exists(path));
        this.tEnv.executeSql("DROP TABLE s").await();
        Assert.assertEquals(Collections.singletonList(Row.of(new Object[]{"t"})), collect("SHOW TABLES"));
        Assert.assertFalse(path.getFileSystem().exists(path));
        this.tEnv.executeSql("DROP TABLE IF EXISTS s").await();
        try {
            this.tEnv.executeSql("DROP TABLE s").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th2) {
            ExceptionUtils.assertThrowableWithMessage(th2, "Table with identifier 'my_hive.test_db.s' does not exist");
        }
        try {
            this.tEnv.executeSql("DROP TABLE hive_table").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th3) {
            ExceptionUtils.assertThrowableWithMessage(th3, "Table test_db.hive_table is not a paimon table");
        }
        this.tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = '16MB' )").await();
        List<Row> collect = collect("SHOW CREATE TABLE t");
        Assert.assertEquals(1L, collect.size());
        Assert.assertTrue(collect.get(0).getField(0).toString().contains("'manifest.target-file-size' = '16MB'"));
        try {
            this.tEnv.executeSql("ALTER TABLE s SET ( 'manifest.target-file-size' = '16MB' )").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th4) {
            ExceptionUtils.assertThrowableWithMessage(th4, "Table `my_hive`.`test_db`.`s` doesn't exist or is a temporary table");
        }
        try {
            this.tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th5) {
            ExceptionUtils.assertThrowableWithMessage(th5, "Table test_db.hive_table is not a paimon table");
        }
    }

    @Test
    public void testCreateExternalTable() throws Exception {
        this.tEnv.executeSql(String.join("\n", "CREATE CATALOG my_hive_external WITH (", "  'type' = 'paimon',", "  'metastore' = 'hive',", "  'uri' = '',", "  'warehouse' = '" + this.path + "',", "  'lock.enabled' = 'true',", "  'table.type' = 'EXTERNAL'", ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive_external").await();
        this.tEnv.executeSql("USE test_db").await();
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        Assert.assertTrue(hiveShell.executeQuery("DESC FORMATTED t").contains("Table Type:         \tEXTERNAL_TABLE      \tNULL"));
        this.tEnv.executeSql("DROP TABLE t").await();
        Path path = new Path(this.path, "test_db.db/t");
        Assert.assertFalse(path.getFileSystem().exists(path));
    }

    @Test
    public void testFlinkWriteAndHiveRead() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
        Assert.assertEquals(Arrays.asList("1\tHi", "2\tHello"), hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
        try {
            this.tEnv.executeSql("INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')").await();
            Assert.fail("No exception is thrown");
        } catch (Throwable th) {
            ExceptionUtils.assertThrowableWithMessage(th, "Table test_db.hive_table is not a paimon table");
        }
    }

    @Test
    public void testCreateTableAs() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t VALUES(1)").await();
        this.tEnv.executeSql("CREATE TABLE t1 AS SELECT * FROM t").await();
        Assertions.assertThat(collect("SELECT * FROM t1$schemas s").toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
        Assertions.assertThat(collect("SELECT * FROM t1")).contains(new Row[]{Row.of(new Object[]{1})});
        this.tEnv.executeSql("CREATE TABLE t_option (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t_option VALUES(1)").await();
        this.tEnv.executeSql("CREATE TABLE t1_option WITH ('file.format' = 'parquet') AS SELECT * FROM t_option").await();
        Assertions.assertThat(collect("SELECT * FROM t1_option$options")).containsExactly(new Row[]{Row.of(new Object[]{"file.format", "parquet"})});
        Assertions.assertThat(collect("SELECT * FROM t1_option")).contains(new Row[]{Row.of(new Object[]{1})});
        this.tEnv.executeSql("CREATE TABLE t_p (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh)");
        this.tEnv.executeSql("INSERT INTO t_p  SELECT 1,2,'a','2023-02-19','12'").await();
        this.tEnv.executeSql("CREATE TABLE t1_p WITH ('partition' = 'dt') AS SELECT * FROM t_p").await();
        Assertions.assertThat(collect("SELECT * FROM t1_p$schemas s").toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"},{\"id\":3,\"name\":\"dt\",\"type\":\"STRING\"},{\"id\":4,\"name\":\"hh\",\"type\":\"STRING\"}], [\"dt\"], [], {}, ]]");
        Assertions.assertThat(collect("SELECT * FROM t1_p").toString()).isEqualTo("[+I[1, 2, a, 2023-02-19, 12]]");
        this.tEnv.executeSql("CREATE TABLE t_pk (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)").await();
        this.tEnv.executeSql("INSERT INTO t_pk VALUES(1,2,'aaa','2020-01-02','09')").await();
        this.tEnv.executeSql("CREATE TABLE t_pk_as WITH ('primary-key' = 'dt') AS SELECT * FROM t_pk").await();
        Assertions.assertThat(collect("SHOW CREATE TABLE t_pk_as").toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`)"});
        Assertions.assertThat(collect("SELECT * FROM t_pk_as").toString()).isEqualTo("[+I[1, 2, aaa, 2020-01-02, 09]]");
        this.tEnv.executeSql("CREATE TABLE t_all (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n) PARTITIONED BY (dt, hh)").await();
        this.tEnv.executeSql("INSERT INTO t_all VALUES(1,2,'login','2020-01-02','09')").await();
        this.tEnv.executeSql("CREATE TABLE t_all_as WITH ('primary-key' = 'dt,hh' , 'partition' = 'dt' ) AS SELECT * FROM t_all").await();
        List<Row> collect = collect("SHOW CREATE TABLE t_all_as");
        Assertions.assertThat(collect.toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`, `hh`)"});
        Assertions.assertThat(collect.toString()).contains(new CharSequence[]{"PARTITIONED BY (`dt`)"});
        Assertions.assertThat(collect("SELECT * FROM t_all_as").toString()).isEqualTo("[+I[1, 2, login, 2020-01-02, 09]]");
        this.tEnv.executeSql("CREATE TABLE t_pk_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)").await();
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist").await();
        }).hasRootCauseMessage("Primary key column '[aaa]' is not defined in the schema.");
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE t_pk_ddl_option (                            user_id BIGINT,                            item_id BIGINT,                            behavior STRING,                            dt STRING,                            hh STRING,                            PRIMARY KEY (dt, hh, user_id) NOT ENFORCED                        ) WITH ('primary-key' = 'dt')").await();
        }).hasRootCauseMessage("Cannot define primary key on DDL and table options at the same time.");
        this.tEnv.executeSql("CREATE TABLE t_partition_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh) ").await();
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist").await();
        }).hasRootCauseMessage("Partition column '[aaa]' is not defined in the schema.");
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE t_partition_ddl_option (                            user_id BIGINT,                            item_id BIGINT,                            behavior STRING,                            dt STRING,                            hh STRING                        ) PARTITIONED BY (dt, hh)  WITH ('partition' = 'dt')").await();
        }).hasRootCauseMessage("Cannot define partition on DDL and table options at the same time.");
    }

    @Test
    public void testRenameTable() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
        this.tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t1 SELECT 1");
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("ALTER TABLE t3 RENAME TO t4");
        }).hasMessage("Table `my_hive`.`test_db`.`t3` doesn't exist or is a temporary table.");
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t2");
        }).hasMessage("Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.t2");
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("ALTER TABLE t1 RENAME TO T1");
        }).hasMessage("Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.T1");
        this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
        List executeQuery = hiveShell.executeQuery("SHOW TABLES");
        Assert.assertTrue(executeQuery.contains("t3"));
        Assert.assertFalse(executeQuery.contains("t1"));
        Assert.assertEquals(((FlinkCatalog) this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog().getDataTableLocation(new Identifier("test_db", "t3")).toString(), this.path + "test_db.db" + File.separator + "t3");
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("SELECT * FROM t3");
        }).hasMessageContaining("SQL validation failed. There is no paimond in");
    }

    @Test
    public void testHiveLock() throws InterruptedException {
        this.tEnv.executeSql("CREATE TABLE t (a INT)");
        CatalogLock.Factory factory = (CatalogLock.Factory) ((FlinkCatalog) this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog().lockFactory().get();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ArrayList arrayList = new ArrayList();
        Callable callable = () -> {
            int i = atomicInteger.get() + 1;
            Thread.sleep(1L);
            atomicInteger.set(i);
            return null;
        };
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                CatalogLock create = factory.create();
                for (int i2 = 0; i2 < 10; i2++) {
                    try {
                        create.runWithLock("test_db", "t", callable);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            thread.start();
            arrayList.add(thread);
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).join();
        }
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(100);
    }

    @Test
    public void testUpperCase() {
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        }).hasRootCauseMessage(String.format("Table name[%s] cannot contain upper case in hive catalog", "T"));
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            this.tEnv.executeSql("CREATE TABLE t (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')").await();
        }).hasRootCauseMessage(String.format("Field names %s cannot contain upper case in hive catalog", "[A, C]"));
    }

    @Test
    public void testQuickPathInShowTables() throws Exception {
        collect("CREATE TABLE t ( a INT, b STRING )");
        Assert.assertEquals("[+I[t]]", collect("SHOW TABLES").toString());
        new LocalFileIO().delete(new org.apache.paimon.fs.Path(this.path, "test_db.db/t"), true);
        Assert.assertEquals("[]", collect("SHOW TABLES").toString());
    }

    @Test
    public void testCatalogOptionsInheritAndOverride() throws Exception {
        this.tEnv.executeSql(String.join("\n", "CREATE CATALOG my_hive_options WITH (", "  'type' = 'paimon',", "  'metastore' = 'hive',", "  'uri' = '',", "  'warehouse' = '" + this.path + "',", "  'lock.enabled' = 'true',", "  'table-default.opt1' = 'value1',", "  'table-default.opt2' = 'value2',", "  'table-default.opt3' = 'value3'", ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive_options").await();
        this.tEnv.executeSql("CREATE TABLE table_without_options (a INT, b STRING)").await();
        Identifier identifier = new Identifier("default", "table_without_options");
        Catalog catalog = ((FlinkCatalog) this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog();
        Map options = catalog.getTable(identifier).options();
        Assertions.assertThat(options).containsEntry("opt1", "value1");
        Assertions.assertThat(options).containsEntry("opt2", "value2");
        Assertions.assertThat(options).containsEntry("opt3", "value3");
        Assertions.assertThat(options).doesNotContainKey("lock.enabled");
        this.tEnv.executeSql("CREATE TABLE table_with_options (a INT, b STRING) WITH ('opt1' = 'new_value')").await();
        Map options2 = catalog.getTable(new Identifier("default", "table_with_options")).options();
        Assertions.assertThat(options2).containsEntry("opt1", "new_value");
        Assertions.assertThat(options2).containsEntry("opt2", "value2");
        Assertions.assertThat(options2).containsEntry("opt3", "value3");
        Assertions.assertThat(options2).doesNotContainKey("lock.enabled");
    }

    protected List<Row> collect(String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = this.tEnv.executeSql(str).collect();
        Throwable th = null;
        while (collect.hasNext()) {
            try {
                try {
                    arrayList.add(collect.next());
                } finally {
                }
            } catch (Throwable th2) {
                if (collect != null) {
                    if (th != null) {
                        try {
                            collect.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        collect.close();
                    }
                }
                throw th2;
            }
        }
        if (collect != null) {
            if (0 != 0) {
                try {
                    collect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                collect.close();
            }
        }
        return arrayList;
    }
}
