package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSource.class */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);
    private static final Duration DEFAULT_SCAN_MONITOR_INTERVAL = Duration.ofMinutes(1);
    protected final JobConf jobConf;
    protected final ReadableConfig flinkConf;
    protected final ObjectPath tablePath;
    protected final CatalogTable catalogTable;
    protected final String hiveVersion;
    protected final HiveShim hiveShim;
    protected int[] projectedFields;

    @Nullable
    private List<Map<String, String>> remainingPartitions = null;
    private Long limit = null;

    /* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSource$HiveContinuousPartitionFetcherContext.class */
    /**
     * Fetcher context used when a partitioned Hive table is read as a continuously monitored
     * streaming source.
     *
     * <p>The constructor chooses the offset representation from {@code consumeOrder}: a
     * {@code String} for {@code PARTITION_NAME_ORDER}, and a {@code Long} for
     * {@code PARTITION_TIME_ORDER} and {@code CREATE_TIME_ORDER}.
     *
     * @param <T> type of the offset that partitions are compared by
     */
    public static class HiveContinuousPartitionFetcherContext<T extends Comparable<T>> extends HivePartitionFetcherContextBase<Partition> implements ContinuousPartitionFetcher.Context<Partition, T> {
        private static final long serialVersionUID = 1;
        /** Start offset used for the time based orders when none is configured. */
        private static final Long DEFAULT_MIN_TIME_OFFSET = 0L;
        /** Start offset used for the name based order when none is configured. */
        private static final String DEFAULT_MIN_NAME_OFFSET = "";
        /** Factor for converting the metastore create time (seconds) to milliseconds. */
        private static final long MILLIS_PER_SECOND = 1000L;
        private final TypeSerializer<T> typeSerializer;
        private final T consumeStartOffset;

        /**
         * Creates the context and resolves the start offset and its serializer.
         *
         * <p>All arguments are forwarded unchanged to the base context; {@code configuration} is
         * additionally consulted for {@code FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET}.
         *
         * @throws UnsupportedOperationException if the consume order is none of the handled orders
         */
        // The casts are safe as long as T matches the consume order: String for the name order,
        // Long for the two time based orders.
        @SuppressWarnings("unchecked")
        public HiveContinuousPartitionFetcherContext(ObjectPath objectPath, HiveShim hiveShim, JobConfWrapper jobConfWrapper, List<String> list, DataType[] dataTypeArr, String[] strArr, Configuration configuration, String str) {
            super(objectPath, hiveShim, jobConfWrapper, list, dataTypeArr, strArr, configuration, str);
            switch (this.consumeOrder) {
                case PARTITION_NAME_ORDER:
                    if (configuration.contains(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        this.consumeStartOffset = (T) configuration.getString(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
                    } else {
                        this.consumeStartOffset = (T) DEFAULT_MIN_NAME_OFFSET;
                    }
                    this.typeSerializer = (TypeSerializer<T>) StringSerializer.INSTANCE;
                    break;
                case PARTITION_TIME_ORDER:
                case CREATE_TIME_ORDER:
                    if (configuration.contains(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        // The configured offset is a timestamp string; it is compared as epoch millis.
                        this.consumeStartOffset = (T) Long.valueOf(DefaultPartTimeExtractor.toMills(configuration.getString(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)));
                    } else {
                        this.consumeStartOffset = (T) DEFAULT_MIN_TIME_OFFSET;
                    }
                    this.typeSerializer = (TypeSerializer<T>) LongSerializer.INSTANCE;
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported consumer order: " + this.consumeOrder);
            }
        }

        /**
         * Looks up a single partition of the table in the metastore.
         *
         * @param list partition values identifying the partition
         * @return the partition, or an empty optional if the metastore reports it does not exist
         * @throws TException on any other metastore failure
         */
        public Optional<Partition> getPartition(List<String> list) throws TException {
            try {
                return Optional.of(this.metaStoreClient.getPartition(this.tablePath.getDatabaseName(), this.tablePath.getObjectName(), list));
            } catch (NoSuchObjectException e) {
                // A missing partition is an expected outcome, not an error.
                return Optional.empty();
            }
        }

        /** Returns the path of the table this context reads from. */
        public ObjectPath getTablePath() {
            return this.tablePath;
        }

        /**
         * Returns the modification time of a partition in epoch milliseconds.
         *
         * @param partition the Hive partition
         * @param t the partition's offset; used directly for the time based orders
         * @return for {@code PARTITION_NAME_ORDER} the partition's create time converted from
         *     seconds to milliseconds, otherwise the offset itself
         * @throws UnsupportedOperationException if the consume order is none of the handled orders
         */
        public long getModificationTime(Partition partition, T t) {
            switch (this.consumeOrder) {
                case PARTITION_NAME_ORDER:
                    // The metastore reports create time as int seconds; multiplying by a long
                    // constant widens before scaling so the result cannot overflow int.
                    return partition.getCreateTime() * MILLIS_PER_SECOND;
                case PARTITION_TIME_ORDER:
                case CREATE_TIME_ORDER:
                    return ((Long) t).longValue();
                default:
                    throw new UnsupportedOperationException("Unsupported consumer order: " + this.consumeOrder);
            }
        }

        /** Converts a metastore partition into a {@code HiveTablePartition} via {@code HivePartitionUtils}. */
        public HiveTablePartition toHiveTablePartition(Partition partition) {
            return HivePartitionUtils.toHiveTablePartition(this.partitionKeys, this.fieldNames, this.fieldTypes, this.hiveShim, this.tableProps, this.defaultPartitionName, partition);
        }

        /** Returns the serializer matching the offset type chosen in the constructor. */
        public TypeSerializer<T> getTypeSerializer() {
            return this.typeSerializer;
        }

        /** Returns the offset from which consumption starts. */
        public T getConsumeStartOffset() {
            return this.consumeStartOffset;
        }

        /** Closes the metastore client if one is present. */
        @Override // org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase
        public void close() throws Exception {
            if (this.metaStoreClient != null) {
                this.metaStoreClient.close();
            }
        }
    }

    public HiveTableSource(JobConf jobConf, ReadableConfig readableConfig, ObjectPath objectPath, CatalogTable catalogTable) {
        this.jobConf = (JobConf) Preconditions.checkNotNull(jobConf);
        this.flinkConf = (ReadableConfig) Preconditions.checkNotNull(readableConfig);
        this.tablePath = (ObjectPath) Preconditions.checkNotNull(objectPath);
        this.catalogTable = (CatalogTable) Preconditions.checkNotNull(catalogTable);
        this.hiveVersion = (String) Preconditions.checkNotNull(jobConf.get("hive-version"), "Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
    }

    /**
     * Returns a provider that builds the scan as a {@code DataStream} through
     * {@code getDataStream} and reports the scan as bounded exactly when this source is not a
     * streaming source.
     */
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        final DataStreamScanProvider provider = new DataStreamScanProvider() {
            public boolean isBounded() {
                return !HiveTableSource.this.isStreamingSource();
            }

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
                return HiveTableSource.this.getDataStream(streamExecutionEnvironment);
            }
        };
        return provider;
    }

    /**
     * Builds the {@code DataStream} that reads this Hive table.
     *
     * <p>In streaming mode the source monitors the table continuously and, for partitioned
     * tables, is given a continuous partition fetcher. In batch mode the parallelism is inferred
     * from the file and split counts and then limited by the pushed-down limit.
     *
     * @throws UnsupportedOperationException if a non-partitioned table is read in streaming mode
     *     with a consume order other than {@code CREATE_TIME_ORDER}
     */
    @VisibleForTesting
    protected DataStream<RowData> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        validateScanConfigurations();
        HiveTableUtil.checkAcidTable(this.catalogTable, this.tablePath);

        final List<HiveTablePartition> partitions = HivePartitionUtils.getAllPartitions(this.jobConf, this.hiveVersion, this.tablePath, this.catalogTable, this.hiveShim, this.remainingPartitions);
        final Configuration tableOptions = Configuration.fromMap(this.catalogTable.getOptions());
        final boolean fallbackMapredReader = ((Boolean) this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)).booleanValue();
        final HiveSource.HiveSourceBuilder builder = new HiveSource.HiveSourceBuilder(this.jobConf, this.tablePath, this.catalogTable, partitions, this.limit, this.hiveVersion, fallbackMapredReader, getProducedDataType().getLogicalType());

        if (isStreamingSource()) {
            final boolean partitioned = !this.catalogTable.getPartitionKeys().isEmpty();
            if (!partitioned) {
                final ConsumeOrder order = ConsumeOrder.getConsumeOrder((String) tableOptions.get(FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER));
                if (order != ConsumeOrder.CREATE_TIME_ORDER) {
                    throw new UnsupportedOperationException("Only " + ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
                }
            }

            // Fall back to the class default when the table does not configure an interval.
            final Duration configuredInterval = (Duration) tableOptions.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL);
            builder.monitorContinuously(configuredInterval == null ? DEFAULT_SCAN_MONITOR_INTERVAL : configuredInterval);

            if (partitioned) {
                builder.setFetcher(new HiveContinuousPartitionFetcher());
                final String defaultPartitionName = this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                builder.setFetcherContext(new HiveContinuousPartitionFetcherContext<>(this.tablePath, this.hiveShim, new JobConfWrapper(this.jobConf), this.catalogTable.getPartitionKeys(), getProducedTableSchema().getFieldDataTypes(), getProducedTableSchema().getFieldNames(), tableOptions, defaultPartitionName));
            }
        }

        // NOTE(review): `m617build` looks like a decompiler-renamed `build()`; confirm against
        // HiveSource.HiveSourceBuilder before compiling against the real library.
        final DataStreamSource source = streamExecutionEnvironment.fromSource(builder.m617build(), WatermarkStrategy.noWatermarks(), "HiveSource-" + this.tablePath.getFullName());
        if (isStreamingSource()) {
            return source;
        }
        return source.setParallelism(new HiveParallelismInference(this.tablePath, this.flinkConf).infer(
                () -> Integer.valueOf(HiveSourceFileEnumerator.getNumFiles(partitions, this.jobConf)),
                () -> Integer.valueOf(HiveSourceFileEnumerator.createInputSplits(0, partitions, this.jobConf).size())
        ).limit(this.limit));
    }

    /**
     * Verifies that the table's partition-include option is {@code "all"}, the only value this
     * scan accepts.
     *
     * @throws IllegalArgumentException if the option resolves to any other value
     */
    private void validateScanConfigurations() {
        final String includeKey = FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key();
        final String includeValue = (String) this.catalogTable.getOptions().getOrDefault(includeKey, FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE.defaultValue());
        final String message = String.format("The only supported '%s' is 'all' in hive table scan, but is '%s'", includeKey, includeValue);
        Preconditions.checkArgument("all".equals(includeValue), message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the table is configured to be read as a streaming source.
     *
     * @return the parsed value of the streaming-source-enable table option, or the option's
     *     default when the table does not set it
     */
    public boolean isStreamingSource() {
        final String defaultFlag = ((Boolean) FileSystemOptions.STREAMING_SOURCE_ENABLE.defaultValue()).toString();
        final String configuredFlag = (String) this.catalogTable.getOptions().getOrDefault(FileSystemOptions.STREAMING_SOURCE_ENABLE.key(), defaultFlag);
        return Boolean.parseBoolean(configuredFlag);
    }

    /** Returns the full, unprojected schema of the catalog table. */
    protected TableSchema getTableSchema() {
        final TableSchema fullSchema = this.catalogTable.getSchema();
        return fullSchema;
    }

    /** Returns the row type of the produced schema, bridged to {@code RowData}. */
    private DataType getProducedDataType() {
        final DataType rowType = getProducedTableSchema().toRowDataType();
        return rowType.bridgedTo(RowData.class);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the schema this source actually produces.
     *
     * @return the full table schema when no projection has been applied, otherwise a schema
     *     holding only the projected fields, in projection order
     */
    public TableSchema getProducedTableSchema() {
        final TableSchema fullSchema = getTableSchema();
        if (this.projectedFields == null) {
            return fullSchema;
        }

        final String[] allNames = fullSchema.getFieldNames();
        final DataType[] allTypes = fullSchema.getFieldDataTypes();
        final int width = this.projectedFields.length;
        final String[] selectedNames = new String[width];
        final DataType[] selectedTypes = new DataType[width];
        for (int pos = 0; pos < width; pos++) {
            // Each projection entry is an index into the full schema.
            final int sourceIndex = this.projectedFields[pos];
            selectedNames[pos] = allNames[sourceIndex];
            selectedTypes[pos] = allTypes[sourceIndex];
        }
        return TableSchema.builder().fields(selectedNames, selectedTypes).build();
    }

    /**
     * Records the pushed-down row limit for later use when the source is built.
     *
     * @param j the limit to apply
     */
    public void applyLimit(long j) {
        final Long boxedLimit = j;
        this.limit = boxedLimit;
    }

    /**
     * Always returns an empty optional; this source does not enumerate partitions itself.
     */
    public Optional<List<Map<String, String>>> listPartitions() {
        final Optional<List<Map<String, String>>> none = Optional.empty();
        return none;
    }

    /**
     * Stores the partitions that remain after partition pruning.
     *
     * @param list the remaining partitions, each as a map of partition key to value
     * @throws UnsupportedOperationException if the table has no partition keys
     */
    public void applyPartitions(List<Map<String, String>> list) {
        final List<String> partitionKeys = this.catalogTable.getPartitionKeys();
        final boolean partitioned = partitionKeys != null && !partitionKeys.isEmpty();
        if (!partitioned) {
            throw new UnsupportedOperationException("Should not apply partitions to a non-partitioned table.");
        }
        this.remainingPartitions = list;
    }

    /** Reports that nested projection is not supported; always returns {@code false}. */
    public boolean supportsNestedProjection() {
        final boolean nestedSupported = false;
        return nestedSupported;
    }

    /**
     * Records the projected fields, keeping only the first index of every projection path.
     *
     * @param iArr projection paths; element {@code 0} of each path is stored
     */
    public void applyProjection(int[][] iArr) {
        final int[] firstIndices = new int[iArr.length];
        for (int pos = 0; pos < iArr.length; pos++) {
            firstIndices[pos] = iArr[pos][0];
        }
        this.projectedFields = firstIndices;
    }

    /** Returns the fixed summary label {@code "HiveSource"} for this source. */
    public String asSummaryString() {
        final String summary = "HiveSource";
        return summary;
    }

    /** Returns the insert-only changelog mode. */
    public ChangelogMode getChangelogMode() {
        final ChangelogMode insertOnly = ChangelogMode.insertOnly();
        return insertOnly;
    }

    /**
     * Creates a new source over the same table and carries over the pushed-down state
     * (remaining partitions, projected fields and limit).
     *
     * <p>The pushed-down collections and arrays are shared by reference, not cloned.
     */
    public DynamicTableSource copy() {
        final HiveTableSource duplicate = new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable);
        duplicate.limit = this.limit;
        duplicate.projectedFields = this.projectedFields;
        duplicate.remainingPartitions = this.remainingPartitions;
        return duplicate;
    }
}
