package org.apache.flink.connectors.hive;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration test that joins a partitioned Hive {@code fact} table with a small {@code dim}
 * table and checks that the optimized plan contains a {@code DynamicFilteringDataCollector} node
 * and that the join results are correct.
 *
 * <p>Every test template runs twice: once with a table environment created by
 * {@link HiveTestUtils#createTableEnvInBatchMode()} and once with the one created by
 * {@link HiveTestUtils#createTableEnvInBatchModeWithAdaptiveScheduler()}.
 */
@ExtendWith({ParameterizedTestExtension.class})
public class HiveDynamicPartitionPruningITCase {

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    /** Plan node whose presence the tests use as evidence of dynamic filtering. */
    private static final String DYNAMIC_FILTERING_COLLECTOR = "DynamicFilteringDataCollector";

    /** Join with the fact table listed first in the FROM clause. */
    private static final String FACT_JOIN_DIM_QUERY =
            "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";

    /** Same join with the dim table listed first in the FROM clause. */
    private static final String DIM_JOIN_FACT_QUERY =
            "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";

    /**
     * Rows of {@code fact} that join with the {@code dim} rows having {@code z = 1} (partitions
     * {@code p=1} and {@code p=2}), rendered as in {@code List<Row>.toString()} but without the
     * surrounding brackets so that further rows can be appended.
     */
    private static final String FACT_ROWS_FOR_Z1 =
            "+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
                    + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]";

    /** Expected result of both single-fact-table join queries. */
    private static final String EXPECTED_JOIN_RESULT = "[" + FACT_ROWS_FOR_Z1 + "]";

    /** Injected by the parameterized extension; selects which table environment is created. */
    @Parameter
    public boolean enableAdaptiveBatchScheduler;

    private TableEnvironment tableEnv;
    private HiveCatalog hiveCatalog;
    private String warehouse;

    /** Supplies the values for {@link #enableAdaptiveBatchScheduler}. */
    @Parameters(name = "enableAdaptiveBatchScheduler={0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    /**
     * Opens a fresh Hive catalog, remembers its warehouse directory for cleanup, and creates a
     * batch table environment (Hive dialect) that uses this catalog as the current catalog.
     */
    @BeforeEach
    public void setup() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog
                .getHiveConf()
                .setBoolVar(
                        HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false);
        hiveCatalog.open();
        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);

        tableEnv =
                enableAdaptiveBatchScheduler
                        ? HiveTestUtils.createTableEnvInBatchModeWithAdaptiveScheduler()
                        : HiveTestUtils.createTableEnvInBatchMode();
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
    }

    /** Closes the catalog and removes the warehouse directory; tolerates a partially run setup. */
    @AfterEach
    public void tearDown() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
        if (warehouse != null) {
            FileUtils.deleteDirectoryQuietly(new File(warehouse));
        }
    }

    /**
     * Joins {@code fact} with {@code dim} in both FROM-clause orders and verifies the plan and
     * the results, first without and then with row-count statistics on {@code dim}.
     */
    @TestTemplate
    public void testDynamicPartitionPruning() throws Exception {
        createDimAndFactTables();

        // Plan once before source-parallelism inference is switched off, so planning under the
        // environment's initial setting is still exercised. The plan is asserted on instead of
        // being dumped to stdout.
        Assertions.assertThat(tableEnv.explainSql(FACT_JOIN_DIM_QUERY)).isNotBlank();

        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);

        // The collector must show up regardless of which table comes first in the FROM clause.
        Assertions.assertThat(tableEnv.explainSql(FACT_JOIN_DIM_QUERY))
                .contains(DYNAMIC_FILTERING_COLLECTOR);
        Assertions.assertThat(tableEnv.explainSql(DIM_JOIN_FACT_QUERY))
                .contains(DYNAMIC_FILTERING_COLLECTOR);

        assertBothJoinOrdersReturnExpectedRows();

        // Record a row count of 3 for dim (other statistics unknown) and validate the results
        // again with table statistics available.
        String catalogName = tableEnv.getCurrentCatalog();
        Catalog currentCatalog =
                tableEnv.getCatalog(catalogName)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Current catalog '"
                                                        + catalogName
                                                        + "' is not registered"));
        currentCatalog.alterTableStatistics(
                new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
                new CatalogTableStatistics(3L, -1, -1L, -1L),
                false);

        assertBothJoinOrdersReturnExpectedRows();
    }

    /**
     * Unions the dim join of two partitioned fact tables and verifies the plan and the results,
     * once with identical dim filters in both branches and once with different ones.
     */
    @TestTemplate
    public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
        createDimAndFactTables();

        tableEnv.executeSql("create table fact2 (a int, b bigint, c string) partitioned by (p int)");
        tableEnv.executeSql(
                        "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
                .await();
        tableEnv.executeSql(
                        "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
                .await();
        tableEnv.executeSql(
                        "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
                .await();

        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);

        // Both branches filter dim with z = 1: the collector text carrying a reuse id must occur
        // exactly once in the plan.
        final String sameDimFilterQuery =
                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) union all (select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
        Assertions.assertThat(tableEnv.explainSql(sameDimFilterQuery))
                .containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
        Assertions.assertThat(queryResult(tableEnv.sqlQuery(sameDimFilterQuery)).toString())
                .isEqualTo(
                        "["
                                + FACT_ROWS_FOR_Z1
                                + ", +I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
                                + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]");

        // The branches filter dim differently (z = 1 vs. z = 2): only the presence of a
        // collector is required here.
        final String differentDimFilterQuery =
                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) union all (select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
        Assertions.assertThat(tableEnv.explainSql(differentDimFilterQuery))
                .contains(DYNAMIC_FILTERING_COLLECTOR);
        Assertions.assertThat(queryResult(tableEnv.sqlQuery(differentDimFilterQuery)).toString())
                .isEqualTo(
                        "["
                                + FACT_ROWS_FOR_Z1
                                + ", +I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]");
    }

    /**
     * Creates the {@code dim} table (three rows) and the {@code fact} table partitioned by
     * {@code p} with three rows in each of the partitions 1, 2 and 3, waiting for every insert
     * job to finish.
     */
    private void createDimAndFactTables() throws Exception {
        tableEnv.executeSql("create table dim (x int,y string,z int)");
        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();

        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
        tableEnv.executeSql(
                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
                .await();
        tableEnv.executeSql(
                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
                .await();
        tableEnv.executeSql(
                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
                .await();
    }

    /** Executes the single-fact-table join in both FROM-clause orders and checks the rows. */
    private void assertBothJoinOrdersReturnExpectedRows() {
        Assertions.assertThat(queryResult(tableEnv.sqlQuery(FACT_JOIN_DIM_QUERY)).toString())
                .isEqualTo(EXPECTED_JOIN_RESULT);
        Assertions.assertThat(queryResult(tableEnv.sqlQuery(DIM_JOIN_FACT_QUERY)).toString())
                .isEqualTo(EXPECTED_JOIN_RESULT);
    }

    /** Executes {@code table} and materializes all result rows into a list. */
    private static List<Row> queryResult(Table table) {
        return CollectionUtil.iteratorToList(table.execute().collect());
    }
}
