package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSource.class */
public class HiveTableSource implements StreamTableSource<RowData>, PartitionableTableSource, ProjectableTableSource<RowData>, LimitableTableSource<RowData>, LookupableTableSource<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);
    private final JobConf jobConf;
    private final ReadableConfig flinkConf;
    private final ObjectPath tablePath;
    private final CatalogTable catalogTable;

    @Nullable
    private List<Map<String, String>> remainingPartitions;
    private String hiveVersion;
    private HiveShim hiveShim;
    private boolean partitionPruned;
    private int[] projectedFields;
    private boolean isLimitPushDown;
    private long limit;
    private Duration hiveTableCacheTTL;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.connectors.hive.HiveTableSource$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot = new int[LogicalTypeRoot.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.CHAR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.VARCHAR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.BOOLEAN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.TINYINT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.SMALLINT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.INTEGER.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.BIGINT.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.FLOAT.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.DOUBLE.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.DATE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    public HiveTableSource(JobConf jobConf, ReadableConfig readableConfig, ObjectPath objectPath, CatalogTable catalogTable) {
        this.remainingPartitions = null;
        this.isLimitPushDown = false;
        this.limit = -1L;
        this.jobConf = (JobConf) Preconditions.checkNotNull(jobConf);
        this.flinkConf = (ReadableConfig) Preconditions.checkNotNull(readableConfig);
        this.tablePath = (ObjectPath) Preconditions.checkNotNull(objectPath);
        this.catalogTable = (CatalogTable) Preconditions.checkNotNull(catalogTable);
        this.hiveVersion = (String) Preconditions.checkNotNull(jobConf.get("hive-version"), "Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
        this.partitionPruned = false;
    }

    private HiveTableSource(JobConf jobConf, ReadableConfig readableConfig, ObjectPath objectPath, CatalogTable catalogTable, List<Map<String, String>> list, String str, boolean z, int[] iArr, boolean z2, long j) {
        this.remainingPartitions = null;
        this.isLimitPushDown = false;
        this.limit = -1L;
        this.jobConf = (JobConf) Preconditions.checkNotNull(jobConf);
        this.flinkConf = (ReadableConfig) Preconditions.checkNotNull(readableConfig);
        this.tablePath = (ObjectPath) Preconditions.checkNotNull(objectPath);
        this.catalogTable = (CatalogTable) Preconditions.checkNotNull(catalogTable);
        this.remainingPartitions = list;
        this.hiveVersion = str;
        this.hiveShim = HiveShimLoader.loadHiveShim(str);
        this.partitionPruned = z;
        this.projectedFields = iArr;
        this.isLimitPushDown = z2;
        this.limit = j;
    }

    public boolean isBounded() {
        return !isStreamingSource();
    }

    public DataStream<RowData> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        List<HiveTablePartition> initAllPartitions = initAllPartitions();
        TypeInformation<RowData> fromDataTypeToTypeInfo = TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
        HiveTableInputFormat inputFormat = getInputFormat(initAllPartitions, ((Boolean) this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)).booleanValue());
        return isStreamingSource() ? this.catalogTable.getPartitionKeys().isEmpty() ? createStreamSourceForNonPartitionTable(streamExecutionEnvironment, fromDataTypeToTypeInfo, inputFormat, initAllPartitions.get(0)) : createStreamSourceForPartitionTable(streamExecutionEnvironment, fromDataTypeToTypeInfo, inputFormat) : createBatchSource(streamExecutionEnvironment, fromDataTypeToTypeInfo, inputFormat);
    }

    private boolean isStreamingSource() {
        return Boolean.parseBoolean((String) this.catalogTable.getOptions().getOrDefault(FileSystemOptions.STREAMING_SOURCE_ENABLE.key(), ((Boolean) FileSystemOptions.STREAMING_SOURCE_ENABLE.defaultValue()).toString()));
    }

    private DataStream<RowData> createBatchSource(StreamExecutionEnvironment streamExecutionEnvironment, TypeInformation<RowData> typeInformation, HiveTableInputFormat hiveTableInputFormat) {
        DataStreamSource createInput = streamExecutionEnvironment.createInput(hiveTableInputFormat, typeInformation);
        int intValue = ((Integer) this.flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)).intValue();
        if (((Boolean) this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)).booleanValue()) {
            int intValue2 = ((Integer) this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX)).intValue();
            if (intValue2 < 1) {
                throw new IllegalConfigurationException(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
            }
            try {
                long nanoTime = System.nanoTime();
                int length = hiveTableInputFormat.m619createInputSplits(0).length;
                LOG.info("Hive source({}}) createInputSplits use time: {} ms", this.tablePath, Long.valueOf((System.nanoTime() - nanoTime) / 1000000));
                intValue = Math.min(length, intValue2);
            } catch (IOException e) {
                throw new FlinkHiveException(e);
            }
        }
        createInput.setParallelism(Math.max(1, this.limit > 0 ? Math.min(intValue, ((int) this.limit) / 1000) : intValue));
        return createInput.name(explainSource());
    }

    private DataStream<RowData> createStreamSourceForPartitionTable(StreamExecutionEnvironment streamExecutionEnvironment, TypeInformation<RowData> typeInformation, HiveTableInputFormat hiveTableInputFormat) {
        Configuration configuration = new Configuration();
        Map options = this.catalogTable.getOptions();
        configuration.getClass();
        options.forEach(configuration::setString);
        return new DataStreamSource(streamExecutionEnvironment.addSource(new HiveContinuousMonitoringFunction(this.hiveShim, this.jobConf, this.tablePath, this.catalogTable, streamExecutionEnvironment.getParallelism(), ConsumeOrder.getConsumeOrder((String) configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_ORDER)), (String) configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET), (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND), (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS), (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN), ((Duration) configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL)).toMillis()), "HiveMonitoringFunction").transform("Split Reader: HiveMonitoringFunction", typeInformation, new ContinuousFileReaderOperatorFactory(hiveTableInputFormat)));
    }

    private DataStream<RowData> createStreamSourceForNonPartitionTable(StreamExecutionEnvironment streamExecutionEnvironment, TypeInformation<RowData> typeInformation, HiveTableInputFormat hiveTableInputFormat, HiveTablePartition hiveTablePartition) {
        HiveTableFileInputFormat hiveTableFileInputFormat = new HiveTableFileInputFormat(hiveTableInputFormat, hiveTablePartition);
        Configuration configuration = new Configuration();
        Map options = this.catalogTable.getOptions();
        configuration.getClass();
        options.forEach(configuration::setString);
        if (ConsumeOrder.getConsumeOrder((String) configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_ORDER)) != ConsumeOrder.CREATE_TIME_ORDER) {
            throw new UnsupportedOperationException("Only " + ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
        }
        return new DataStreamSource(streamExecutionEnvironment.addSource(new ContinuousFileMonitoringFunction(hiveTableFileInputFormat, FileProcessingMode.PROCESS_CONTINUOUSLY, streamExecutionEnvironment.getParallelism(), ((Duration) configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL)).toMillis(), TimestampData.fromLocalDateTime(DefaultPartTimeExtractor.toLocalDateTime((String) configuration.get(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET))).toTimestamp().getTime()), "HiveFileMonitoringFunction").transform("Split Reader: HiveFileMonitoringFunction", typeInformation, new ContinuousFileReaderOperatorFactory(hiveTableFileInputFormat)));
    }

    @VisibleForTesting
    HiveTableInputFormat getInputFormat(List<HiveTablePartition> list, boolean z) {
        return new HiveTableInputFormat(this.jobConf, this.catalogTable, list, this.projectedFields, this.limit, this.hiveVersion, z);
    }

    public TableSchema getTableSchema() {
        return this.catalogTable.getSchema();
    }

    public DataType getProducedDataType() {
        return getProducedTableSchema().toRowDataType().bridgedTo(RowData.class);
    }

    private TableSchema getProducedTableSchema() {
        TableSchema tableSchema = getTableSchema();
        if (this.projectedFields == null) {
            return tableSchema;
        }
        String[] fieldNames = tableSchema.getFieldNames();
        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
        return TableSchema.builder().fields((String[]) Arrays.stream(this.projectedFields).mapToObj(i -> {
            return fieldNames[i];
        }).toArray(i2 -> {
            return new String[i2];
        }), (DataType[]) Arrays.stream(this.projectedFields).mapToObj(i3 -> {
            return fieldDataTypes[i3];
        }).toArray(i4 -> {
            return new DataType[i4];
        })).build();
    }

    public boolean isLimitPushedDown() {
        return this.isLimitPushDown;
    }

    public TableSource<RowData> applyLimit(long j) {
        return new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, this.remainingPartitions, this.hiveVersion, this.partitionPruned, this.projectedFields, true, j);
    }

    public List<Map<String, String>> getPartitions() {
        throw new UnsupportedOperationException("Please use Catalog API to retrieve all partitions of a table");
    }

    public TableSource<RowData> applyPartitionPruning(List<Map<String, String>> list) {
        return (this.catalogTable.getPartitionKeys() == null || this.catalogTable.getPartitionKeys().size() == 0) ? this : new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, list, this.hiveVersion, true, this.projectedFields, this.isLimitPushDown, this.limit);
    }

    public TableSource<RowData> projectFields(int[] iArr) {
        return new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable, this.remainingPartitions, this.hiveVersion, this.partitionPruned, iArr, this.isLimitPushDown, this.limit);
    }

    private List<HiveTablePartition> initAllPartitions() {
        ArrayList arrayList = new ArrayList();
        try {
            HiveMetastoreClientWrapper create = HiveMetastoreClientFactory.create(HiveConfUtils.create(this.jobConf), this.hiveVersion);
            Throwable th = null;
            try {
                String databaseName = this.tablePath.getDatabaseName();
                String objectName = this.tablePath.getObjectName();
                List partitionKeys = this.catalogTable.getPartitionKeys();
                Table table = create.getTable(databaseName, objectName);
                Properties tableMetadata = HiveReflectionUtils.getTableMetadata(this.hiveShim, table);
                String property = tableMetadata.getProperty(FileSystemOptions.LOOKUP_JOIN_CACHE_TTL.key());
                this.hiveTableCacheTTL = property != null ? TimeUtils.parseDuration(property) : (Duration) FileSystemOptions.LOOKUP_JOIN_CACHE_TTL.defaultValue();
                if (partitionKeys == null || partitionKeys.size() <= 0) {
                    arrayList.add(new HiveTablePartition(table.getSd(), tableMetadata));
                } else {
                    String str = this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                    ArrayList arrayList2 = new ArrayList();
                    if (this.remainingPartitions != null) {
                        Iterator<Map<String, String>> it = this.remainingPartitions.iterator();
                        while (it.hasNext()) {
                            arrayList2.add(create.getPartition(databaseName, objectName, partitionSpecToValues(it.next(), partitionKeys)));
                        }
                    } else {
                        arrayList2.addAll(create.listPartitions(databaseName, objectName, (short) -1));
                    }
                    Iterator it2 = arrayList2.iterator();
                    while (it2.hasNext()) {
                        arrayList.add(toHiveTablePartition(this.catalogTable.getPartitionKeys(), this.catalogTable.getSchema().getFieldNames(), this.catalogTable.getSchema().getFieldDataTypes(), this.hiveShim, tableMetadata, str, (Partition) it2.next()));
                    }
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (TException e) {
            throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
        }
    }

    public static HiveTablePartition toHiveTablePartition(List<String> list, String[] strArr, DataType[] dataTypeArr, HiveShim hiveShim, Properties properties, String str, Partition partition) {
        Object restorePartitionValueFromFromType;
        StorageDescriptor sd = partition.getSd();
        HashMap hashMap = new HashMap();
        List asList = Arrays.asList(strArr);
        for (int i = 0; i < list.size(); i++) {
            String str2 = list.get(i);
            String str3 = partition.getValues().get(i);
            DataType dataType = dataTypeArr[asList.indexOf(str2)];
            if (str.equals(str3)) {
                LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
                restorePartitionValueFromFromType = (typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR) ? str : null;
            } else {
                restorePartitionValueFromFromType = restorePartitionValueFromFromType(hiveShim, str3, dataType);
            }
            hashMap.put(str2, restorePartitionValueFromFromType);
        }
        return new HiveTablePartition(sd, hashMap, properties);
    }

    private static List<String> partitionSpecToValues(Map<String, String> map, List<String> list) {
        Preconditions.checkArgument(map.size() == list.size() && map.keySet().containsAll(list), "Partition spec (%s) and partition column names (%s) doesn't match", new Object[]{map, list});
        Stream<String> stream = list.stream();
        map.getClass();
        return (List) stream.map((v1) -> {
            return r1.get(v1);
        }).collect(Collectors.toList());
    }

    private static Object restorePartitionValueFromFromType(HiveShim hiveShim, String str, DataType dataType) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[dataType.getLogicalType().getTypeRoot().ordinal()]) {
            case 1:
            case 2:
                return str;
            case 3:
                return Boolean.valueOf(Boolean.parseBoolean(str));
            case 4:
                return Byte.valueOf(Integer.valueOf(str).byteValue());
            case 5:
                return Short.valueOf(str);
            case 6:
                return Integer.valueOf(str);
            case 7:
                return Long.valueOf(str);
            case 8:
                return Float.valueOf(str);
            case 9:
                return Double.valueOf(str);
            case 10:
                return HiveInspectors.toFlinkObject(HiveInspectors.getObjectInspector(dataType), hiveShim.toHiveDate(Date.valueOf(str)), hiveShim);
            case 11:
                return HiveInspectors.toFlinkObject(HiveInspectors.getObjectInspector(dataType), hiveShim.toHiveTimestamp(Timestamp.valueOf(str)), hiveShim);
            default:
                throw new FlinkHiveException(new IllegalArgumentException(String.format("Can not convert %s to type %s for partition value", str, dataType)));
        }
    }

    public String explainSource() {
        Object[] objArr = new Object[3];
        objArr[0] = this.tablePath.getFullName();
        objArr[1] = Boolean.valueOf(this.partitionPruned);
        objArr[2] = null == this.remainingPartitions ? null : Integer.valueOf(this.remainingPartitions.size());
        String format = String.format(" TablePath: %s, PartitionPruned: %s, PartitionNums: %d", objArr);
        if (this.projectedFields != null) {
            format = format + ", ProjectedFields: " + Arrays.toString(this.projectedFields);
        }
        if (this.isLimitPushDown) {
            format = format + String.format(", LimitPushDown %s, Limit %d", Boolean.valueOf(this.isLimitPushDown), Long.valueOf(this.limit));
        }
        return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + format;
    }

    public TableFunction<RowData> getLookupFunction(String[] strArr) {
        List<HiveTablePartition> initAllPartitions = initAllPartitions();
        TableSchema producedTableSchema = getProducedTableSchema();
        return new FileSystemLookupFunction(getInputFormat(initAllPartitions, ((Boolean) this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)).booleanValue()), strArr, producedTableSchema.getFieldNames(), producedTableSchema.getFieldDataTypes(), this.hiveTableCacheTTL);
    }

    public AsyncTableFunction<RowData> getAsyncLookupFunction(String[] strArr) {
        throw new UnsupportedOperationException("Hive table doesn't support async lookup");
    }

    public boolean isAsyncEnabled() {
        return false;
    }
}
