/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveTableSource
implements StreamTableSource<RowData>,
PartitionableTableSource,
ProjectableTableSource<RowData>,
LimitableTableSource<RowData>,
LookupableTableSource<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);
    private final JobConf jobConf;
    private final ReadableConfig flinkConf;
    private final ObjectPath tablePath;
    private final CatalogTable catalogTable;
    @Nullable
    private List<Map<String, String>> remainingPartitions = null;
    private String hiveVersion;
    private HiveShim hiveShim;
    private boolean partitionPruned;
    private int[] projectedFields;
    private boolean isLimitPushDown = false;
    private long limit = -1L;
    private Duration hiveTableCacheTTL;

    public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable) {
        this.jobConf = (JobConf)Preconditions.checkNotNull((Object)jobConf);
        this.flinkConf = (ReadableConfig)Preconditions.checkNotNull((Object)flinkConf);
        this.tablePath = (ObjectPath)Preconditions.checkNotNull((Object)tablePath);
        this.catalogTable = (CatalogTable)Preconditions.checkNotNull((Object)catalogTable);
        this.hiveVersion = (String)Preconditions.checkNotNull((Object)jobConf.get("hive-version"), (String)"Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
        this.partitionPruned = false;
    }

    private HiveTableSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable, List<Map<String, String>> remainingPartitions, String hiveVersion, boolean partitionPruned, int[] projectedFields, boolean isLimitPushDown, long limit) {
        this.jobConf = (JobConf)Preconditions.checkNotNull((Object)jobConf);
        this.flinkConf = (ReadableConfig)Preconditions.checkNotNull((Object)flinkConf);
        this.tablePath = (ObjectPath)Preconditions.checkNotNull((Object)tablePath);
        this.catalogTable = (CatalogTable)Preconditions.checkNotNull((Object)catalogTable);
        this.remainingPartitions = remainingPartitions;
        this.hiveVersion = hiveVersion;
        this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        this.partitionPruned = partitionPruned;
        this.projectedFields = projectedFields;
        this.isLimitPushDown = isLimitPushDown;
        this.limit = limit;
    }

    public boolean isBounded() {
        return !this.isStreamingSource();
    }

    public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
        List<HiveTablePartition> allHivePartitions = this.initAllPartitions();
        TypeInformation typeInfo = TypeInfoDataTypeConverter.fromDataTypeToTypeInfo((DataType)this.getProducedDataType());
        HiveTableInputFormat inputFormat = this.getInputFormat(allHivePartitions, (Boolean)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
        if (this.isStreamingSource()) {
            if (this.catalogTable.getPartitionKeys().isEmpty()) {
                return this.createStreamSourceForNonPartitionTable(execEnv, (TypeInformation<RowData>)typeInfo, inputFormat, allHivePartitions.get(0));
            }
            return this.createStreamSourceForPartitionTable(execEnv, (TypeInformation<RowData>)typeInfo, inputFormat);
        }
        return this.createBatchSource(execEnv, (TypeInformation<RowData>)typeInfo, inputFormat);
    }

    private boolean isStreamingSource() {
        return Boolean.parseBoolean(this.catalogTable.getOptions().getOrDefault(FileSystemOptions.STREAMING_SOURCE_ENABLE.key(), ((Boolean)FileSystemOptions.STREAMING_SOURCE_ENABLE.defaultValue()).toString()));
    }

    private DataStream<RowData> createBatchSource(StreamExecutionEnvironment execEnv, TypeInformation<RowData> typeInfo, HiveTableInputFormat inputFormat) {
        DataStreamSource source = execEnv.createInput((InputFormat)inputFormat, typeInfo);
        int parallelism = (Integer)this.flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
        if (((Boolean)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)).booleanValue()) {
            int splitNum;
            int max = (Integer)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
            if (max < 1) {
                throw new IllegalConfigurationException(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
            }
            try {
                long nano1 = System.nanoTime();
                splitNum = inputFormat.createInputSplits(0).length;
                long nano2 = System.nanoTime();
                LOG.info("Hive source({}}) createInputSplits use time: {} ms", (Object)this.tablePath, (Object)((nano2 - nano1) / 1000000L));
            }
            catch (IOException e) {
                throw new FlinkHiveException(e);
            }
            parallelism = Math.min(splitNum, max);
        }
        parallelism = this.limit > 0L ? Math.min(parallelism, (int)this.limit / 1000) : parallelism;
        parallelism = Math.max(1, parallelism);
        source.setParallelism(parallelism);
        return source.name(this.explainSource());
    }

    private DataStream<RowData> createStreamSourceForPartitionTable(StreamExecutionEnvironment execEnv, TypeInformation<RowData> typeInfo, HiveTableInputFormat inputFormat) {
        Configuration configuration = new Configuration();
        this.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration).setString(arg_0, arg_1));
        String consumeOrderStr = (String)configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_ORDER);
        ConsumeOrder consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
        String consumeOffset = (String)configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
        String extractorKind = (String)configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND);
        String extractorClass = (String)configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS);
        String extractorPattern = (String)configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
        Duration monitorInterval = (Duration)configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL);
        HiveContinuousMonitoringFunction monitoringFunction = new HiveContinuousMonitoringFunction(this.hiveShim, this.jobConf, this.tablePath, this.catalogTable, execEnv.getParallelism(), consumeOrder, consumeOffset, extractorKind, extractorClass, extractorPattern, monitorInterval.toMillis());
        ContinuousFileReaderOperatorFactory factory = new ContinuousFileReaderOperatorFactory((InputFormat)inputFormat);
        String sourceName = "HiveMonitoringFunction";
        SingleOutputStreamOperator source = execEnv.addSource((SourceFunction)monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, (OneInputStreamOperatorFactory)factory);
        return new DataStreamSource(source);
    }

    private DataStream<RowData> createStreamSourceForNonPartitionTable(StreamExecutionEnvironment execEnv, TypeInformation<RowData> typeInfo, HiveTableInputFormat inputFormat, HiveTablePartition hiveTable) {
        HiveTableFileInputFormat fileInputFormat = new HiveTableFileInputFormat(inputFormat, hiveTable);
        Configuration configuration = new Configuration();
        this.catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)configuration).setString(arg_0, arg_1));
        String consumeOrderStr = (String)configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_ORDER);
        ConsumeOrder consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
        if (consumeOrder != ConsumeOrder.CREATE_TIME_ORDER) {
            throw new UnsupportedOperationException("Only " + (Object)((Object)ConsumeOrder.CREATE_TIME_ORDER) + " is supported for non partition table.");
        }
        String consumeOffset = (String)configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
        long currentReadTime = TimestampData.fromLocalDateTime((LocalDateTime)DefaultPartTimeExtractor.toLocalDateTime((String)consumeOffset)).toTimestamp().getTime();
        Duration monitorInterval = (Duration)configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL);
        ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction((FileInputFormat)fileInputFormat, FileProcessingMode.PROCESS_CONTINUOUSLY, execEnv.getParallelism(), monitorInterval.toMillis(), currentReadTime);
        ContinuousFileReaderOperatorFactory factory = new ContinuousFileReaderOperatorFactory((InputFormat)fileInputFormat);
        String sourceName = "HiveFileMonitoringFunction";
        SingleOutputStreamOperator source = execEnv.addSource((SourceFunction)monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, (OneInputStreamOperatorFactory)factory);
        return new DataStreamSource(source);
    }

    @VisibleForTesting
    HiveTableInputFormat getInputFormat(List<HiveTablePartition> allHivePartitions, boolean useMapRedReader) {
        return new HiveTableInputFormat(this.jobConf, this.catalogTable, allHivePartitions, this.projectedFields, this.limit, this.hiveVersion, useMapRedReader);
    }

    public TableSchema getTableSchema() {
        return this.catalogTable.getSchema();
    }

    public DataType getProducedDataType() {
        return (DataType)this.getProducedTableSchema().toRowDataType().bridgedTo(RowData.class);
    }

    private TableSchema getProducedTableSchema() {
        TableSchema fullSchema = this.getTableSchema();
        if (this.projectedFields == null) {
            return fullSchema;
        }
        String[] fullNames = fullSchema.getFieldNames();
        DataType[] fullTypes = fullSchema.getFieldDataTypes();
        return TableSchema.builder().fields((String[])Arrays.stream(this.projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new), (DataType[])Arrays.stream(this.projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
    }

    public boolean isLimitPushedDown() {
        return this.isLimitPushDown;
    }

    public TableSource<RowData> applyLimit(long limit) {
        return new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, this.remainingPartitions, this.hiveVersion, this.partitionPruned, this.projectedFields, true, limit);
    }

    public List<Map<String, String>> getPartitions() {
        throw new UnsupportedOperationException("Please use Catalog API to retrieve all partitions of a table");
    }

    public TableSource<RowData> applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
        if (this.catalogTable.getPartitionKeys() == null || this.catalogTable.getPartitionKeys().size() == 0) {
            return this;
        }
        return new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, remainingPartitions, this.hiveVersion, true, this.projectedFields, this.isLimitPushDown, this.limit);
    }

    public TableSource<RowData> projectFields(int[] fields) {
        return new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, this.remainingPartitions, this.hiveVersion, this.partitionPruned, fields, this.isLimitPushDown, this.limit);
    }

    private List<HiveTablePartition> initAllPartitions() {
        ArrayList<HiveTablePartition> allHivePartitions = new ArrayList<HiveTablePartition>();
        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf((org.apache.hadoop.conf.Configuration)this.jobConf, HiveConf.class), this.hiveVersion);){
            String dbName = this.tablePath.getDatabaseName();
            String tableName = this.tablePath.getObjectName();
            List partitionColNames = this.catalogTable.getPartitionKeys();
            Table hiveTable = client.getTable(dbName, tableName);
            Properties tableProps = HiveReflectionUtils.getTableMetadata(this.hiveShim, hiveTable);
            String ttlStr = tableProps.getProperty(FileSystemOptions.LOOKUP_JOIN_CACHE_TTL.key());
            Duration duration = this.hiveTableCacheTTL = ttlStr != null ? TimeUtils.parseDuration((String)ttlStr) : (Duration)FileSystemOptions.LOOKUP_JOIN_CACHE_TTL.defaultValue();
            if (partitionColNames != null && partitionColNames.size() > 0) {
                String defaultPartitionName = this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                ArrayList<Partition> partitions = new ArrayList<Partition>();
                if (this.remainingPartitions != null) {
                    for (Map<String, String> spec : this.remainingPartitions) {
                        partitions.add(client.getPartition(dbName, tableName, HiveTableSource.partitionSpecToValues(spec, partitionColNames)));
                    }
                } else {
                    partitions.addAll(client.listPartitions(dbName, tableName, (short)-1));
                }
                for (Partition partition : partitions) {
                    HiveTablePartition hiveTablePartition = HiveTableSource.toHiveTablePartition(this.catalogTable.getPartitionKeys(), this.catalogTable.getSchema().getFieldNames(), this.catalogTable.getSchema().getFieldDataTypes(), this.hiveShim, tableProps, defaultPartitionName, partition);
                    allHivePartitions.add(hiveTablePartition);
                }
            } else {
                allHivePartitions.add(new HiveTablePartition(hiveTable.getSd(), tableProps));
            }
        }
        catch (TException e) {
            throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
        }
        return allHivePartitions;
    }

    public static HiveTablePartition toHiveTablePartition(List<String> partitionKeys, String[] fieldNames, DataType[] fieldTypes, HiveShim shim, Properties tableProps, String defaultPartitionName, Partition partition) {
        StorageDescriptor sd = partition.getSd();
        HashMap<String, Object> partitionColValues = new HashMap<String, Object>();
        List<String> nameList = Arrays.asList(fieldNames);
        for (int i = 0; i < partitionKeys.size(); ++i) {
            LogicalTypeRoot typeRoot;
            String partitionColName = partitionKeys.get(i);
            String partitionValue = partition.getValues().get(i);
            DataType type = fieldTypes[nameList.indexOf(partitionColName)];
            Object partitionObject = defaultPartitionName.equals(partitionValue) ? ((typeRoot = type.getLogicalType().getTypeRoot()) == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR ? defaultPartitionName : null) : HiveTableSource.restorePartitionValueFromFromType(shim, partitionValue, type);
            partitionColValues.put(partitionColName, partitionObject);
        }
        return new HiveTablePartition(sd, partitionColValues, tableProps);
    }

    private static List<String> partitionSpecToValues(Map<String, String> spec, List<String> partitionColNames) {
        Preconditions.checkArgument((spec.size() == partitionColNames.size() && spec.keySet().containsAll(partitionColNames) ? 1 : 0) != 0, (String)"Partition spec (%s) and partition column names (%s) doesn't match", (Object[])new Object[]{spec, partitionColNames});
        return partitionColNames.stream().map(spec::get).collect(Collectors.toList());
    }

    private static Object restorePartitionValueFromFromType(HiveShim shim, String valStr, DataType type) {
        LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
        switch (typeRoot) {
            case CHAR: 
            case VARCHAR: {
                return valStr;
            }
            case BOOLEAN: {
                return Boolean.parseBoolean(valStr);
            }
            case TINYINT: {
                return Integer.valueOf(valStr).byteValue();
            }
            case SMALLINT: {
                return Short.valueOf(valStr);
            }
            case INTEGER: {
                return Integer.valueOf(valStr);
            }
            case BIGINT: {
                return Long.valueOf(valStr);
            }
            case FLOAT: {
                return Float.valueOf(valStr);
            }
            case DOUBLE: {
                return Double.valueOf(valStr);
            }
            case DATE: {
                return HiveInspectors.toFlinkObject(HiveInspectors.getObjectInspector(type), shim.toHiveDate(Date.valueOf(valStr)), shim);
            }
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                return HiveInspectors.toFlinkObject(HiveInspectors.getObjectInspector(type), shim.toHiveTimestamp(Timestamp.valueOf(valStr)), shim);
            }
        }
        throw new FlinkHiveException(new IllegalArgumentException(String.format("Can not convert %s to type %s for partition value", valStr, type)));
    }

    public String explainSource() {
        String explain = String.format(" TablePath: %s, PartitionPruned: %s, PartitionNums: %d", this.tablePath.getFullName(), this.partitionPruned, null == this.remainingPartitions ? null : Integer.valueOf(this.remainingPartitions.size()));
        if (this.projectedFields != null) {
            explain = explain + ", ProjectedFields: " + Arrays.toString(this.projectedFields);
        }
        if (this.isLimitPushDown) {
            explain = explain + String.format(", LimitPushDown %s, Limit %d", this.isLimitPushDown, this.limit);
        }
        return TableConnectorUtils.generateRuntimeName(this.getClass(), (String[])this.getTableSchema().getFieldNames()) + explain;
    }

    public TableFunction<RowData> getLookupFunction(String[] lookupKeys) {
        List<HiveTablePartition> allPartitions = this.initAllPartitions();
        TableSchema producedSchema = this.getProducedTableSchema();
        return new FileSystemLookupFunction((InputFormat)this.getInputFormat(allPartitions, (Boolean)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)), lookupKeys, producedSchema.getFieldNames(), producedSchema.getFieldDataTypes(), this.hiveTableCacheTTL);
    }

    public AsyncTableFunction<RowData> getAsyncLookupFunction(String[] lookupKeys) {
        throw new UnsupportedOperationException("Hive table doesn't support async lookup");
    }

    public boolean isAsyncEnabled() {
        return false;
    }
}

