/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive.read;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.HiveMapredSplitReader;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.hive.shaded.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.PartitionFieldExtractor;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveBulkFormatAdapter
implements BulkFormat<RowData, HiveSourceSplit> {
    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);

    /** Job configuration key that receives the comma-joined names of the non-partition columns. */
    private static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";

    /** Job configuration key that receives the comma-joined Hive types of the non-partition columns. */
    private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";

    /**
     * Looks a partition field up by name in the partition spec of the split's Hive table partition.
     * The field type argument is not used.
     *
     * <p>The cast target carries the {@code HiveSourceSplit} type argument so that the lambda's
     * {@code split} parameter is typed as a {@link HiveSourceSplit}; with a raw
     * {@code PartitionFieldExtractor} the call to {@code getHiveTablePartition()} cannot be resolved.
     */
    private static final PartitionFieldExtractor<HiveSourceSplit> PARTITION_FIELD_EXTRACTOR =
            (PartitionFieldExtractor<HiveSourceSplit> & Serializable)
                    (split, fieldName, fieldType) ->
                            split.getHiveTablePartition().getPartitionSpec().get(fieldName);

    /** Wrapper whose {@code conf()} is handed to every underlying reader/format. */
    private final JobConfWrapper jobConfWrapper;

    /** Partition keys; their count is subtracted from the field count to find the non-partition columns. */
    private final List<String> partitionKeys;

    /** Names of all table fields, positionally aligned with {@link #fieldTypes}. */
    private final String[] fieldNames;

    /** Data types of all table fields, positionally aligned with {@link #fieldNames}. */
    private final DataType[] fieldTypes;

    /** Hive version string; used to load the Hive shim and to pick version-specific reader variants. */
    private final String hiveVersion;

    /** Shim loaded from {@link #hiveVersion} in the constructor. */
    private final HiveShim hiveShim;

    /** Row type of the records this format produces; each of its field names must exist in {@link #fieldNames}. */
    private final RowType producedRowType;

    /** When {@code true}, the vectorized Parquet/ORC readers are never considered. */
    private final boolean useMapRedReader;

    /**
     * Creates the adapter and eagerly loads the Hive shim for the given version.
     *
     * @param jobConfWrapper wrapper around the job configuration passed to the underlying readers
     * @param partitionKeys partition keys of the table
     * @param fieldNames names of all table fields
     * @param fieldTypes data types of all table fields, aligned with {@code fieldNames}
     * @param hiveVersion Hive version string, also used to load the {@link HiveShim}
     * @param producedRowType row type of the records to produce
     * @param useMapRedReader whether to always use the mapred-based reader
     */
    public HiveBulkFormatAdapter(
            JobConfWrapper jobConfWrapper,
            List<String> partitionKeys,
            String[] fieldNames,
            DataType[] fieldTypes,
            String hiveVersion,
            RowType producedRowType,
            boolean useMapRedReader) {
        // Reader selection inputs.
        this.useMapRedReader = useMapRedReader;
        this.jobConfWrapper = jobConfWrapper;

        // Table schema and projection.
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.partitionKeys = partitionKeys;
        this.producedRowType = producedRowType;

        // Version-dependent pieces.
        this.hiveVersion = hiveVersion;
        this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
    }

    /**
     * Creates a fresh reader for the split by delegating to the format chosen for that split.
     *
     * @param config Flink configuration forwarded to the delegate format
     * @param split split to read
     * @return reader produced by the delegate format
     * @throws IOException if the delegate format fails to create the reader
     */
    @Override
    public BulkFormat.Reader<RowData> createReader(Configuration config, HiveSourceSplit split) throws IOException {
        // The delegate is typed BulkFormat<RowData, ? super HiveSourceSplit>, so the split must be
        // passed as a HiveSourceSplit; widening it to FileSourceSplit does not fit the wildcard capture.
        return this.createBulkFormatForSplit(split).createReader(config, split);
    }

    /**
     * Restores a reader for the split by delegating to the format chosen for that split.
     *
     * @param config Flink configuration forwarded to the delegate format
     * @param split split to resume reading
     * @return reader produced by the delegate format
     * @throws IOException if the delegate format fails to restore the reader
     */
    @Override
    public BulkFormat.Reader<RowData> restoreReader(Configuration config, HiveSourceSplit split) throws IOException {
        // The delegate is typed BulkFormat<RowData, ? super HiveSourceSplit>, so the split must be
        // passed as a HiveSourceSplit; widening it to FileSourceSplit does not fit the wildcard capture.
        return this.createBulkFormatForSplit(split).restoreReader(config, split);
    }

    /**
     * Reports this format as splittable.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isSplittable() {
        return true;
    }

    /**
     * Returns the type information of the produced records.
     *
     * @return internal type info built from the produced row type
     */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return InternalTypeInfo.of(this.producedRowType);
    }

    /**
     * Builds the row type of the whole table from the field names and the logical types of the
     * field data types.
     *
     * @return row type covering every table field
     */
    private RowType tableRowType() {
        final LogicalType[] logicalTypes = new LogicalType[this.fieldTypes.length];
        for (int pos = 0; pos < logicalTypes.length; pos++) {
            logicalTypes[pos] = this.fieldTypes[pos].getLogicalType();
        }
        return RowType.of(logicalTypes, this.fieldNames);
    }

    /**
     * Picks the bulk format used to read the given split.
     *
     * <p>Unless the mapred reader is forced, the vectorized Parquet reader is tried first, then the
     * vectorized ORC reader; everything else is read through {@link HiveMapRedBulkFormat}.
     *
     * @param split split whose partition decides the format
     * @return the format to delegate to
     */
    private BulkFormat<RowData, ? super HiveSourceSplit> createBulkFormatForSplit(HiveSourceSplit split) {
        if (this.useMapRedReader) {
            // Forced fallback: the vectorized checks are not evaluated at all.
            return new HiveMapRedBulkFormat();
        }

        final HiveTablePartition partition = split.getHiveTablePartition();

        if (this.useParquetVectorizedRead(partition)) {
            final boolean isHive3 = this.hiveVersion.startsWith("3");
            return ParquetColumnarRowInputFormat.createPartitionedFormat(
                    (org.apache.hadoop.conf.Configuration) this.jobConfWrapper.conf(),
                    this.producedRowType,
                    this.partitionKeys,
                    PARTITION_FIELD_EXTRACTOR,
                    2048,
                    isHive3,
                    false);
        }

        if (this.useOrcVectorizedRead(partition)) {
            return this.createOrcFormat();
        }

        return new HiveMapRedBulkFormat();
    }

    /**
     * Creates the vectorized ORC format for the table.
     *
     * <p>Hive versions starting with {@code "1."} use the no-hive ORC format; all other versions use
     * the format built on the {@link OrcShim} for the configured Hive version.
     *
     * @return partitioned ORC columnar format
     */
    private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
        if (this.hiveVersion.startsWith("1.")) {
            return OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                    (org.apache.hadoop.conf.Configuration) this.jobConfWrapper.conf(),
                    this.tableRowType(),
                    this.partitionKeys,
                    PARTITION_FIELD_EXTRACTOR,
                    this.computeSelectedFields(),
                    Collections.emptyList(),
                    2048);
        } else {
            return OrcColumnarRowFileInputFormat.createPartitionedFormat(
                    OrcShim.createShim(this.hiveVersion),
                    (org.apache.hadoop.conf.Configuration) this.jobConfWrapper.conf(),
                    this.tableRowType(),
                    this.partitionKeys,
                    PARTITION_FIELD_EXTRACTOR,
                    this.computeSelectedFields(),
                    Collections.emptyList(),
                    2048);
        }
    }

    /**
     * Decides whether the vectorized ORC reader can be used for the partition.
     *
     * @param partition partition whose serialization library is inspected
     * @return {@code true} if the partition is stored as ORC and every produced field type is supported
     */
    private boolean useOrcVectorizedRead(HiveTablePartition partition) {
        return this.useVectorizedRead(partition, "orc");
    }

    /**
     * Decides whether the vectorized Parquet reader can be used for the partition.
     *
     * @param partition partition whose serialization library is inspected
     * @return {@code true} if the partition is stored as Parquet and every produced field type is
     *     supported
     */
    private boolean useParquetVectorizedRead(HiveTablePartition partition) {
        return this.useVectorizedRead(partition, "parquet");
    }

    /**
     * Shared check behind the ORC and Parquet decisions.
     *
     * <p>The format matches when the lower-cased serialization library name contains
     * {@code format}. Lower-casing uses {@link Locale#ROOT} so the result does not depend on the
     * JVM's default locale.
     *
     * @param partition partition whose serialization library is inspected
     * @param format lower-case format keyword, also used in the log message
     * @return {@code true} if the format matches and no produced field type is unsupported
     */
    private boolean useVectorizedRead(HiveTablePartition partition, String format) {
        String serializationLib =
                partition.getStorageDescriptor().getSerdeInfo().getSerializationLib();
        if (!serializationLib.toLowerCase(Locale.ROOT).contains(format)) {
            return false;
        }
        for (RowType.RowField field : this.producedRowType.getFields()) {
            if (HiveBulkFormatAdapter.isVectorizationUnsupported(field.getType())) {
                LOG.info("Fallback to hadoop mapred reader, unsupported field type: {}", field.getType());
                return false;
            }
        }
        LOG.info("Use flink {} ColumnarRowData reader.", format);
        return true;
    }

    /**
     * Tells whether a logical type prevents use of the vectorized readers.
     *
     * @param t logical type of a produced field
     * @return {@code false} for the type roots listed below, {@code true} for every other type root
     */
    private static boolean isVectorizationUnsupported(LogicalType t) {
        switch (t.getTypeRoot()) {
            // Character and binary strings.
            case CHAR:
            case VARCHAR:
            case BINARY:
            case VARBINARY:
            // Boolean and numerics.
            case BOOLEAN:
            case DECIMAL:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            // Date and time.
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return false;
            default:
                return true;
        }
    }

    /**
     * Maps every produced field to its position in the table schema.
     *
     * @return array where element {@code i} is the index in {@code fieldNames} of the {@code i}-th
     *     produced field
     * @throws IllegalStateException if a produced field name is not among the table field names
     */
    private int[] computeSelectedFields() {
        // Both lookups are loop-invariant, so they are built once instead of once per field.
        final List<String> producedNames = this.producedRowType.getFieldNames();
        final List<String> tableNames = Arrays.asList(this.fieldNames);

        final int[] selectedFields = new int[this.producedRowType.getFieldCount()];
        for (int i = 0; i < selectedFields.length; ++i) {
            String name = producedNames.get(i);
            int index = tableNames.indexOf(name);
            Preconditions.checkState(
                    index >= 0,
                    "Produced field name %s not found in table schema fields %s",
                    name,
                    Arrays.toString(this.fieldNames));
            selectedFields[i] = index;
        }
        return selectedFields;
    }

    private class HiveReader
    implements BulkFormat.Reader<RowData> {
        private final HiveMapredSplitReader hiveMapredSplitReader;
        private final RowDataSerializer serializer;
        private final int[] selectedFields;
        private long numRead = 0L;

        private HiveReader(HiveSourceSplit split) throws IOException {
            this.selectedFields = HiveBulkFormatAdapter.this.computeSelectedFields();
            JobConf clonedConf = new JobConf((org.apache.hadoop.conf.Configuration)HiveBulkFormatAdapter.this.jobConfWrapper.conf());
            this.addSchemaToConf(clonedConf);
            HiveTableInputSplit oldSplit = new HiveTableInputSplit(-1, (InputSplit)split.toMapRedSplit(), clonedConf, split.getHiveTablePartition());
            this.hiveMapredSplitReader = new HiveMapredSplitReader(clonedConf, HiveBulkFormatAdapter.this.partitionKeys, HiveBulkFormatAdapter.this.fieldTypes, this.selectedFields, oldSplit, HiveBulkFormatAdapter.this.hiveShim);
            this.serializer = new RowDataSerializer(HiveBulkFormatAdapter.this.producedRowType);
        }

        public BulkFormat.RecordIterator<RowData> readBatch() throws IOException {
            Object[] records = new RowData[2048];
            long skipCount = this.numRead;
            int num = 0;
            while (!this.hiveMapredSplitReader.reachedEnd() && num < 2048) {
                records[num++] = this.serializer.copy(this.nextRecord());
            }
            if (num == 0) {
                return null;
            }
            ArrayResultIterator iterator = new ArrayResultIterator();
            iterator.set(records, num, -1L, skipCount);
            return iterator;
        }

        public void close() throws IOException {
            this.hiveMapredSplitReader.close();
        }

        private RowData nextRecord() throws IOException {
            RowData res = this.hiveMapredSplitReader.nextRecord(null);
            ++this.numRead;
            return res;
        }

        private void seek(long toSkip) throws IOException {
            while (!this.hiveMapredSplitReader.reachedEnd() && toSkip > 0L) {
                this.nextRecord();
                --toSkip;
            }
        }

        private void addSchemaToConf(JobConf jobConf) {
            List typeStrs = Arrays.stream(HiveBulkFormatAdapter.this.fieldTypes).map(t -> HiveTypeUtil.toHiveTypeInfo(t, true).toString()).collect(Collectors.toList());
            jobConf.set("columns", String.join((CharSequence)",", HiveBulkFormatAdapter.this.fieldNames));
            jobConf.set("columns.types", String.join((CharSequence)",", typeStrs));
            int numNonPartCol = HiveBulkFormatAdapter.this.fieldNames.length - HiveBulkFormatAdapter.this.partitionKeys.size();
            jobConf.set(HiveBulkFormatAdapter.SCHEMA_EVOLUTION_COLUMNS, String.join((CharSequence)",", Arrays.copyOfRange(HiveBulkFormatAdapter.this.fieldNames, 0, numNonPartCol)));
            jobConf.set(HiveBulkFormatAdapter.SCHEMA_EVOLUTION_COLUMNS_TYPES, String.join((CharSequence)",", typeStrs.subList(0, numNonPartCol)));
            String readColIDs = Arrays.stream(this.selectedFields).filter(i -> i < numNonPartCol).mapToObj(String::valueOf).collect(Collectors.joining(","));
            jobConf.set("hive.io.file.readcolumn.ids", readColIDs);
        }
    }

    private class HiveMapRedBulkFormat
    implements BulkFormat<RowData, HiveSourceSplit> {
        private static final long serialVersionUID = 1L;

        private HiveMapRedBulkFormat() {
        }

        public BulkFormat.Reader<RowData> createReader(Configuration config, HiveSourceSplit split) throws IOException {
            return new HiveReader(split);
        }

        public BulkFormat.Reader<RowData> restoreReader(Configuration config, HiveSourceSplit split) throws IOException {
            assert (split.getReaderPosition().isPresent());
            HiveReader hiveReader = new HiveReader(split);
            hiveReader.seek(((CheckpointedPosition)split.getReaderPosition().get()).getRecordsAfterOffset());
            return hiveReader;
        }

        public boolean isSplittable() {
            return true;
        }

        public TypeInformation<RowData> getProducedType() {
            return InternalTypeInfo.of((RowType)HiveBulkFormatAdapter.this.producedRowType);
        }
    }
}

