/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive.read;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSourceFileEnumerator;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.HiveMapredSplitReader;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
import org.apache.flink.connectors.hive.read.HiveVectorizedParquetSplitReader;
import org.apache.flink.connectors.hive.read.SplitReader;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input format that reads a fixed list of Hive table partitions as {@link RowData}.
 *
 * <p>For every split, one {@link SplitReader} is chosen:
 *
 * <ul>
 *   <li>a vectorized ORC reader, when the partition's serde library name contains "orc" and
 *       every selected field has a vectorization-supported type;
 *   <li>a vectorized Parquet reader, under the same conditions for "parquet";
 *   <li>otherwise a {@link HiveMapredSplitReader}.
 * </ul>
 *
 * <p>Setting {@code useMapRedReader} always selects the mapred reader.
 *
 * <p>The checkpointed state is the number of records emitted from the currently opened split. On
 * restore, the reader seeks forward by that many rows.
 */
public class HiveTableInputFormat
        extends HadoopInputFormatCommonBase<RowData, HiveTableInputSplit>
        implements CheckpointableInputFormat<HiveTableInputSplit, Long> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableInputFormat.class);

    // Conf keys describing the schema without partition columns (used for schema evolution).
    private static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
    private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";

    private final JobConfWrapper jobConf;
    private final String hiveVersion;
    private final List<String> partitionKeys;
    // Types and names of all table fields, including partition columns.
    private final DataType[] fieldTypes;
    private final String[] fieldNames;
    private final List<HiveTablePartition> partitions;
    // Indices into fieldTypes/fieldNames of the fields to produce.
    private final int[] selectedFields;
    // Maximum records emitted per opened split, or null for no limit.
    private final Long limit;
    private final boolean useMapRedReader;

    // Records emitted since the current split was opened; this is also the checkpoint state.
    private transient long currentReadCount = 0L;

    @VisibleForTesting protected transient SplitReader reader;

    /**
     * Creates the input format.
     *
     * @param jobConf Hadoop job configuration. It is copied, so later changes by the caller are
     *     not seen here.
     * @param partitionKeys names of the partition columns
     * @param fieldTypes types of all table fields, including partition columns
     * @param fieldNames names of all table fields, including partition columns
     * @param projectedFields indices of the fields to read, or {@code null} to read all fields
     * @param limit maximum number of records emitted per opened split, or {@code null} for none
     * @param hiveVersion Hive version used to load the shim and configure the readers
     * @param useMapRedReader if {@code true}, never use the vectorized ORC/Parquet readers
     * @param partitions the partitions to read; must not be {@code null}
     */
    public HiveTableInputFormat(
            JobConf jobConf,
            List<String> partitionKeys,
            DataType[] fieldTypes,
            String[] fieldNames,
            int[] projectedFields,
            Long limit,
            String hiveVersion,
            boolean useMapRedReader,
            List<HiveTablePartition> partitions) {
        super(jobConf.getCredentials());
        this.jobConf = new JobConfWrapper(new JobConf(jobConf));
        this.partitionKeys = partitionKeys;
        this.fieldTypes = fieldTypes;
        this.fieldNames = fieldNames;
        this.limit = limit;
        this.hiveVersion = hiveVersion;
        this.selectedFields =
                projectedFields != null
                        ? projectedFields
                        : IntStream.range(0, fieldTypes.length).toArray();
        this.useMapRedReader = useMapRedReader;
        this.partitions = Preconditions.checkNotNull(partitions, "partitions can not be null.");
    }

    /** Returns the (copied) Hadoop job configuration used by this format. */
    public JobConf getJobConf() {
        return jobConf.conf();
    }

    @Override
    public void configure(Configuration parameters) {
        // Nothing to configure: everything is supplied through the constructor.
    }

    /**
     * Opens a reader for {@code split} and resets the emitted-record counter.
     *
     * @throws IOException if the underlying reader cannot be created
     */
    @Override
    public void open(HiveTableInputSplit split) throws IOException {
        HiveTablePartition partition = split.getHiveTablePartition();
        if (!useMapRedReader && useOrcVectorizedRead(partition)) {
            reader =
                    new HiveVectorizedOrcSplitReader(
                            hiveVersion,
                            jobConf.conf(),
                            fieldNames,
                            fieldTypes,
                            selectedFields,
                            split);
        } else if (!useMapRedReader && useParquetVectorizedRead(partition)) {
            reader =
                    new HiveVectorizedParquetSplitReader(
                            hiveVersion,
                            jobConf.conf(),
                            fieldNames,
                            fieldTypes,
                            selectedFields,
                            split);
        } else {
            // Schema keys are written to a copy, so the shared conf stays unchanged.
            JobConf clonedConf = new JobConf(jobConf.conf());
            addSchemaToConf(clonedConf);
            reader =
                    new HiveMapredSplitReader(
                            clonedConf,
                            partitionKeys,
                            fieldTypes,
                            selectedFields,
                            split,
                            HiveShimLoader.loadHiveShim(hiveVersion));
        }
        currentReadCount = 0L;
    }

    /**
     * Writes the table schema and the projected column ids into {@code jobConf} for the mapred
     * reader.
     *
     * <p>Assumes the partition columns are the trailing entries of {@code fieldNames} and
     * {@code fieldTypes}: the schema-evolution keys and read column ids only cover the leading
     * {@code fieldNames.length - partitionKeys.size()} fields.
     */
    private void addSchemaToConf(JobConf jobConf) {
        List<String> typeStrs =
                Arrays.stream(fieldTypes)
                        .map(t -> HiveTypeUtil.toHiveTypeInfo(t, true).toString())
                        .collect(Collectors.toList());
        // Full schema, including partition columns ("columns"/"columns.types" keys in Hive).
        jobConf.set("columns", String.join(",", fieldNames));
        jobConf.set("columns.types", String.join(",", typeStrs));

        // Schema evolution: non-partition columns only.
        int numNonPartCol = fieldNames.length - partitionKeys.size();
        jobConf.set(
                SCHEMA_EVOLUTION_COLUMNS,
                String.join(",", Arrays.copyOfRange(fieldNames, 0, numNonPartCol)));
        jobConf.set(
                SCHEMA_EVOLUTION_COLUMNS_TYPES,
                String.join(",", typeStrs.subList(0, numNonPartCol)));

        // Projected column ids, excluding partition columns.
        String readColIDs =
                Arrays.stream(selectedFields)
                        .filter(i -> i < numNonPartCol)
                        .mapToObj(String::valueOf)
                        .collect(Collectors.joining(","));
        jobConf.set("hive.io.file.readcolumn.ids", readColIDs);
    }

    /**
     * Reopens {@code split} and skips the {@code state} records already emitted before the
     * checkpoint.
     *
     * @throws NullPointerException if {@code state} is null (checked before any reader is opened)
     */
    @Override
    public void reopen(HiveTableInputSplit split, Long state) throws IOException {
        Preconditions.checkNotNull(state, "state can not be null.");
        open(split);
        currentReadCount = state;
        reader.seekToRow(state, new GenericRowData(selectedFields.length));
    }

    /** Returns the number of records emitted from the current split. */
    @Override
    public Long getCurrentState() {
        return currentReadCount;
    }

    /** Returns {@code true} if the vectorized readers cannot handle values of type {@code t}. */
    private static boolean isVectorizationUnsupported(LogicalType t) {
        switch (t.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
            case BOOLEAN:
            case BINARY:
            case VARBINARY:
            case DECIMAL:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return false;
            default:
                return true;
        }
    }

    private boolean useParquetVectorizedRead(HiveTablePartition partition) {
        return useVectorizedRead(partition, "parquet");
    }

    private boolean useOrcVectorizedRead(HiveTablePartition partition) {
        return useVectorizedRead(partition, "orc");
    }

    /**
     * Decides whether a vectorized reader for {@code format} can be used for {@code partition}.
     *
     * @param format lower-case keyword looked for in the partition's serde library name
     * @return {@code true} if the serde library name contains {@code format} (ignoring case) and
     *     every selected field has a vectorization-supported type
     */
    private boolean useVectorizedRead(HiveTablePartition partition, String format) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        String serdeLib =
                sd.getSerdeInfo() == null ? null : sd.getSerdeInfo().getSerializationLib();
        // A missing serde lib cannot be matched; fall back to the mapred reader instead of
        // throwing. Locale.ROOT keeps the comparison independent of the JVM default locale.
        if (serdeLib == null || !serdeLib.toLowerCase(Locale.ROOT).contains(format)) {
            return false;
        }
        for (int i : selectedFields) {
            if (isVectorizationUnsupported(fieldTypes[i].getLogicalType())) {
                LOG.info(
                        "Fallback to hadoop mapred reader, unsupported field type: {}",
                        fieldTypes[i]);
                return false;
            }
        }
        LOG.info("Use flink {} ColumnarRowData reader.", format);
        return true;
    }

    /** Returns {@code true} once the limit (if any) is reached or the reader is exhausted. */
    @Override
    public boolean reachedEnd() throws IOException {
        if (limit != null && currentReadCount >= limit) {
            return true;
        }
        return reader.reachedEnd();
    }

    @Override
    public RowData nextRecord(RowData reuse) throws IOException {
        currentReadCount++;
        return reader.nextRecord(reuse);
    }

    /** Closes and releases the current reader, if any. Safe to call more than once. */
    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
            reader = null;
        }
    }

    @Override
    public HiveTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        return createInputSplits(minNumSplits, partitions, jobConf.conf());
    }

    /**
     * Creates input splits for all {@code partitions}.
     *
     * <p>Splits get consecutive numbers starting at 0, in partition order.
     *
     * @param minNumSplits hint for the minimum number of splits, passed through to the enumerator
     * @param partitions the partitions to split
     * @param jobConf Hadoop configuration used to enumerate the splits
     * @return the created splits
     * @throws IOException if split enumeration fails
     */
    public static HiveTableInputSplit[] createInputSplits(
            int minNumSplits, List<HiveTablePartition> partitions, JobConf jobConf)
            throws IOException {
        List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
        int splitNum = 0;
        for (HiveTablePartition partition : partitions) {
            for (InputSplit inputSplit :
                    HiveSourceFileEnumerator.createMRSplits(
                            minNumSplits, partition.getStorageDescriptor(), jobConf)) {
                hiveSplits.add(new HiveTableInputSplit(splitNum++, inputSplit, jobConf, partition));
            }
        }
        return hiveSplits.toArray(new HiveTableInputSplit[0]);
    }

    /** No statistics are provided; always returns {@code null}. */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStats) {
        return null;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(HiveTableInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner((LocatableInputSplit[]) inputSplits);
    }

    /**
     * Counts the directory entries directly under each partition location.
     *
     * <p>Locations that do not exist are skipped.
     *
     * @return total number of entries listed across all partitions
     * @throws IOException if a file system cannot be obtained or a listing fails
     */
    public int getNumFiles() throws IOException {
        int numFiles = 0;
        for (HiveTablePartition partition : partitions) {
            Path inputPath = new Path(partition.getStorageDescriptor().getLocation());
            // Resolve the file system per partition: partitions may live on different file
            // systems, and using one partition's FileSystem for another's path fails with
            // "Wrong FS". Hadoop caches FileSystem instances by default, so this is cheap.
            FileSystem fs = inputPath.getFileSystem(jobConf.conf());
            // The location may be missing (e.g. the data was removed); count nothing for it.
            if (!fs.exists(inputPath)) {
                continue;
            }
            numFiles += fs.listStatus(inputPath).length;
        }
        return numFiles;
    }
}

