package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.PartitionReader;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveLookupJoinITCase.class */
public class HiveLookupJoinITCase {
    private static TableEnvironment tableEnv;
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void setup() {
        tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        TestCollectionTableFactory.initData(Arrays.asList(Row.of(new Object[]{1, "a"}), Row.of(new Object[]{1, "c"}), Row.of(new Object[]{2, "b"}), Row.of(new Object[]{2, "c"}), Row.of(new Object[]{3, "c"}), Row.of(new Object[]{4, "d"})));
        tableEnv.executeSql("create table default_catalog.default_database.probe (x int,y string, p as proctime()) with ('connector'='COLLECTION','is-bounded' = 'false')");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(String.format("create table bounded_table (x int, y string, z int) tblproperties ('%s'='5min')", FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        tableEnv.executeSql(String.format("create table bounded_partition_table (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s'='5min')", FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL.key()));
        tableEnv.executeSql(String.format("create table partition_table (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = 'partition-name', '%s'='2h')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        tableEnv.executeSql(String.format("create table partition_table_1 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s'='120min')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        tableEnv.executeSql(String.format("create table partition_table_2 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ('%s' = 'true', '%s' = 'latest', '%s' = '12h', '%s' = 'partition-time',  '%s' = 'default', '%s' = '$pt_year-$pt_mon-$pt_day 00:00:00')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND.key(), FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()));
        tableEnv.executeSql(String.format("create table partition_table_3 (x int, y string, z int) partitioned by ( pt_year int, pt_mon string, pt_day string) tblproperties ( '%s' = 'true', '%s' = 'latest', '%s' = 'create-time')", FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_ORDER.key()));
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    }

    @Test
    public void testLookupOptions() throws Exception {
        FileSystemLookupFunction<HiveTablePartition> lookupFunction = getLookupFunction("bounded_table");
        FileSystemLookupFunction<HiveTablePartition> lookupFunction2 = getLookupFunction("partition_table");
        lookupFunction.open((FunctionContext) null);
        lookupFunction2.open((FunctionContext) null);
        Assert.assertEquals(Duration.ofMinutes(5L), lookupFunction.getReloadInterval());
        Assert.assertEquals(Duration.ofMinutes(120L), lookupFunction2.getReloadInterval());
    }

    @Test
    public void testPartitionFetcherAndReader() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("insert overwrite partition_table values (1,'a',08,2019,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2020,'08','31'),(2,'b',22,2020,'08','31'),(3,'c',33,2020,'09','31')").await();
        FileSystemLookupFunction<HiveTablePartition> lookupFunction = getLookupFunction("partition_table");
        lookupFunction.open((FunctionContext) null);
        List fetch = lookupFunction.getPartitionFetcher().fetch(lookupFunction.getFetcherContext());
        Assert.assertEquals(1L, fetch.size());
        PartitionReader partitionReader = lookupFunction.getPartitionReader();
        partitionReader.open(fetch);
        ArrayList arrayList = new ArrayList();
        CatalogTable table = hiveCatalog.getTable(ObjectIdentifier.of(hiveCatalog.getName(), "default", "partition_table").toObjectPath());
        GenericRowData genericRowData = new GenericRowData(table.getSchema().getFieldCount());
        TypeSerializer create = InternalSerializers.create(table.getSchema().toRowDataType().getLogicalType());
        while (true) {
            RowData rowData = (RowData) partitionReader.read(genericRowData);
            if (rowData == null) {
                arrayList.sort(Comparator.comparingInt(rowData2 -> {
                    return rowData2.getInt(0);
                }));
                Assert.assertEquals("[+I(3,c,33,2020,09,31)]", arrayList.toString());
                return;
            }
            arrayList.add(create.copy(rowData));
        }
    }

    @Test
    public void testLookupJoinBoundedTable() throws Exception {
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)").await();
        Assert.assertEquals("[+I[1, a, 10], +I[2, b, 22], +I[3, c, 33]]", CollectionUtil.iteratorToList(tableEnv.sqlQuery("select p.x, p.y, b.z from  default_catalog.default_database.probe as p  join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y").execute().collect()).toString());
    }

    @Test
    public void testLookupJoinBoundedPartitionedTable() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("insert overwrite bounded_partition_table values (1,'a',08,2019,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2020,'08','31'),(2,'b',22,2020,'08','31')").await();
        Assert.assertEquals("[+I[1, a, 8, 2019, 08, 01], +I[1, a, 10, 2020, 08, 31], +I[2, b, 22, 2020, 08, 31]]", CollectionUtil.iteratorToList(tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join bounded_partition_table for system_time as of p.p as b on p.x=b.x and p.y=b.y").execute().collect()).toString());
    }

    @Test
    public void testLookupJoinPartitionedTable() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("insert overwrite partition_table_1 values (1,'a',08,2019,'09','01'),(1,'a',10,2020,'09','31'),(2,'a',21,2020,'09','31'),(2,'b',22,2020,'09','31'),(3,'c',33,2020,'09','31'),(1,'a',101,2020,'08','01'),(2,'a',121,2020,'08','01'),(2,'b',122,2020,'08','01')").await();
        Assert.assertEquals("[+I[1, a, 10, 2020, 09, 31], +I[2, b, 22, 2020, 09, 31], +I[3, c, 33, 2020, 09, 31]]", CollectionUtil.iteratorToList(tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_1 for system_time as of p.p as b on p.x=b.x and p.y=b.y").execute().collect()).toString());
    }

    @Test
    public void testLookupJoinPartitionedTableWithPartitionTime() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("insert overwrite partition_table_2 values (1,'a',08,2020,'08','01'),(1,'a',10,2020,'08','31'),(2,'a',21,2019,'08','31'),(2,'b',22,2020,'08','31'),(3,'c',33,2017,'08','31'),(1,'a',101,2017,'09','01'),(2,'a',121,2019,'09','01'),(2,'b',122,2019,'09','01')").await();
        Assert.assertEquals("[+I[1, a, 10, 2020, 08, 31], +I[2, b, 22, 2020, 08, 31]]", CollectionUtil.iteratorToList(tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_2 for system_time as of p.p as b on p.x=b.x and p.y=b.y").execute().collect()).toString());
    }

    @Test
    public void testLookupJoinPartitionedTableWithCreateTime() throws Exception {
        TableEnvironment createTableEnvInBatchMode = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        createTableEnvInBatchMode.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        createTableEnvInBatchMode.useCatalog(hiveCatalog.getName());
        createTableEnvInBatchMode.executeSql("insert overwrite partition_table_3 values (1,'a',08,2020,'month1','01'),(1,'a',10,2020,'month2','02'),(2,'a',21,2020,'month1','02'),(2,'b',22,2020,'month3','20'),(3,'c',22,2020,'month3','20'),(3,'c',33,2017,'08','31'),(1,'a',101,2017,'09','01'),(2,'a',121,2019,'09','01'),(2,'b',122,2019,'09','01')").await();
        createTableEnvInBatchMode.executeSql("insert overwrite partition_table_3 values (1,'a',101,2020,'08','01'),(2,'a',121,2020,'08','01'),(2,'b',122,2020,'08','01')").await();
        Assert.assertEquals("[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]", CollectionUtil.iteratorToList(tableEnv.sqlQuery("select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day from  default_catalog.default_database.probe as p join partition_table_3 for system_time as of p.p as b on p.x=b.x and p.y=b.y").execute().collect()).toString());
    }

    /* JADX WARN: Type inference failed for: r1v5, types: [int[], int[][]] */
    private FileSystemLookupFunction<HiveTablePartition> getLookupFunction(String str) throws Exception {
        TableEnvironmentInternal tableEnvironmentInternal = tableEnv;
        ObjectIdentifier of = ObjectIdentifier.of(hiveCatalog.getName(), "default", str);
        return FactoryUtil.createTableSource(hiveCatalog, of, tableEnvironmentInternal.getCatalogManager().resolveCatalogTable(hiveCatalog.getTable(of.toObjectPath())), tableEnv.getConfig().getConfiguration(), Thread.currentThread().getContextClassLoader(), false).getLookupFunction((int[][]) new int[]{new int[]{0}});
    }

    @AfterClass
    public static void tearDown() {
        tableEnv.executeSql("drop table bounded_table");
        tableEnv.executeSql("drop table bounded_partition_table");
        tableEnv.executeSql("drop table partition_table");
        tableEnv.executeSql("drop table partition_table_1");
        tableEnv.executeSql("drop table partition_table_2");
        tableEnv.executeSql("drop table partition_table_3");
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }
}
