/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveParallelismInference;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveSourceFileEnumerator;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionContext;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.DefaultPartTimeExtractor;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveTableSource
implements ScanTableSource,
SupportsPartitionPushDown,
SupportsProjectionPushDown,
SupportsLimitPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);
    protected final JobConf jobConf;
    protected final ReadableConfig flinkConf;
    protected final ObjectPath tablePath;
    protected final CatalogTable catalogTable;
    protected final String hiveVersion;
    protected final HiveShim hiveShim;
    @Nullable
    private List<Map<String, String>> remainingPartitions = null;
    protected int[] projectedFields;
    private Long limit = null;

    public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable) {
        this.jobConf = (JobConf)Preconditions.checkNotNull((Object)jobConf);
        this.flinkConf = (ReadableConfig)Preconditions.checkNotNull((Object)flinkConf);
        this.tablePath = (ObjectPath)Preconditions.checkNotNull((Object)tablePath);
        this.catalogTable = (CatalogTable)Preconditions.checkNotNull((Object)catalogTable);
        this.hiveVersion = (String)Preconditions.checkNotNull((Object)jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()), (String)"Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider(){

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return HiveTableSource.this.getDataStream(execEnv);
            }

            public boolean isBounded() {
                return !HiveTableSource.this.isStreamingSource();
            }
        };
    }

    @VisibleForTesting
    protected DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
        HiveSourceBuilder sourceBuilder = new HiveSourceBuilder(this.jobConf, this.flinkConf, this.tablePath, this.hiveVersion, this.catalogTable).setProjectedFields(this.projectedFields).setLimit(this.limit);
        if (this.isStreamingSource()) {
            return this.toDataStreamSource(execEnv, sourceBuilder.buildWithDefaultBulkFormat());
        }
        List<HiveTablePartition> hivePartitionsToRead = HivePartitionUtils.getAllPartitions(this.jobConf, this.hiveVersion, this.tablePath, this.catalogTable.getPartitionKeys(), this.remainingPartitions);
        int parallelism = new HiveParallelismInference(this.tablePath, this.flinkConf).infer((SupplierWithException<Integer, IOException>)((SupplierWithException)() -> HiveSourceFileEnumerator.getNumFiles(hivePartitionsToRead, this.jobConf)), (SupplierWithException<Integer, IOException>)((SupplierWithException)() -> HiveSourceFileEnumerator.createInputSplits(0, hivePartitionsToRead, this.jobConf).size())).limit(this.limit);
        return this.toDataStreamSource(execEnv, sourceBuilder.setPartitions(hivePartitionsToRead).buildWithDefaultBulkFormat()).setParallelism(parallelism);
    }

    private DataStreamSource<RowData> toDataStreamSource(StreamExecutionEnvironment execEnv, HiveSource<RowData> hiveSource) {
        return execEnv.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "HiveSource-" + this.tablePath.getFullName());
    }

    protected boolean isStreamingSource() {
        return Boolean.parseBoolean(this.catalogTable.getOptions().getOrDefault(FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.key(), ((Boolean)FileSystemConnectorOptions.STREAMING_SOURCE_ENABLE.defaultValue()).toString()));
    }

    protected TableSchema getTableSchema() {
        return this.catalogTable.getSchema();
    }

    protected TableSchema getProducedTableSchema() {
        TableSchema fullSchema = this.getTableSchema();
        if (this.projectedFields == null) {
            return fullSchema;
        }
        String[] fullNames = fullSchema.getFieldNames();
        DataType[] fullTypes = fullSchema.getFieldDataTypes();
        return TableSchema.builder().fields((String[])Arrays.stream(this.projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new), (DataType[])Arrays.stream(this.projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
    }

    public void applyLimit(long limit) {
        this.limit = limit;
    }

    public Optional<List<Map<String, String>>> listPartitions() {
        return Optional.empty();
    }

    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        if (this.catalogTable.getPartitionKeys() == null || this.catalogTable.getPartitionKeys().size() == 0) {
            throw new UnsupportedOperationException("Should not apply partitions to a non-partitioned table.");
        }
        this.remainingPartitions = remainingPartitions;
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public void applyProjection(int[][] projectedFields) {
        this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
    }

    public String asSummaryString() {
        return "HiveSource";
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public DynamicTableSource copy() {
        HiveTableSource source = new HiveTableSource(this.jobConf, this.flinkConf, this.tablePath, this.catalogTable);
        source.remainingPartitions = this.remainingPartitions;
        source.projectedFields = this.projectedFields;
        source.limit = this.limit;
        return source;
    }

    @VisibleForTesting
    public JobConf getJobConf() {
        return this.jobConf;
    }

    public static class HiveContinuousPartitionFetcherContext<T extends Comparable<T>>
    extends HivePartitionFetcherContextBase<Partition>
    implements HiveContinuousPartitionContext<Partition, T> {
        private static final long serialVersionUID = 1L;
        private static final Long DEFAULT_MIN_TIME_OFFSET = 0L;
        private static final String DEFAULT_MIN_NAME_OFFSET = "";
        private final TypeSerializer<T> typeSerializer;
        private final T consumeStartOffset;

        public HiveContinuousPartitionFetcherContext(ObjectPath tablePath, HiveShim hiveShim, JobConfWrapper confWrapper, List<String> partitionKeys, DataType[] fieldTypes, String[] fieldNames, Configuration configuration, String defaultPartitionName) {
            super(tablePath, hiveShim, confWrapper, partitionKeys, fieldTypes, fieldNames, configuration, defaultPartitionName);
            switch (this.partitionOrder) {
                case PARTITION_NAME: {
                    if (configuration.contains(FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        String consumeOffsetStr = configuration.getString(FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
                        this.consumeStartOffset = consumeOffsetStr;
                    } else {
                        this.consumeStartOffset = DEFAULT_MIN_NAME_OFFSET;
                    }
                    this.typeSerializer = StringSerializer.INSTANCE;
                    break;
                }
                case PARTITION_TIME: 
                case CREATE_TIME: {
                    if (configuration.contains(FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET)) {
                        String consumeOffsetStr = configuration.getString(FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET);
                        this.consumeStartOffset = DefaultPartTimeExtractor.toMills((String)consumeOffsetStr);
                    } else {
                        this.consumeStartOffset = DEFAULT_MIN_TIME_OFFSET;
                    }
                    this.typeSerializer = LongSerializer.INSTANCE;
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported partition order: " + this.partitionOrder);
                }
            }
        }

        public Optional<Partition> getPartition(List<String> partValues) throws TException {
            try {
                return Optional.of(this.metaStoreClient.getPartition(this.tablePath.getDatabaseName(), this.tablePath.getObjectName(), partValues));
            }
            catch (NoSuchObjectException e) {
                return Optional.empty();
            }
        }

        public ObjectPath getTablePath() {
            return this.tablePath;
        }

        @Override
        public HiveTablePartition toHiveTablePartition(Partition partition) {
            return HivePartitionUtils.toHiveTablePartition(this.partitionKeys, this.tableProps, partition);
        }

        public TypeSerializer<T> getTypeSerializer() {
            return this.typeSerializer;
        }

        public T getConsumeStartOffset() {
            return this.consumeStartOffset;
        }

        @Override
        public void close() throws Exception {
            if (this.metaStoreClient != null) {
                this.metaStoreClient.close();
            }
        }
    }
}

