/*
 * Decompiled with CFR 0.152.
 */
package org.apache.comet.parquet;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.comet.CometConf;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.CometInputFile;
import org.apache.comet.parquet.ConstantColumnReader;
import org.apache.comet.parquet.FileReader;
import org.apache.comet.parquet.ReadOptions;
import org.apache.comet.parquet.RowIndexColumnReader;
import org.apache.comet.parquet.Utils;
import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.BufferAllocator;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.AccumulatorV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

public class BatchReader
extends RecordReader<Void, ColumnarBatch>
implements Closeable {
    // Logger is keyed to this class so that output and per-logger configuration are attributed to BatchReader.
    private static final Logger LOG = LoggerFactory.getLogger(BatchReader.class);
    protected static final BufferAllocator ALLOCATOR = new RootAllocator();
    private Configuration conf;
    private int capacity;
    private boolean isCaseSensitive;
    private boolean useFieldId;
    private boolean ignoreMissingIds;
    private StructType partitionSchema;
    private InternalRow partitionValues;
    private PartitionedFile file;
    private final Map<String, SQLMetric> metrics;
    private long rowsRead;
    private StructType sparkSchema;
    private MessageType requestedSchema;
    private CometVector[] vectors;
    private AbstractColumnReader[] columnReaders;
    private CometSchemaImporter importer;
    private ColumnarBatch currentBatch;
    private Future<Option<Throwable>> prefetchTask;
    private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
    private FileReader fileReader;
    private boolean[] missingColumns;
    private boolean isInitialized;
    private ParquetMetadata footer;
    private long totalRowCount;
    private long totalRowsLoaded;
    private boolean useDecimal128;
    private boolean useLazyMaterialization;
    private boolean useLegacyDateTimestamp;
    private final TaskContext taskContext;

    /**
     * Creates a reader for {@code file} without partition columns.
     *
     * <p>Delegates to the four-argument constructor, passing {@code null} for both the partition
     * schema and the partition values.
     *
     * @param file path of the file to read
     * @param capacity value stored as this reader's batch capacity
     */
    public BatchReader(String file, int capacity) {
        this(file, capacity, null, null);
    }

    /**
     * Creates a reader for {@code file} using a freshly constructed {@link Configuration}.
     *
     * @param file path of the file to read
     * @param capacity value stored as this reader's batch capacity
     * @param partitionSchema schema of the partition columns, or {@code null}
     * @param partitionValues values of the partition columns, or {@code null}
     */
    public BatchReader(String file, int capacity, StructType partitionSchema, InternalRow partitionValues) {
        this(new Configuration(), file, capacity, partitionSchema, partitionValues);
    }

    /**
     * Creates a reader for {@code file} on top of the supplied Hadoop configuration.
     *
     * <p>Several {@code spark.sql.*} keys are written into {@code conf} itself; the instance passed
     * by the caller is modified, not copied. Case sensitivity, field-id matching and
     * ignore-missing-ids are all switched off for readers built through this constructor.
     *
     * @param conf Hadoop configuration to use (mutated by this constructor)
     * @param file path of the file to read
     * @param capacity value stored as this reader's batch capacity
     * @param partitionSchema schema of the partition columns, or {@code null}
     * @param partitionValues values of the partition columns, or {@code null}
     */
    public BatchReader(Configuration conf, String file, int capacity, StructType partitionSchema, InternalRow partitionValues) {
        // Fixed Spark SQL settings applied to the caller's configuration.
        conf.set("spark.sql.parquet.binaryAsString", "false");
        conf.set("spark.sql.parquet.int96AsTimestamp", "false");
        conf.set("spark.sql.caseSensitive", "false");
        conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true");
        conf.set("spark.sql.legacy.parquet.nanosAsLong", "false");

        // Caller-supplied state.
        this.conf = conf;
        this.capacity = capacity;
        this.partitionSchema = partitionSchema;
        this.partitionValues = partitionValues;

        // Schema-matching switches are all disabled on this path.
        this.isCaseSensitive = false;
        this.useFieldId = false;
        this.ignoreMissingIds = false;

        this.file = ShimBatchReader.newPartitionedFile(partitionValues, file);
        this.metrics = new HashMap<>();
        this.taskContext = TaskContext$.MODULE$.get();
    }

    /**
     * Creates an already-initialized reader with one column slot per element of
     * {@code columnReaders}; {@link #init()} does not need to be called afterwards.
     *
     * <p>NOTE(review): only the <em>length</em> of {@code columnReaders} is used. The elements of
     * the argument are not copied, so the internal reader array starts out filled with
     * {@code null} and has to be populated through {@link #getColumnReaders()}. Confirm with
     * callers that this is intended.
     *
     * @param columnReaders array whose length determines the number of columns
     */
    public BatchReader(AbstractColumnReader[] columnReaders) {
        this.columnReaders = new AbstractColumnReader[columnReaders.length];
        this.vectors = new CometVector[columnReaders.length];
        // The batch shares the vectors array, so later writes to vectors are visible through it.
        this.currentBatch = new ColumnarBatch((ColumnVector[])this.vectors);
        this.metrics = new HashMap<>();
        this.taskContext = TaskContext$.MODULE$.get();
        this.isInitialized = true;
    }

    /**
     * Package-private constructor that stores every setting exactly as supplied; nothing is opened
     * or read until {@link #init()} is called.
     *
     * @param conf Hadoop configuration
     * @param inputSplit the file (and byte range) to read
     * @param footer Parquet footer handed to the file reader in {@link #init()}
     * @param capacity value stored as this reader's batch capacity
     * @param sparkSchema requested Spark schema
     * @param isCaseSensitive passed to schema clipping in {@link #init()}
     * @param useFieldId passed to schema clipping in {@link #init()}
     * @param ignoreMissingIds passed to schema clipping in {@link #init()}
     * @param useLegacyDateTimestamp passed to the column readers created per row group
     * @param partitionSchema schema of the partition columns, or {@code null}
     * @param partitionValues values of the partition columns
     * @param metrics metric map, looked up by name while reading
     */
    BatchReader(Configuration conf, PartitionedFile inputSplit, ParquetMetadata footer, int capacity, StructType sparkSchema, boolean isCaseSensitive, boolean useFieldId, boolean ignoreMissingIds, boolean useLegacyDateTimestamp, StructType partitionSchema, InternalRow partitionValues, Map<String, SQLMetric> metrics) {
        // Input location and metadata.
        this.conf = conf;
        this.file = inputSplit;
        this.footer = footer;

        // Schema and schema-matching options.
        this.sparkSchema = sparkSchema;
        this.isCaseSensitive = isCaseSensitive;
        this.useFieldId = useFieldId;
        this.ignoreMissingIds = ignoreMissingIds;
        this.useLegacyDateTimestamp = useLegacyDateTimestamp;

        // Partition columns.
        this.partitionSchema = partitionSchema;
        this.partitionValues = partitionValues;

        // Batch sizing, metrics and task binding.
        this.capacity = capacity;
        this.metrics = metrics;
        this.taskContext = TaskContext$.MODULE$.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens the file and prepares this reader for {@link #nextBatch()}.
     *
     * <p>In order, this method: reads the decimal-128 and lazy-materialization switches from the
     * configuration; builds the Parquet read options; opens the {@link FileReader}; derives the
     * requested Parquet schema (converting it to a Spark schema when none was supplied, otherwise
     * clipping it to the supplied Spark schema); allocates the column-reader slots, filling in
     * constant/row-index readers for columns that are not read from the file and for partition
     * columns; creates the output batch; optionally reports the row-group count to a task
     * accumulator; creates the prefetch queue when prefetching is enabled; and finally marks the
     * reader initialized and wakes any thread waiting on this object.
     *
     * @throws URISyntaxException if the file path cannot be parsed as a {@link URI}
     * @throws IOException if a required column is missing from the data file
     * @throws IllegalArgumentException if the clipped Parquet schema and the Spark schema differ in
     *     field count
     * @throws UnsupportedOperationException if a column descriptor in the file differs from the
     *     requested one
     */
    public void init() throws URISyntaxException, IOException {
        boolean preFetchEnabled;
        Option<AccumulatorV2<?, ?>> accu;
        // Feature switches come from the configuration, falling back to the CometConf defaults.
        this.useDecimal128 = this.conf.getBoolean(CometConf.COMET_USE_DECIMAL_128().key(), ((Boolean)CometConf.COMET_USE_DECIMAL_128().defaultValue().get()).booleanValue());
        this.useLazyMaterialization = this.conf.getBoolean(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), ((Boolean)CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get()).booleanValue());
        long start = this.file.start();
        long length = this.file.length();
        String filePath = this.file.filePath().toString();
        HadoopReadOptions.Builder builder = HadoopReadOptions.builder((Configuration)this.conf, (Path)new Path(filePath));
        // Only apply a range when the split carries non-negative offsets.
        if (start >= 0L && length >= 0L) {
            builder = builder.withRange(start, start + length);
        }
        ParquetReadOptions readOptions = builder.build();
        ReadOptions cometReadOptions = ReadOptions.builder(this.conf).build();
        Path path = new Path(new URI(filePath));
        this.fileReader = new FileReader(CometInputFile.fromPath(path, this.conf), this.footer, readOptions, cometReadOptions, this.metrics);
        // Start from the full file schema; it is narrowed below when a Spark schema was supplied.
        MessageType fileSchema = this.requestedSchema = this.fileReader.getFileMetaData().getSchema();
        if (this.sparkSchema == null) {
            // No Spark schema requested: read everything and derive the Spark schema from the file.
            this.sparkSchema = new ParquetToSparkSchemaConverter(this.conf).convert(this.requestedSchema);
        } else {
            this.requestedSchema = CometParquetReadSupport.clipParquetSchema(this.requestedSchema, this.sparkSchema, this.isCaseSensitive, this.useFieldId, this.ignoreMissingIds);
            // NOTE(review): the check compares field counts, while the message reports the
            // Parquet column count (getColumns().size()).
            if (this.requestedSchema.getFieldCount() != this.sparkSchema.size()) {
                throw new IllegalArgumentException(String.format("Spark schema has %d columns while Parquet schema has %d columns", this.sparkSchema.size(), this.requestedSchema.getColumns().size()));
            }
        }
        this.totalRowCount = this.fileReader.getRecordCount();
        List columns = this.requestedSchema.getColumns();
        int numColumns = columns.size();
        // Partition columns are appended after the data columns.
        if (this.partitionSchema != null) {
            numColumns += this.partitionSchema.size();
        }
        this.columnReaders = new AbstractColumnReader[numColumns];
        this.missingColumns = new boolean[columns.size()];
        List paths = this.requestedSchema.getPaths();
        StructField[] nonPartitionFields = this.sparkSchema.fields();
        // The return value is not used here.
        // NOTE(review): presumably called for a validation side effect - confirm in ShimFileFormat.
        ShimFileFormat.findRowIndexColumnIndexInSchema(this.sparkSchema);
        for (int i = 0; i < this.requestedSchema.getFieldCount(); ++i) {
            Type t = (Type)this.requestedSchema.getFields().get(i);
            // Only non-repeated primitive fields are accepted.
            Preconditions.checkState((t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED) ? 1 : 0) != 0, (String)"Complex type is not supported");
            Object[] colPath = (String[])paths.get(i);
            if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
                // Row-index column: served by a dedicated reader built from the file's row indices.
                // Flagged as "missing" so the per-row-group loading step leaves this reader in place.
                long[] rowIndices = this.fileReader.getRowIndices();
                this.columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], this.capacity, rowIndices);
                this.missingColumns[i] = true;
                continue;
            }
            if (fileSchema.containsPath((String[])colPath)) {
                // Column exists in the file; its descriptor must match the requested one exactly.
                ColumnDescriptor fd = fileSchema.getColumnDescription((String[])colPath);
                if (!fd.equals(columns.get(i))) {
                    throw new UnsupportedOperationException("Schema evolution is not supported");
                }
                // The actual reader is created when a row group is loaded.
                this.missingColumns[i] = false;
                continue;
            }
            // Column is absent from the file: a max definition level of 0 is reported as a
            // missing required column.
            if (((ColumnDescriptor)columns.get(i)).getMaxDefinitionLevel() == 0) {
                throw new IOException("Required column '" + Arrays.toString(colPath) + "' is missing in data file " + filePath);
            }
            // Otherwise the absent column is served by a constant reader.
            ConstantColumnReader reader = new ConstantColumnReader(nonPartitionFields[i], this.capacity, this.useDecimal128);
            this.columnReaders[i] = reader;
            this.missingColumns[i] = true;
        }
        if (this.partitionSchema != null) {
            StructField[] partitionFields = this.partitionSchema.fields();
            // Fill the trailing slots with constant readers built from the partition values.
            for (int i = columns.size(); i < this.columnReaders.length; ++i) {
                int fieldIndex = i - columns.size();
                StructField field = partitionFields[fieldIndex];
                ConstantColumnReader reader = new ConstantColumnReader(field, this.capacity, this.partitionValues, fieldIndex, this.useDecimal128);
                this.columnReaders[i] = reader;
            }
        }
        this.vectors = new CometVector[numColumns];
        // The batch shares the vectors array, so vectors assigned later are visible through it.
        this.currentBatch = new ColumnarBatch((ColumnVector[])this.vectors);
        this.fileReader.setRequestedSchema(this.requestedSchema.getColumns());
        // When running inside a task whose last external accumulator is a "NumRowGroupsAcc",
        // add this file's row-group count to it.
        if (this.taskContext != null && (accu = this.getTaskAccumulator(this.taskContext.taskMetrics())).isDefined() && ((AccumulatorV2)accu.get()).getClass().getSimpleName().equals("NumRowGroupsAcc")) {
            AccumulatorV2 intAccum = (AccumulatorV2)accu.get();
            intAccum.add((Object)this.fileReader.getRowGroups().size());
        }
        if (preFetchEnabled = this.conf.getBoolean(CometConf.COMET_SCAN_PREFETCH_ENABLED().key(), ((Boolean)CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get()).booleanValue())) {
            LOG.info("Prefetch enabled for BatchReader.");
            this.prefetchQueue = new LinkedBlockingQueue();
        }
        this.isInitialized = true;
        // Wake up nextBatch(), which waits on this object until isInitialized becomes true.
        BatchReader batchReader = this;
        synchronized (batchReader) {
            this.notifyAll();
        }
    }

    /**
     * Replaces the Spark schema used by this reader.
     *
     * @param schema the Spark schema to store
     */
    public void setSparkSchema(StructType schema) {
        sparkSchema = schema;
    }

    /**
     * Returns the internal column-reader array itself (not a copy), so writes to the returned
     * array are visible to this reader.
     *
     * @return the column readers, one slot per output column
     */
    public AbstractColumnReader[] getColumnReaders() {
        return columnReaders;
    }

    /**
     * Does nothing; both arguments are ignored. This reader is set up through {@link #init()}
     * rather than through this method.
     *
     * @param inputSplit unused
     * @param taskAttemptContext unused
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    }

    /**
     * Advances to the next batch; equivalent to {@link #nextBatch()}.
     *
     * @return {@code true} if a new batch was loaded, {@code false} when no rows remain
     * @throws IOException if loading the next batch fails
     */
    public boolean nextKeyValue() throws IOException {
        return nextBatch();
    }

    /**
     * Returns the current key, which is always {@code null} (the key type is {@link Void}).
     *
     * @return {@code null}
     */
    public Void getCurrentKey() {
        return null;
    }

    /**
     * Returns the current value; equivalent to {@link #currentBatch()}.
     *
     * @return the batch most recently populated by this reader
     */
    public ColumnarBatch getCurrentValue() {
        return currentBatch();
    }

    /**
     * Returns the fraction of rows read so far.
     *
     * <p>When the total row count is not positive (for example before {@link #init()} has run, or
     * for a file with no rows) this returns {@code 0} instead of evaluating {@code 0f / 0f}, which
     * would yield {@code NaN}.
     *
     * @return {@code rowsRead / totalRowCount}, or {@code 0} when the total row count is not
     *     positive
     */
    public float getProgress() {
        if (this.totalRowCount <= 0L) {
            return 0.0f;
        }
        return (float)this.rowsRead / (float)this.totalRowCount;
    }

    /**
     * Returns the batch object this reader writes into.
     *
     * @return the current batch; {@code null} if the reader has not been initialized
     */
    public ColumnarBatch currentBatch() {
        return currentBatch;
    }

    /**
     * Returns the future of the prefetch task.
     *
     * @return the future created by {@link #submitPrefetchTask(ExecutorService)}, or {@code null}
     *     if no prefetch task has been submitted
     */
    public Future<Option<Throwable>> getPrefetchTask() {
        return prefetchTask;
    }

    /**
     * Returns the queue of prefetched row groups, each paired with a bytes-read figure.
     *
     * @return the prefetch queue, or {@code null} if {@link #init()} has not created one
     */
    public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
        return prefetchQueue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Loads the next batch of rows into {@link #currentBatch()}.
     *
     * <p>Without a prefetch task the reader must already be initialized. With a prefetch task this
     * method waits (in 100 ms slices) until the task has finished {@link #init()}, rethrowing any
     * failure the task recorded. It then loads the next row group if the current one is exhausted
     * and reads up to {@code capacity} rows.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before the {@link IOException} is thrown, so callers can still observe the interruption.
     *
     * @return {@code true} if a batch was loaded, {@code false} when all rows have been read
     * @throws IOException if a checked failure occurs; the original failure is the cause
     * @throws IllegalStateException if there is no prefetch task and {@link #init()} was not called
     */
    public boolean nextBatch() throws IOException {
        boolean hasMore;
        if (this.prefetchTask == null) {
            Preconditions.checkState((boolean)this.isInitialized, (String)"init() should be called first!");
        } else {
            while (!this.isInitialized) {
                synchronized (this) {
                    try {
                        // init() calls notifyAll() when done; the timeout bounds the wait so a
                        // failed prefetch task is still noticed.
                        this.wait(100L);
                    }
                    catch (InterruptedException e) {
                        // Preserve the interruption for the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                        throw new IOException(e);
                    }
                    try {
                        Option<Throwable> exception;
                        if (this.prefetchTask.isDone() && (exception = this.prefetchTask.get()).isDefined()) {
                            throw (Throwable)exception.get();
                        }
                    }
                    catch (RuntimeException e) {
                        // Unchecked exceptions are propagated unchanged.
                        throw e;
                    }
                    catch (Throwable e) {
                        throw new IOException(e);
                    }
                }
            }
        }
        if (this.rowsRead >= this.totalRowCount) {
            return false;
        }
        try {
            hasMore = this.loadNextRowGroupIfNecessary();
        }
        catch (RuntimeException e) {
            // Unchecked exceptions are propagated unchanged.
            throw e;
        }
        catch (InterruptedException e) {
            // Interrupted while waiting for a prefetched row group: keep the interrupt status.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        catch (Throwable e) {
            throw new IOException(e);
        }
        if (!hasMore) {
            return false;
        }
        // Never read past the rows of the row group(s) loaded so far.
        int batchSize = (int)Math.min((long)this.capacity, this.totalRowsLoaded - this.rowsRead);
        return this.nextBatch(batchSize);
    }

    /**
     * Reads {@code batchSize} rows from every column reader into the current batch and records
     * the time spent in the {@code ParquetNativeDecodeTime} and {@code ParquetNativeLoadTime}
     * metrics when they are present.
     *
     * @param batchSize number of rows to read from each column reader
     * @return always {@code true}
     */
    public boolean nextBatch(int batchSize) {
        long decodeNanos = 0L;
        long loadNanos = 0L;
        int column = 0;
        while (column < this.columnReaders.length) {
            AbstractColumnReader columnReader = this.columnReaders[column];

            // Time spent decoding the rows.
            long decodeStart = System.nanoTime();
            columnReader.readBatch(batchSize);
            decodeNanos += System.nanoTime() - decodeStart;

            // Time spent obtaining the resulting vector.
            long loadStart = System.nanoTime();
            this.vectors[column] = columnReader.currentBatch();
            loadNanos += System.nanoTime() - loadStart;

            ++column;
        }

        SQLMetric decodeTime = this.metrics.get("ParquetNativeDecodeTime");
        if (decodeTime != null) {
            decodeTime.add(decodeNanos);
        }
        SQLMetric loadTime = this.metrics.get("ParquetNativeLoadTime");
        if (loadTime != null) {
            loadTime.add(loadNanos);
        }

        this.currentBatch.setNumRows(batchSize);
        this.rowsRead += (long)batchSize;
        return true;
    }

    /**
     * Releases the column readers, the file reader and the schema importer, in that order.
     *
     * <p>Each stage runs in a {@code finally} block, so a failure while closing an earlier
     * resource no longer prevents the later ones from being closed.
     *
     * @throws IOException if closing the file reader fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.columnReaders != null) {
                for (AbstractColumnReader reader : this.columnReaders) {
                    // Slots may be empty: readers for file columns are created per row group.
                    if (reader != null) {
                        reader.close();
                    }
                }
            }
        }
        finally {
            try {
                if (this.fileReader != null) {
                    this.fileReader.close();
                    this.fileReader = null;
                }
            }
            finally {
                if (this.importer != null) {
                    this.importer.close();
                    this.importer = null;
                }
            }
        }
    }

    /**
     * Loads the next row group when every row of the previously loaded row groups has been
     * consumed, and creates fresh column readers for the columns that are read from the file.
     *
     * <p>With prefetching active the row group is taken from the prefetch queue. The queue is
     * polled with a timeout rather than waited on indefinitely: if the prefetch task has finished
     * and the queue is empty, a failure recorded by the task is rethrown, and a clean finish is
     * treated as "no more row groups". Previously a prefetch failure left this call blocked
     * forever.
     *
     * @return {@code true} if rows are available to read, {@code false} if there is no further
     *     row group
     * @throws Throwable any failure from reading the row group, or the failure recorded by the
     *     prefetch task
     */
    private boolean loadNextRowGroupIfNecessary() throws Throwable {
        // Rows of the current row group are still pending; nothing to load.
        if (this.rowsRead != this.totalRowsLoaded) {
            return true;
        }
        SQLMetric rowGroupTimeMetric = this.metrics.get("ParquetLoadRowGroupTime");
        SQLMetric numRowGroupsMetric = this.metrics.get("ParquetRowGroups");
        long startNs = System.nanoTime();
        PageReadStore rowGroupReader = null;
        if (this.prefetchTask != null && this.prefetchQueue != null) {
            Pair<PageReadStore, Long> rowGroupReaderPair = this.takePrefetchedRowGroup();
            if (rowGroupReaderPair != null) {
                rowGroupReader = (PageReadStore)rowGroupReaderPair.getLeft();
                long incBytesRead = (Long)rowGroupReaderPair.getRight();
                // The bytes were read on the prefetch thread; credit them to the statistics
                // as seen from this thread.
                FileSystem.getAllStatistics().forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
            }
        } else {
            rowGroupReader = this.fileReader.readNextRowGroup();
        }
        if (rowGroupTimeMetric != null) {
            rowGroupTimeMetric.add(System.nanoTime() - startNs);
        }
        if (rowGroupReader == null) {
            return false;
        }
        if (numRowGroupsMetric != null) {
            numRowGroupsMetric.add(1L);
        }
        // A new importer is used for each row group; release the previous one first.
        if (this.importer != null) {
            this.importer.close();
        }
        this.importer = new CometSchemaImporter(ALLOCATOR);
        List<ColumnDescriptor> columns = this.requestedSchema.getColumns();
        for (int i = 0; i < columns.size(); ++i) {
            // Constant and row-index readers set up in init() are kept as they are.
            if (this.missingColumns[i]) continue;
            if (this.columnReaders[i] != null) {
                this.columnReaders[i].close();
            }
            ColumnDescriptor descriptor = columns.get(i);
            DataType dataType = this.sparkSchema.fields()[i].dataType();
            ColumnReader reader = Utils.getColumnReader(dataType, descriptor, this.importer, this.capacity, this.useDecimal128, this.useLazyMaterialization, this.useLegacyDateTimestamp);
            reader.setPageReader(rowGroupReader.getPageReader(descriptor));
            this.columnReaders[i] = reader;
        }
        this.totalRowsLoaded += rowGroupReader.getRowCount();
        return true;
    }

    /**
     * Waits for the next prefetched row group without risking an unbounded block.
     *
     * @return the next queue entry, or {@code null} if the prefetch task completed successfully
     *     and the queue is drained
     * @throws Throwable the failure recorded by the prefetch task, or {@link InterruptedException}
     *     (with the interrupt status restored) if the wait is interrupted
     */
    private Pair<PageReadStore, Long> takePrefetchedRowGroup() throws Throwable {
        while (true) {
            Pair<PageReadStore, Long> next;
            try {
                next = this.prefetchQueue.poll(100L, java.util.concurrent.TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
            if (next != null) {
                return next;
            }
            if (this.prefetchTask.isDone()) {
                // Everything the task enqueued is visible once it is done; drain before giving up.
                next = this.prefetchQueue.poll();
                if (next != null) {
                    return next;
                }
                Option<Throwable> failure = this.prefetchTask.get();
                if (failure.isDefined()) {
                    throw (Throwable)failure.get();
                }
                return null;
            }
        }
    }

    /**
     * Submits a task to {@code threadPool} that initializes this reader and reads row groups
     * ahead of the consumer, and remembers the resulting future.
     *
     * @param threadPool executor on which the prefetch task runs
     */
    public void submitPrefetchTask(ExecutorService threadPool) {
        PrefetchTask task = new PrefetchTask();
        this.prefetchTask = threadPool.submit(task);
    }

    /**
     * Looks up the last external accumulator registered on {@code taskMetrics} via reflection.
     *
     * <p>The {@code externalAccums} method is invoked only when its declared return type is
     * {@code scala.collection.mutable.Buffer} or {@code scala.collection.Seq}. This is a
     * best-effort lookup: any other return type, or a reflective failure, yields an empty option.
     *
     * @param taskMetrics metrics object to inspect
     * @return the last external accumulator, or an empty option if it cannot be obtained
     */
    private Option<AccumulatorV2<?, ?>> getTaskAccumulator(TaskMetrics taskMetrics) {
        try {
            Method accessor = TaskMetrics.class.getDeclaredMethod("externalAccums");
            accessor.setAccessible(true);
            switch (accessor.getReturnType().getName()) {
                case "scala.collection.mutable.Buffer":
                    return ((Buffer)accessor.invoke(taskMetrics)).lastOption();
                case "scala.collection.Seq":
                    return ((Seq)accessor.invoke(taskMetrics)).lastOption();
                default:
                    return Option.apply(null);
            }
        }
        catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ignored) {
            // Deliberately best-effort: without reflective access there is simply no accumulator.
            return Option.apply(null);
        }
    }

    /**
     * Background task that initializes the enclosing reader and then reads row groups ahead of
     * the consumer, placing each one on the prefetch queue together with the number of bytes read
     * for it on this thread.
     *
     * <p>The task never throws: a failure is returned as a defined {@link Option}, and an empty
     * option signals that all row groups were read.
     */
    private class PrefetchTask
    implements Callable<Option<Throwable>> {
        private PrefetchTask() {
        }

        /** Sums the bytes-read counters of the current thread over all file system statistics. */
        private long getBytesRead() {
            return FileSystem.getAllStatistics().stream().mapToLong(s -> s.getThreadStatistics().getBytesRead()).sum();
        }

        /**
         * Runs {@code init()} and then enqueues row groups until the file reader reports no more.
         *
         * @return an empty option on success, or the failure that stopped the task
         */
        @Override
        public Option<Throwable> call() throws Exception {
            // Bytes already attributed to this thread before the task started.
            long baseline = this.getBytesRead();
            try {
                BatchReader.this.init();
                PageReadStore rowGroupReader;
                while ((rowGroupReader = BatchReader.this.fileReader.readNextRowGroup()) != null) {
                    long bytesReadNow = this.getBytesRead();
                    long incBytesRead = bytesReadNow - baseline;
                    // Advance the baseline so each entry carries only its own increment; the
                    // consumer adds every entry's value to the statistics, so a cumulative
                    // figure would be counted more than once.
                    baseline = bytesReadNow;
                    BatchReader.this.prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
                }
                // End of row groups reached.
                return Option.empty();
            }
            catch (Throwable e) {
                // Hand the failure to the consumer thread, which rethrows it.
                return Option.apply(e);
            }
            finally {
                if (BatchReader.this.fileReader != null) {
                    BatchReader.this.fileReader.closeStream();
                }
            }
        }
    }
}

