/*
 * Decompiled with CFR 0.152.
 */
package org.apache.comet.parquet;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.nio.channels.Channels;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.comet.CometConf;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ConstantColumnReader;
import org.apache.comet.parquet.Native;
import org.apache.comet.parquet.NativeColumnReader;
import org.apache.comet.parquet.ReadOptions;
import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.BufferAllocator;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.comet.shaded.arrow.vector.ipc.WriteChannel;
import org.apache.comet.shaded.arrow.vector.ipc.message.MessageSerializer;
import org.apache.comet.shaded.arrow.vector.types.pojo.Schema;
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
import org.apache.comet.vector.NativeUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
import org.apache.spark.sql.comet.util.Utils$;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.AccumulatorV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

/**
 * A Hadoop {@link RecordReader} that reads a Parquet file split into Spark {@link ColumnarBatch}es
 * through Comet's native record batch reader.
 *
 * <p>Top-level fields present in the file are served by {@link NativeColumnReader}s, which are
 * recreated for every batch. Nullable fields missing from the file, and partition columns, are
 * served by {@link ConstantColumnReader}s. Call {@link #init()} before reading. Call
 * {@link #close()} to release the column readers and the native reader handle.
 */
public class NativeBatchReader
extends RecordReader<Void, ColumnarBatch>
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(NativeBatchReader.class);
    protected static final BufferAllocator ALLOCATOR = new RootAllocator();
    private final NativeUtil nativeUtil = new NativeUtil();
    private Configuration conf;
    private int capacity;
    private boolean isCaseSensitive;
    private boolean useFieldId;
    private boolean ignoreMissingIds;
    private StructType partitionSchema;
    private InternalRow partitionValues;
    private PartitionedFile file;
    private final Map<String, SQLMetric> metrics;
    private StructType sparkSchema;
    private MessageType requestedSchema;
    // Backing array of currentBatch: one slot per top-level field, followed by the partition columns.
    private CometVector[] vectors;
    // Same layout as vectors.
    private AbstractColumnReader[] columnReaders;
    private CometSchemaImporter importer;
    private ColumnarBatch currentBatch;
    // Indexed by top-level field; true when the field is not read through the native reader.
    private boolean[] missingColumns;
    private boolean isInitialized;
    private ParquetMetadata footer;
    private boolean useDecimal128;
    private boolean useLegacyDateTimestamp;
    private final TaskContext taskContext;
    // Native reader handle returned by Native.initRecordBatchReader; 0 until init() succeeds.
    private long handle;

    public NativeBatchReader(String file, int capacity) {
        this(file, capacity, null, null);
    }

    public NativeBatchReader(String file, int capacity, StructType partitionSchema, InternalRow partitionValues) {
        this(new Configuration(), file, capacity, partitionSchema, partitionValues);
    }

    /**
     * Creates a reader for {@code file}. This constructor does not set a footer, but
     * {@link #init()} dereferences the footer.
     */
    public NativeBatchReader(Configuration conf, String file, int capacity, StructType partitionSchema, InternalRow partitionValues) {
        this.conf = conf;
        this.capacity = capacity;
        this.isCaseSensitive = false;
        this.useFieldId = false;
        this.ignoreMissingIds = false;
        this.partitionSchema = partitionSchema;
        this.partitionValues = partitionValues;
        this.file = ShimBatchReader.newPartitionedFile(partitionValues, file);
        this.metrics = new HashMap<String, SQLMetric>();
        this.taskContext = TaskContext$.MODULE$.get();
    }

    /**
     * Creates an already-initialized reader with {@code columnReaders.length} column slots.
     *
     * <p>Only the length of {@code columnReaders} is used: a fresh, empty array of that size is
     * allocated. Presumably callers fill it through {@link #getColumnReaders()}; confirm with
     * callers. {@link #init()} does not need to be called.
     */
    public NativeBatchReader(AbstractColumnReader[] columnReaders) {
        int numColumns = columnReaders.length;
        this.columnReaders = new AbstractColumnReader[numColumns];
        this.vectors = new CometVector[numColumns];
        this.currentBatch = new ColumnarBatch((ColumnVector[]) this.vectors);
        this.isInitialized = true;
        this.taskContext = TaskContext$.MODULE$.get();
        this.metrics = new HashMap<String, SQLMetric>();
    }

    NativeBatchReader(Configuration conf, PartitionedFile inputSplit, ParquetMetadata footer, int capacity, StructType sparkSchema, boolean isCaseSensitive, boolean useFieldId, boolean ignoreMissingIds, boolean useLegacyDateTimestamp, StructType partitionSchema, InternalRow partitionValues, Map<String, SQLMetric> metrics) {
        this.conf = conf;
        this.capacity = capacity;
        this.sparkSchema = sparkSchema;
        this.isCaseSensitive = isCaseSensitive;
        this.useFieldId = useFieldId;
        this.ignoreMissingIds = ignoreMissingIds;
        this.useLegacyDateTimestamp = useLegacyDateTimestamp;
        this.partitionSchema = partitionSchema;
        this.partitionValues = partitionValues;
        this.file = inputSplit;
        this.footer = footer;
        this.metrics = metrics;
        this.taskContext = TaskContext$.MODULE$.get();
    }

    /**
     * Prepares the reader for {@link #nextBatch()}.
     *
     * <p>It resolves the requested Parquet schema against the file footer, creates constant
     * readers for missing and partition columns, and opens the native record batch reader. The
     * footer must have been supplied; only the package-private constructor sets it.
     *
     * @throws IllegalArgumentException if the clipped Parquet schema and the Spark schema have a
     *     different number of top-level fields
     * @throws UnsupportedOperationException if a requested column's descriptor differs from the
     *     file's descriptor for the same path
     * @throws IOException if a required (max definition level 0) column is missing from the
     *     file, or the Arrow schema cannot be serialized
     */
    public void init() throws URISyntaxException, IOException {
        this.useDecimal128 = this.conf.getBoolean(
            CometConf.COMET_USE_DECIMAL_128().key(),
            ((Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get()).booleanValue());
        long start = this.file.start();
        long length = this.file.length();
        String filePath = this.file.filePath().toString();
        long fileSize = this.file.fileSize();
        MessageType fileSchema = this.footer.getFileMetaData().getSchema();
        this.requestedSchema = fileSchema;

        // NOTE(review): neither options object below is passed to the native reader. They are
        // kept in case building them validates the configuration; remove once confirmed unneeded.
        ParquetReadOptions.Builder builder = HadoopReadOptions.builder(this.conf, new Path(filePath));
        if (start >= 0L && length >= 0L) {
            builder = builder.withRange(start, start + length);
        }
        ParquetReadOptions readOptions = builder.build();
        ReadOptions cometReadOptions = ReadOptions.builder(this.conf).build();

        if (this.sparkSchema == null) {
            this.sparkSchema = new ParquetToSparkSchemaConverter(this.conf).convert(this.requestedSchema);
        } else {
            this.requestedSchema = CometParquetReadSupport.clipParquetSchema(
                this.requestedSchema, this.sparkSchema, this.isCaseSensitive, this.useFieldId, this.ignoreMissingIds);
            if (this.requestedSchema.getFieldCount() != this.sparkSchema.size()) {
                // Report the quantity that was actually compared (top-level fields), not leaf columns.
                throw new IllegalArgumentException(String.format(
                    "Spark schema has %d columns while Parquet schema has %d columns",
                    this.sparkSchema.size(), this.requestedSchema.getFieldCount()));
            }
        }
        String timeZoneId = this.conf.get("spark.sql.session.timeZone");
        Schema arrowSchema = Utils$.MODULE$.toArrowSchema(this.sparkSchema, timeZoneId);
        byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);

        List<ColumnDescriptor> columns = this.requestedSchema.getColumns();
        List<Type> fields = this.requestedSchema.getFields();
        List<String[]> paths = this.requestedSchema.getPaths();
        int numFields = fields.size();
        int numColumns = numFields + (this.partitionSchema != null ? this.partitionSchema.size() : 0);
        this.columnReaders = new AbstractColumnReader[numColumns];
        // Indexed by top-level field, matching columnReaders and the loop in loadNextBatch().
        this.missingColumns = new boolean[numFields];
        StructField[] nonPartitionFields = this.sparkSchema.fields();
        for (int i = 0; i < numFields; ++i) {
            // NOTE(review): paths and columns are per leaf column but are indexed here by top-level
            // field. They only line up while every earlier field is primitive; confirm for nested schemas.
            String[] colPath = paths.get(i);
            if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
                // TODO: row indexes are not generated yet. No reader is created for this column,
                // so columnReaders[i] stays null and nextBatch() cannot serve it.
                this.missingColumns[i] = true;
                continue;
            }
            if (fileSchema.containsPath(colPath)) {
                ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
                if (!fd.equals(columns.get(i))) {
                    throw new UnsupportedOperationException("Schema evolution is not supported");
                }
                this.missingColumns[i] = false;
                continue;
            }
            if (columns.get(i).getMaxDefinitionLevel() == 0) {
                throw new IOException("Required column '" + Arrays.toString(colPath) + "' is missing in data file " + filePath);
            }
            // Optional column absent from the file: serve it from a constant reader instead.
            this.columnReaders[i] = new ConstantColumnReader(nonPartitionFields[i], this.capacity, this.useDecimal128);
            this.missingColumns[i] = true;
        }
        if (this.partitionSchema != null) {
            // Partition columns follow the top-level fields, the same offset numColumns was built from.
            this.initPartitionColumnReaders(numFields);
        }
        this.vectors = new CometVector[numColumns];
        this.currentBatch = new ColumnarBatch((ColumnVector[]) this.vectors);
        if (this.taskContext != null) {
            Option<AccumulatorV2<?, ?>> accu = this.getTaskAccumulator(this.taskContext.taskMetrics());
            if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
                // TODO: report the number of row groups to this accumulator once the native reader
                // exposes it. This branch is currently a no-op.
            }
        }
        this.handle = Native.initRecordBatchReader(filePath, fileSize, start, length, serializedRequestedArrowSchema, timeZoneId);
        this.isInitialized = true;
    }

    /** Serializes {@code schema} as an Arrow IPC schema message. */
    private static byte[] serializeArrowSchema(Schema schema) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
        return out.toByteArray();
    }

    /** Creates one constant reader per partition column, starting at slot {@code firstIndex}. */
    private void initPartitionColumnReaders(int firstIndex) {
        StructField[] partitionFields = this.partitionSchema.fields();
        for (int fieldIndex = 0; fieldIndex < partitionFields.length; ++fieldIndex) {
            this.columnReaders[firstIndex + fieldIndex] = new ConstantColumnReader(
                partitionFields[fieldIndex], this.capacity, this.partitionValues, fieldIndex, this.useDecimal128);
        }
    }

    public void setSparkSchema(StructType schema) {
        this.sparkSchema = schema;
    }

    /** Returns the internal column-reader array (not a copy). */
    public AbstractColumnReader[] getColumnReaders() {
        return this.columnReaders;
    }

    /** No-op: all setup happens in {@link #init()}. */
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    }

    public boolean nextKeyValue() throws IOException {
        return this.nextBatch();
    }

    public Void getCurrentKey() {
        return null;
    }

    public ColumnarBatch getCurrentValue() {
        return this.currentBatch();
    }

    /** Progress is not tracked; always returns 0. */
    public float getProgress() {
        return 0.0f;
    }

    public ColumnarBatch currentBatch() {
        return this.currentBatch;
    }

    /**
     * Reads the next batch into {@link #currentBatch()}.
     *
     * @return {@code true} if a batch was loaded, {@code false} once the native reader reports
     *     zero rows
     * @throws IllegalStateException if the reader has not been initialized
     * @throws IOException wrapping any checked throwable raised while loading the batch
     */
    public boolean nextBatch() throws IOException {
        Preconditions.checkState(this.isInitialized, "init() should be called first!");
        int batchSize;
        try {
            batchSize = this.loadNextBatch();
        } catch (RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new IOException(e);
        }
        if (batchSize == 0) {
            return false;
        }
        long totalLoadTime = 0L;
        for (int i = 0; i < this.columnReaders.length; ++i) {
            AbstractColumnReader reader = this.columnReaders[i];
            long startNs = System.nanoTime();
            reader.readBatch(batchSize);
            // currentBatch was constructed over this array, so this swaps the batch's column in place.
            this.vectors[i] = reader.currentBatch();
            totalLoadTime += System.nanoTime() - startNs;
        }
        SQLMetric loadMetric = this.metrics.get("ParquetNativeLoadTime");
        if (loadMetric != null) {
            loadMetric.add(totalLoadTime);
        }
        this.currentBatch.setNumRows(batchSize);
        return true;
    }

    /**
     * Closes all column readers, the schema importer and the native utilities, then releases the
     * native reader handle.
     *
     * <p>The handle is released in a {@code finally} block so it is freed even if closing a
     * reader fails. It is passed to the native side at most once, and only if {@link #init()}
     * obtained it (the field stays 0 otherwise). This assumes the native side never hands out 0
     * as a valid handle; TODO confirm.
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.columnReaders != null) {
                for (AbstractColumnReader reader : this.columnReaders) {
                    if (reader != null) {
                        reader.close();
                    }
                }
            }
            if (this.importer != null) {
                this.importer.close();
                this.importer = null;
            }
            this.nativeUtil.close();
        } finally {
            if (this.handle != 0L) {
                Native.closeRecordBatchReader(this.handle);
                this.handle = 0L;
            }
        }
    }

    /**
     * Asks the native reader for the next record batch. If it has rows, rebuilds a
     * {@link NativeColumnReader} for each top-level field read from the file, using a fresh
     * schema importer.
     *
     * @return the number of rows in the batch; 0 when the native reader is exhausted
     */
    private int loadNextBatch() throws Throwable {
        int batchSize = Native.readNextRecordBatch(this.handle);
        if (batchSize == 0) {
            return batchSize;
        }
        // The previous batch's importer is released before a new one is created for this batch.
        if (this.importer != null) {
            this.importer.close();
        }
        this.importer = new CometSchemaImporter(ALLOCATOR);
        List<Type> fields = this.requestedSchema.getFields();
        StructField[] sparkFields = this.sparkSchema.fields();
        for (int i = 0; i < fields.size(); ++i) {
            if (this.missingColumns[i]) {
                continue;
            }
            if (this.columnReaders[i] != null) {
                this.columnReaders[i].close();
            }
            DataType dataType = sparkFields[i].dataType();
            Type field = fields.get(i);
            this.columnReaders[i] = new NativeColumnReader(this.handle, i, dataType, field, null, this.importer, this.nativeUtil, this.capacity, this.useDecimal128, this.useLegacyDateTimestamp);
        }
        return batchSize;
    }

    /**
     * Returns the last external accumulator registered on {@code taskMetrics}, found via
     * reflection. Returns an empty option when it cannot be obtained.
     *
     * <p>The reflective return-type check covers both {@code Buffer}- and {@code Seq}-typed
     * {@code externalAccums} signatures.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Option<AccumulatorV2<?, ?>> getTaskAccumulator(TaskMetrics taskMetrics) {
        try {
            Method externalAccumsMethod = TaskMetrics.class.getDeclaredMethod("externalAccums", new Class[0]);
            externalAccumsMethod.setAccessible(true);
            String returnType = externalAccumsMethod.getReturnType().getName();
            if (returnType.equals("scala.collection.mutable.Buffer")) {
                return ((Buffer) externalAccumsMethod.invoke((Object) taskMetrics, new Object[0])).lastOption();
            }
            if (returnType.equals("scala.collection.Seq")) {
                return ((Seq) externalAccumsMethod.invoke((Object) taskMetrics, new Object[0])).lastOption();
            }
            return Option.apply(null);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            return Option.apply(null);
        }
    }
}

