package org.apache.paimon.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.paimon.flink.FlinkGenericCatalog;
import org.apache.paimon.flink.FlinkGenericCatalogFactory;
import org.apache.paimon.hive.annotation.Minio;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.s3.MinioTestContainer;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

/**
 * Integration test that registers a {@link FlinkGenericCatalog} wrapping a {@link HiveCatalog}
 * in a batch-mode Flink {@link TableEnvironment} and runs SQL against Paimon tables through it.
 *
 * <p>Each test starts from a freshly created {@code test_db} Hive database (see
 * {@link #before()}) which is dropped again in {@link #after()}.
 */
@RunWith(PaimonEmbeddedHiveRunner.class)
public class FlinkGenericCatalogITCase extends AbstractTestBase {

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    /** Table environment used by {@link #sql(String, Object...)}; assigned in {@link #before()}. */
    protected TableEnvironment tEnv;

    // NOTE(review): never assigned in this class - presumably injected by the test runner
    // based on the @HiveSQL annotation; confirm against PaimonEmbeddedHiveRunner.
    @HiveSQL(files = {})
    protected static HiveShell hiveShell;

    // NOTE(review): not referenced in this class - presumably injected/started by the runner
    // based on the @Minio annotation; confirm against PaimonEmbeddedHiveRunner.
    @Minio
    private static MinioTestContainer minioTestContainer;

    /**
     * Builds a Flink {@link HiveCatalog} named {@code testcatalog} on top of the given Hive
     * configuration.
     *
     * @param hiveConf Hive configuration the catalog should use
     * @return a new, not yet opened, Hive catalog
     */
    private static HiveCatalog createHiveCatalog(HiveConf hiveConf) {
        // No default database is supplied (null); the last flag is passed as true exactly as
        // in the original call.
        return new HiveCatalog(
                "testcatalog", null, hiveConf, HiveShimLoader.getHiveVersion(), true);
    }

    /**
     * Prepares the Hive side ({@code test_db} with a small {@code hive_table}) and then creates
     * a batch table environment whose current catalog is a {@link FlinkGenericCatalog} wrapping
     * the Hive catalog, with {@code test_db} as the current database.
     *
     * @throws Exception if opening the catalog or any setup statement fails
     */
    @Before
    public void before() throws Exception {
        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
        hiveShell.execute("USE test_db");
        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
        hiveShell.executeQuery("SHOW TABLES");

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        tEnv = TableEnvironmentImpl.create(settings);

        HiveCatalog hiveCatalog = createHiveCatalog(hiveShell.getHiveConf());
        String catalogName = hiveCatalog.getName();
        FlinkGenericCatalog genericCatalog =
                FlinkGenericCatalogFactory.createCatalog(
                        getClass().getClassLoader(), new HashMap<>(), catalogName, hiveCatalog);
        genericCatalog.open();

        tEnv.registerCatalog(catalogName, genericCatalog);
        sql("USE CATALOG " + catalogName);
        sql("USE test_db");
    }

    /** Drops the databases a test may have created so the next test starts clean. */
    @After
    public void after() {
        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
    }

    /**
     * Formats {@code str} with {@code objArr} via {@link String#format}, executes it on
     * {@link #tEnv} and returns all result rows.
     *
     * @param str SQL statement, optionally containing {@link String#format} placeholders
     * @param objArr arguments substituted into {@code str}
     * @return an immutable list of every row produced by the statement
     * @throws RuntimeException wrapping any exception raised while executing the statement,
     *     collecting its rows or closing the result iterator
     */
    protected List<Row> sql(String str, Object... objArr) {
        String statement = String.format(str, objArr);
        // try-with-resources guarantees the iterator is closed even when draining it fails.
        try (CloseableIterator<Row> rows = tEnv.executeSql(statement).collect()) {
            return ImmutableList.copyOf(rows);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Writes to a Paimon table, reads it back, and copies it into a blackhole table. */
    @Test
    public void testPaimonTableToBlackHole() {
        sql(
                "CREATE TABLE paimon_t ( f0 INT, f1 INT ) WITH ('connector'='paimon', 'file.format' = 'avro' )");
        sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
        Assertions.assertThat(sql("SELECT * FROM paimon_t"))
                .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));

        sql("CREATE TABLE bh (f0 INT, f1 INT) WITH ('connector'='blackhole')");
        sql("INSERT INTO bh SELECT * FROM paimon_t");
    }

    /**
     * Performs two single-row inserts into a partitioned primary-key Paimon table and checks
     * that the {@code $snapshots} system table lists two {@code APPEND} snapshots, in order.
     */
    @Test
    public void testReadPaimonSystemTable() {
        sql(
                "CREATE TABLE paimon_t (\n"
                        + "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    PRIMARY KEY (dt, user_id) NOT ENFORCED\n"
                        + ") PARTITIONED BY (dt)  WITH ('connector'='paimon', 'file.format' = 'avro' )");
        sql("INSERT INTO paimon_t VALUES (1, 2, 'click', '2023-11-01')");
        sql("INSERT INTO paimon_t VALUES (2, 3, 'click', '2023-11-02')");

        Assertions.assertThat(
                        sql("SELECT snapshot_id, schema_id, commit_kind FROM paimon_t$snapshots"))
                .containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));
    }
}
