package org.apache.iceberg.flink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/TestIcebergConnector.class */
public class TestIcebergConnector extends FlinkTestBase {
    private static final String TABLE_NAME = "test_table";

    @ClassRule
    public static final TemporaryFolder WAREHOUSE = new TemporaryFolder();
    private final String catalogName;
    private final Map<String, String> properties;
    private final boolean isStreaming;
    private volatile TableEnvironment tEnv;

    @Parameterized.Parameters(name = "catalogName = {0}, properties = {1}, isStreaming={2}")
    public static Iterable<Object[]> parameters() {
        return Lists.newArrayList(new Object[]{new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop"), true}, new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-table", "not_existing_table"), true}, new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop"), false}, new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db"), true}, new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db", "catalog-table", "not_existing_table"), true}, new Object[]{"testhadoop", ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db"), false}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive"), true}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-table", "not_existing_table"), true}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive"), false}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db"), true}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db", "catalog-table", "not_existing_table"), true}, new Object[]{"testhive", ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db"), false}});
    }

    public TestIcebergConnector(String str, Map<String, String> map, boolean z) {
        this.catalogName = str;
        this.properties = map;
        this.isStreaming = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iceberg.flink.FlinkTestBase
    public TableEnvironment getTableEnv() {
        if (this.tEnv == null) {
            synchronized (this) {
                if (this.tEnv == null) {
                    EnvironmentSettings.Builder useBlinkPlanner = EnvironmentSettings.newInstance().useBlinkPlanner();
                    if (this.isStreaming) {
                        useBlinkPlanner.inStreamingMode();
                        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                        executionEnvironment.enableCheckpointing(400L);
                        executionEnvironment.setMaxParallelism(2);
                        executionEnvironment.setParallelism(2);
                        this.tEnv = StreamTableEnvironment.create(executionEnvironment, useBlinkPlanner.build());
                    } else {
                        useBlinkPlanner.inBatchMode();
                        this.tEnv = TableEnvironment.create(useBlinkPlanner.build());
                    }
                    this.tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1).set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
                }
            }
        }
        return this.tEnv;
    }

    @After
    public void after() throws TException {
        sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
        if (isHiveCatalog()) {
            HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
            try {
                hiveMetaStoreClient.dropTable(databaseName(), tableName());
                if (!isDefaultDatabaseName()) {
                    try {
                        hiveMetaStoreClient.dropDatabase(databaseName());
                    } catch (Exception e) {
                    }
                }
            } finally {
                hiveMetaStoreClient.close();
            }
        }
    }

    private void testCreateConnectorTable() {
        Map<String, String> createTableProps = createTableProps();
        sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(createTableProps));
        sql("INSERT INTO %s VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC')", TABLE_NAME);
        Assert.assertEquals("Should have expected rows", Sets.newHashSet(new Row[]{Row.of(new Object[]{1L, "AAA"}), Row.of(new Object[]{2L, "BBB"}), Row.of(new Object[]{3L, "CCC"})}), Sets.newHashSet(sql("SELECT * FROM %s", TABLE_NAME)));
        Catalog createCatalog = new FlinkCatalogFactory().createCatalog(this.catalogName, createTableProps, new Configuration());
        Assert.assertTrue("Should have created the expected database", createCatalog.databaseExists(databaseName()));
        Assert.assertTrue("Should have created the expected table", createCatalog.tableExists(new ObjectPath(databaseName(), tableName())));
        sql("DROP TABLE %s", TABLE_NAME);
        sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(createTableProps));
        Assert.assertEquals("Should have expected rows", Sets.newHashSet(new Row[]{Row.of(new Object[]{1L, "AAA"}), Row.of(new Object[]{2L, "BBB"}), Row.of(new Object[]{3L, "CCC"})}), Sets.newHashSet(sql("SELECT * FROM %s", TABLE_NAME)));
    }

    @Test
    public void testCreateTableUnderDefaultDatabase() {
        testCreateConnectorTable();
    }

    @Test
    public void testCatalogDatabaseConflictWithFlinkDatabase() {
        sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName());
        sql("USE `%s`", databaseName());
        try {
            testCreateConnectorTable();
            AssertHelpers.assertThrows("Table should already exists", ValidationException.class, "Could not execute CreateTable in path", () -> {
                return sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME);
            });
            sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
            if (isDefaultDatabaseName()) {
                return;
            }
            sql("DROP DATABASE `%s`", databaseName());
        } catch (Throwable th) {
            sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
            if (!isDefaultDatabaseName()) {
                sql("DROP DATABASE `%s`", databaseName());
            }
            throw th;
        }
    }

    @Test
    public void testConnectorTableInIcebergCatalog() {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("type", "iceberg");
        if (isHiveCatalog()) {
            newHashMap.put("catalog-type", "hive");
            newHashMap.put("uri", FlinkCatalogTestBase.getURI(hiveConf));
        } else {
            newHashMap.put("catalog-type", "hadoop");
        }
        newHashMap.put("warehouse", createWarehouse());
        Map<String, String> createTableProps = createTableProps();
        sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(newHashMap));
        try {
            AssertHelpers.assertThrowsCause("Cannot create the iceberg connector table in iceberg catalog", IllegalArgumentException.class, "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog", () -> {
                sql("CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s", TestFixtures.DATABASE, TABLE_NAME, toWithClause(createTableProps));
            });
            sql("DROP CATALOG IF EXISTS `test_catalog`", new Object[0]);
        } catch (Throwable th) {
            sql("DROP CATALOG IF EXISTS `test_catalog`", new Object[0]);
            throw th;
        }
    }

    private Map<String, String> createTableProps() {
        HashMap newHashMap = Maps.newHashMap(this.properties);
        newHashMap.put("catalog-name", this.catalogName);
        newHashMap.put("warehouse", createWarehouse());
        if (isHiveCatalog()) {
            newHashMap.put("uri", FlinkCatalogTestBase.getURI(hiveConf));
        }
        return newHashMap;
    }

    private boolean isHiveCatalog() {
        return "testhive".equalsIgnoreCase(this.catalogName);
    }

    private boolean isDefaultDatabaseName() {
        return TestFixtures.DATABASE.equalsIgnoreCase(databaseName());
    }

    private String tableName() {
        return this.properties.getOrDefault("catalog-table", TABLE_NAME);
    }

    private String databaseName() {
        return this.properties.getOrDefault("catalog-database", "default_database");
    }

    private String toWithClause(Map<String, String> map) {
        return FlinkCatalogTestBase.toWithClause(map);
    }

    private static String createWarehouse() {
        try {
            return String.format("file://%s", WAREHOUSE.newFolder().getAbsolutePath());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
