/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveParallelismInference;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceFileEnumerator;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveTableSource
implements ScanTableSource,
SupportsPartitionPushDown,
SupportsProjectionPushDown,
SupportsLimitPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);
    private static final Duration DEFAULT_SCAN_MONITOR_INTERVAL = Duration.ofMinutes(1L);
    protected final JobConf jobConf;
    protected final ReadableConfig flinkConf;
    protected final ObjectPath tablePath;
    protected final CatalogTable catalogTable;
    protected final String hiveVersion;
    protected final HiveShim hiveShim;
    @Nullable
    private List<Map<String, String>> remainingPartitions = null;
    protected int[] projectedFields;
    private Long limit = null;

    public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable) {
        this.jobConf = (JobConf)Preconditions.checkNotNull((Object)jobConf);
        this.flinkConf = (ReadableConfig)Preconditions.checkNotNull((Object)flinkConf);
        this.tablePath = (ObjectPath)Preconditions.checkNotNull((Object)tablePath);
        this.catalogTable = (CatalogTable)Preconditions.checkNotNull((Object)catalogTable);
        this.hiveVersion = (String)Preconditions.checkNotNull((Object)jobConf.get("hive-version"), (String)"Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider(){

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return HiveTableSource.this.getDataStream(execEnv);
            }

            public boolean isBounded() {
                return !HiveTableSource.this.isStreamingSource();
            }
        };
    }

    @VisibleForTesting
    protected DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
        this.validateScanConfigurations();
        HiveTableUtil.checkAcidTable(this.catalogTable, this.tablePath);
        List<HiveTablePartition> allHivePartitions = HivePartitionUtils.getAllPartitions(this.jobConf, this.hiveVersion, this.tablePath, this.catalogTable, this.hiveShim, this.remainingPartitions);
        Configuration configuration = Configuration.fromMap((Map)this.catalogTable.getOptions());
        HiveSource.HiveSourceBuilder sourceBuilder = new HiveSource.HiveSourceBuilder(this.jobConf, this.tablePath, this.catalogTable, allHivePartitions, this.limit, this.hiveVersion, (Boolean)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER), (RowType)this.getProducedDataType().getLogicalType());
        if (this.isStreamingSource()) {
            String consumeOrderStr;
            ConsumeOrder consumeOrder;
            if (this.catalogTable.getPartitionKeys().isEmpty() && (consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr = (String)configuration.get(FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER))) != ConsumeOrder.CREATE_TIME_ORDER) {
                throw new UnsupportedOperationException("Only " + (Object)((Object)ConsumeOrder.CREATE_TIME_ORDER) + " is supported for non partition table.");
            }
            Duration monitorInterval = configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL) == null ? DEFAULT_SCAN_MONITOR_INTERVAL : (Duration)configuration.get(FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL);
            sourceBuilder.monitorContinuously(monitorInterval);
            if (!this.catalogTable.getPartitionKeys().isEmpty()) {
                sourceBuilder.setFetcher(new HiveContinuousPartitionFetcher());
                String defaultPartitionName = this.jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                HiveContinuousPartitionFetcherContext fetcherContext = new HiveContinuousPartitionFetcherContext(this.tablePath, this.hiveShim, new JobConfWrapper(this.jobConf), this.catalogTable.getPartitionKeys(), this.getTableSchema().getFieldDataTypes(), this.getTableSchema().getFieldNames(), configuration, defaultPartitionName);
                sourceBuilder.setFetcherContext(fetcherContext);
            }
        }
        HiveSource hiveSource = sourceBuilder.build();
        DataStreamSource source = execEnv.fromSource((Source)hiveSource, WatermarkStrategy.noWatermarks(), "HiveSource-" + this.tablePath.getFullName());
        if (this.isStreamingSource()) {
            return source;
        }
        int parallelism = new HiveParallelismInference(this.tablePath, this.flinkConf).infer((SupplierWithException<Integer, IOException>)((SupplierWithException)() -> HiveSourceFileEnumerator.getNumFiles(allHivePartitions, this.jobConf)), (SupplierWithException<Integer, IOException>)((SupplierWithException)() -> HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, this.jobConf).size())).limit(this.limit);
        return source.setParallelism(parallelism);
    }

    private void validateScanConfigurations() {
        String partitionInclude = (String)this.catalogTable.getOptions().getOrDefault(FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE.defaultValue());
        Preconditions.checkArgument((boolean)"all".equals(partitionInclude), (Object)String.format("The only supported '%s' is 'all' in hive table scan, but is '%s'", FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), partitionInclude));
    }

    protected boolean isStreamingSource() {
        return Boolean.parseBoolean(this.catalogTable.getOptions().getOrDefault(FileSystemOptions.STREAMING_SOURCE_ENABLE.key(), ((Boolean)FileSystemOptions.STREAMING_SOURCE_ENABLE.defaultValue()).toString()));
    }

    protected TableSchema getTableSchema() {
        return this.catalogTable.getSchema();
    }

    private DataType getProducedDataType() {
        return (DataType)this.getProducedTableSchema().toRowDataType().bridgedTo(RowData.class);
    }

    protected TableSchema getProducedTableSchema() {
        TableSchema fullSchema = this.getTableSchema();
        if (this.projectedFields == null) {
            return fullSchema;
        }
        String[] fullNames = fullSchema.getFieldNames();
        DataType[] fullTypes = fullSchema.getFieldDataTypes();
        return TableSchema.builder().fields((String[])Arrays.stream(this.projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new), (DataType[])Arrays.stream(this.projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
    }

    public void applyLimit(long limit) {
        this.limit = limit;
    }

    public Optional<List<Map<String, String>>> listPartitions() {
        return Optional.empty();
    }

    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        if (this.catalogTable.getPartitionKeys() == null || this.catalogTable.getPartitionKeys().size() == 0) {
            throw new UnsupportedOperationException("Should not apply partitions to a non-partitioned table.");
        }
        this.remainingPartitions = remainingPartitions;
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public void applyProjection(int[][] projectedFields) {
        this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
    }

    public String asSummaryString() {
        return "HiveSource";
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public DynamicTableSource copy() {
        HiveTableSource source = new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable);
        source.remainingPartitions = this.remainingPartitions;
        source.projectedFields = this.projectedFields;
        source.limit = this.limit;
        return source;
    }

    public static class HiveContinuousPartitionFetcherContext<T extends Comparable<T>>
    extends HivePartitionFetcherContextBase<Partition>
    implements ContinuousPartitionFetcher.Context<Partition, T> {
        private static final long serialVersionUID = 1L;
        private static final Long DEFAULT_MIN_TIME_OFFSET = 0L;
        private static final String DEFAULT_MIN_NAME_OFFSET = "";
        private final TypeSerializer<T> typeSerializer;
        private final T consumeStartOffset;

        public HiveContinuousPartitionFetcherContext(ObjectPath tablePath, HiveShim hiveShim, JobConfWrapper confWrapper, List<String> partitionKeys, DataType[] fieldTypes, String[] fieldNames, Configuration configuration, String defaultPartitionName) {
            super(tablePath, hiveShim, confWrapper, partitionKeys, fieldTypes, fieldNames, configuration, defaultPartitionName);
            switch (this.consumeOrder) {
                case PARTITION_NAME_ORDER: {
                    if (configuration.contains(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        String consumeOffsetStr = configuration.getString(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
                        this.consumeStartOffset = consumeOffsetStr;
                    } else {
                        this.consumeStartOffset = DEFAULT_MIN_NAME_OFFSET;
                    }
                    this.typeSerializer = StringSerializer.INSTANCE;
                    break;
                }
                case PARTITION_TIME_ORDER: 
                case CREATE_TIME_ORDER: {
                    if (configuration.contains(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        String consumeOffsetStr = configuration.getString(FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
                        this.consumeStartOffset = DefaultPartTimeExtractor.toMills((String)consumeOffsetStr);
                    } else {
                        this.consumeStartOffset = DEFAULT_MIN_TIME_OFFSET;
                    }
                    this.typeSerializer = LongSerializer.INSTANCE;
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported consumer order: " + (Object)((Object)this.consumeOrder));
                }
            }
        }

        public Optional<Partition> getPartition(List<String> partValues) throws TException {
            try {
                return Optional.of(this.metaStoreClient.getPartition(this.tablePath.getDatabaseName(), this.tablePath.getObjectName(), partValues));
            }
            catch (NoSuchObjectException e) {
                return Optional.empty();
            }
        }

        public ObjectPath getTablePath() {
            return this.tablePath;
        }

        public long getModificationTime(Partition partition, T partitionOffset) {
            switch (this.consumeOrder) {
                case PARTITION_NAME_ORDER: {
                    return (long)partition.getCreateTime() * 11000L;
                }
                case PARTITION_TIME_ORDER: 
                case CREATE_TIME_ORDER: {
                    return (Long)partitionOffset;
                }
            }
            throw new UnsupportedOperationException("Unsupported consumer order: " + (Object)((Object)this.consumeOrder));
        }

        public HiveTablePartition toHiveTablePartition(Partition partition) {
            return HivePartitionUtils.toHiveTablePartition(this.partitionKeys, this.fieldNames, this.fieldTypes, this.hiveShim, this.tableProps, this.defaultPartitionName, partition);
        }

        public TypeSerializer<T> getTypeSerializer() {
            return this.typeSerializer;
        }

        public T getConsumeStartOffset() {
            return this.consumeStartOffset;
        }

        @Override
        public void close() throws Exception {
            if (this.metaStoreClient != null) {
                this.metaStoreClient.close();
            }
        }
    }
}

