package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/source/Reader.class */
class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
    /** Shared empty array; the initial value of {@link #pushedFilters}. */
    private static final Filter[] NO_FILTERS = new Filter[0];
    /** Lower-cased file system schemes for which locality is preferred when the locality read option is not set. */
    private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
    /** Java wrapper around the session's Spark context; used to broadcast the table when planning partitions. */
    private final JavaSparkContext sparkContext;
    /** Table being read. */
    private final Table table;
    /** Read configuration built from the session, the table and the data source options. */
    private final SparkReadConf readConf;
    /** Scan produced by {@code configureBaseScan}; projection and filters are applied on top of it in {@code tasks()}. */
    private final TableScan baseScan;
    /** Whether block locations are looked up for each input partition. */
    private final boolean localityPreferred;
    /** Value of {@code readConf.handleTimestampWithoutZone()}; checked in {@code lazyType()}. */
    private final boolean readTimestampWithoutZone;
    /** Cached projection schema; reset to null by {@code pushFilters}/{@code pruneColumns} and rebuilt by {@code lazySchema()}. */
    private Schema schema;
    /** Spark schema received through {@code pruneColumns}, or null when no pruning was requested. */
    private StructType requestedSchema = null;
    /** Iceberg expressions converted from the pushed Spark filters, or null before {@code pushFilters} is called. */
    private List<Expression> filterExpressions = null;
    /** Spark filters that could be converted to Iceberg expressions. */
    private Filter[] pushedFilters = NO_FILTERS;
    /** Cached Spark type of the projection schema; reset together with {@link #schema}. */
    private StructType type = null;
    /** Cached planned tasks; reset to null by {@code pushFilters} and rebuilt by {@code tasks()}. */
    private List<CombinedScanTask> tasks = null;
    /** Memoized result of {@code enableBatchRead()}; null until first computed. */
    private Boolean readUsingBatch = null;
    /** Batch size chosen by {@code enableBatchRead()} when batch reads are used; 0 otherwise. */
    private int batchSize = 0;

    /**
     * {@link BatchDataReader} exposed as a Spark {@link InputPartitionReader} of {@link ColumnarBatch}.
     * Adds no behavior of its own; all constructor arguments are forwarded to the superclass.
     */
    private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
        BatchReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
            super(task, table, expectedSchema, caseSensitive, size);
        }
    }

    /** Serializable factory that creates {@link BatchReader} instances with a fixed batch size. */
    private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch> {
        /** Batch size handed to every reader this factory creates. */
        private final int batchSize;

        BatchReaderFactory(int batchSize) {
            this.batchSize = batchSize;
        }

        /**
         * Creates a batch reader for one combined scan task.
         *
         * @param task combined scan task to read
         * @param table table the task belongs to
         * @param expectedSchema schema passed through to the reader
         * @param caseSensitive case-sensitivity flag passed through to the reader
         * @return a new {@link BatchReader}
         */
        @Override
        public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
            return new BatchReader(task, table, expectedSchema, caseSensitive, this.batchSize);
        }
    }

    /** Stateless singleton factory that creates {@link RowReader} instances. */
    private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
        /** The only instance; the constructor is private. */
        private static final InternalRowReaderFactory INSTANCE = new InternalRowReaderFactory();

        private InternalRowReaderFactory() {
            // singleton: use INSTANCE
        }

        /**
         * Creates a row reader for one combined scan task.
         *
         * @param task combined scan task to read
         * @param table table the task belongs to
         * @param expectedSchema schema passed through to the reader
         * @param caseSensitive case-sensitivity flag passed through to the reader
         * @return a new {@link RowReader}
         */
        @Override
        public InputPartitionReader<InternalRow> create(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
            return new RowReader(task, table, expectedSchema, caseSensitive);
        }
    }

    /**
     * Serializable {@link InputPartition} wrapping a single {@link CombinedScanTask}.
     *
     * <p>The expected schema is carried as a JSON string and parsed on first use; the table is read
     * from a broadcast variable. Preferred locations are computed once in the constructor and are
     * held in a transient field, so they are not part of the serialized form.
     *
     * @param <T> record type produced by the partition reader
     */
    private static class ReadTask<T> implements Serializable, InputPartition<T> {
        private final CombinedScanTask task;
        private final Broadcast<Table> tableBroadcast;
        private final String expectedSchemaString;
        private final boolean caseSensitive;
        private final boolean localityPreferred;
        private final ReaderFactory<T> readerFactory;
        /** Parsed form of {@link #expectedSchemaString}; populated lazily. */
        private transient Schema expectedSchema = null;
        /** Locations computed at construction time. */
        private transient String[] preferredLocations = null;

        private ReadTask(
                CombinedScanTask task,
                Broadcast<Table> tableBroadcast,
                String expectedSchemaString,
                boolean caseSensitive,
                boolean localityPreferred,
                ReaderFactory<T> readerFactory) {
            this.task = task;
            this.tableBroadcast = tableBroadcast;
            this.expectedSchemaString = expectedSchemaString;
            this.caseSensitive = caseSensitive;
            this.localityPreferred = localityPreferred;
            this.readerFactory = readerFactory;
            // Reads this.task and this.localityPreferred, so it must run after both are assigned.
            this.preferredLocations = getPreferredLocations();
        }

        /** Creates the reader for this partition through the configured factory. */
        public InputPartitionReader<T> createPartitionReader() {
            Table table = this.tableBroadcast.value();
            return this.readerFactory.create(this.task, table, lazyExpectedSchema(), this.caseSensitive);
        }

        /** Returns the locations computed when this task was constructed. */
        public String[] preferredLocations() {
            return this.preferredLocations;
        }

        /** Parses the schema JSON on first call and reuses the parsed schema afterwards. */
        private Schema lazyExpectedSchema() {
            Schema parsed = this.expectedSchema;
            if (parsed == null) {
                parsed = SchemaParser.fromJson(this.expectedSchemaString);
                this.expectedSchema = parsed;
            }
            return parsed;
        }

        /**
         * Returns an empty array when locality is not preferred; otherwise the block locations of
         * the task, resolved with the active session's Hadoop configuration.
         */
        private String[] getPreferredLocations() {
            return this.localityPreferred
                    ? Util.blockLocations(this.task, SparkSession.active().sparkContext().hadoopConfiguration())
                    : new String[0];
        }
    }

    /**
     * Serializable factory for partition readers.
     *
     * <p>Kept {@code public} as emitted by the decompiler, which notes the original modifier was private.
     *
     * @param <T> record type produced by the created readers
     */
    public interface ReaderFactory<T> extends Serializable {
        /**
         * Creates a reader for one combined scan task.
         *
         * @param task combined scan task to read
         * @param table table the task belongs to
         * @param expectedSchema schema handed to the reader
         * @param caseSensitive case-sensitivity flag handed to the reader
         * @return a reader producing records of type {@code T}
         */
        InputPartitionReader<T> create(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive);
    }

    /**
     * {@link RowDataReader} exposed as a Spark {@link InputPartitionReader} of {@link InternalRow}.
     * Adds no behavior of its own; all constructor arguments are forwarded to the superclass.
     */
    private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
        RowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
            super(task, table, expectedSchema, caseSensitive);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Reader(SparkSession sparkSession, Table table, boolean z, DataSourceOptions dataSourceOptions) {
        this.schema = null;
        this.sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        this.table = table;
        this.readConf = new SparkReadConf(sparkSession, table, dataSourceOptions.asMap());
        this.baseScan = configureBaseScan(z, dataSourceOptions);
        this.schema = this.baseScan.schema();
        if (table.io() instanceof HadoopFileIO) {
            String str = "no_exist";
            try {
                Configuration newHadoopConf = SparkSession.active().sessionState().newHadoopConf();
                mergeIcebergHadoopConfs(newHadoopConf, table.properties());
                mergeIcebergHadoopConfs(newHadoopConf, dataSourceOptions.asMap());
                str = new Path(table.location()).getFileSystem(newHadoopConf).getScheme().toLowerCase(Locale.ENGLISH);
            } catch (IOException e) {
                LOG.warn("Failed to get Hadoop Filesystem", e);
            }
            String str2 = str;
            this.localityPreferred = ((Boolean) dataSourceOptions.get(SparkReadOptions.LOCALITY).map(Boolean::parseBoolean).orElseGet(() -> {
                return Boolean.valueOf(LOCALITY_WHITELIST_FS.contains(str2));
            })).booleanValue();
        } else {
            this.localityPreferred = false;
        }
        this.readTimestampWithoutZone = this.readConf.handleTimestampWithoutZone();
    }

    /**
     * Rejects combinations of snapshot-selection options that cannot be used together.
     *
     * @param snapshotId value of the snapshot-id option, or null
     * @param asOfTimestamp value of the as-of-timestamp option, or null
     * @param startSnapshotId value of the start-snapshot-id option, or null
     * @param endSnapshotId value of the end-snapshot-id option, or null
     * @throws IllegalArgumentException if both snapshot-id and as-of-timestamp are set, if either of
     *     them is combined with an incremental option, or if only end-snapshot-id is set
     */
    private void validateOptions(Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
        boolean hasSnapshotId = snapshotId != null;
        boolean hasAsOfTimestamp = asOfTimestamp != null;
        boolean hasStart = startSnapshotId != null;
        boolean hasEnd = endSnapshotId != null;

        if (hasSnapshotId && hasAsOfTimestamp) {
            throw new IllegalArgumentException(
                    "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
        }

        boolean selectsSnapshot = hasSnapshotId || hasAsOfTimestamp;
        boolean incremental = hasStart || hasEnd;
        if (selectsSnapshot && incremental) {
            throw new IllegalArgumentException(
                    "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or "
                            + "as-of-timestamp is specified");
        }

        if (hasEnd && !hasStart) {
            throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
        }
    }

    /**
     * Creates the base table scan from the read configuration and the split-planning options.
     *
     * <p>Snapshot options are validated first. Split size, lookback and open-file cost are parsed as
     * numbers before being applied, so a malformed value fails with {@link NumberFormatException}.
     *
     * @param caseSensitive case-sensitivity flag applied to the scan
     * @param dataSourceOptions options that may override split planning
     * @return the configured scan
     */
    private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions dataSourceOptions) {
        Long snapshotId = this.readConf.snapshotId();
        Long asOfTimestamp = this.readConf.asOfTimestamp();
        Long startSnapshotId = this.readConf.startSnapshotId();
        Long endSnapshotId = this.readConf.endSnapshotId();
        validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId);

        TableScan scan = this.table.newScan().caseSensitive(caseSensitive);

        if (snapshotId != null) {
            scan = scan.useSnapshot(snapshotId.longValue());
        }

        if (asOfTimestamp != null) {
            scan = scan.asOfTime(asOfTimestamp.longValue());
        }

        if (startSnapshotId != null) {
            if (endSnapshotId != null) {
                scan = scan.appendsBetween(startSnapshotId.longValue(), endSnapshotId.longValue());
            } else {
                scan = scan.appendsAfter(startSnapshotId.longValue());
            }
        }

        // Split-planning overrides supplied as read options.
        Long splitSize = dataSourceOptions.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
        if (splitSize != null) {
            scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
        }

        Integer splitLookback = dataSourceOptions.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
        if (splitLookback != null) {
            scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
        }

        Long openFileCost = dataSourceOptions.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
        if (openFileCost != null) {
            scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, openFileCost.toString());
        }

        return scan;
    }

    /**
     * Returns the schema of the base scan.
     *
     * <p>Kept {@code public} as emitted by the decompiler, which notes the original modifier was protected.
     *
     * @return {@code baseScan.schema()}
     */
    public Schema snapshotSchema() {
        Schema scanSchema = this.baseScan.schema();
        return scanSchema;
    }

    /**
     * Returns the projection schema, computing and caching it on first use.
     *
     * <p>Without a requested schema the base scan's schema is used as is. Otherwise the base scan's
     * schema is pruned with the requested Spark schema and the current filter expression.
     *
     * @return the cached projection schema
     */
    private Schema lazySchema() {
        if (this.schema != null) {
            return this.schema;
        }
        this.schema = this.requestedSchema == null
                ? this.baseScan.schema()
                : SparkSchemaUtil.prune(
                        this.baseScan.schema(), this.requestedSchema, filterExpression(), this.baseScan.isCaseSensitive());
        return this.schema;
    }

    /**
     * Combines all pushed filter expressions with AND.
     *
     * @return the conjunction of {@link #filterExpressions}, starting from
     *     {@code Expressions.alwaysTrue()}; just {@code Expressions.alwaysTrue()} when no filters
     *     have been pushed
     */
    private Expression filterExpression() {
        Expression combined = Expressions.alwaysTrue();
        if (this.filterExpressions == null) {
            return combined;
        }
        for (Expression filter : this.filterExpressions) {
            combined = Expressions.and(combined, filter);
        }
        return combined;
    }

    /**
     * Returns the Spark type of the projection schema, converting and caching it on first use.
     *
     * @return the cached Spark struct type
     * @throws IllegalArgumentException if the schema has a timestamp without zone and reading such
     *     timestamps is not enabled
     */
    private StructType lazyType() {
        if (this.type != null) {
            return this.type;
        }
        boolean timestampsAllowed = this.readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(lazySchema());
        Preconditions.checkArgument(timestampsAllowed, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
        this.type = SparkSchemaUtil.convert(lazySchema());
        return this.type;
    }

    /**
     * Returns the Spark schema of the rows this reader produces.
     *
     * @return the result of {@code lazyType()}
     */
    public StructType readSchema() {
        StructType sparkType = lazyType();
        return sparkType;
    }

    /**
     * Plans one columnar-batch input partition per combined scan task.
     *
     * <p>The table is broadcast once and shared by all partitions. Partitions are built through
     * {@code Tasks}, using the worker pool only when locality is preferred.
     *
     * @return the planned partitions, in task order
     * @throws IllegalStateException if batch reads are not enabled or the batch size is not positive
     * @throws ValidationException if any planned task has delete files
     */
    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        Preconditions.checkState(enableBatchRead(), "Batched reads not enabled");
        Preconditions.checkState(this.batchSize > 0, "Invalid batch size");

        String expectedSchemaJson = SchemaParser.toJson(lazySchema());
        List<CombinedScanTask> scanTasks = tasks();
        ValidationException.check(
                scanTasks.stream().noneMatch(TableScanUtil::hasDeletes),
                "Cannot scan table %s: cannot apply required delete files",
                this.table);

        // Raw types are kept exactly as in the existing code for the broadcast and the partition array.
        Broadcast tableBroadcast = this.sparkContext.broadcast(SerializableTable.copyOf(this.table));
        boolean caseSensitive = this.baseScan.isCaseSensitive();
        InputPartition[] readTasks = new InputPartition[scanTasks.size()];

        Tasks.range(readTasks.length)
                .stopOnFailure()
                .executeWith(this.localityPreferred ? ThreadPools.getWorkerPool() : null)
                .run(index -> {
                    int position = index.intValue();
                    readTasks[position] = new ReadTask(
                            (CombinedScanTask) scanTasks.get(position),
                            tableBroadcast,
                            expectedSchemaJson,
                            caseSensitive,
                            this.localityPreferred,
                            new BatchReaderFactory(this.batchSize));
                });

        LOG.info("Batching input partitions with {} tasks.", Integer.valueOf(readTasks.length));
        return Arrays.asList(readTasks);
    }

    /**
     * Plans one row-based input partition per combined scan task.
     *
     * <p>The table is broadcast once and shared by all partitions. Partitions are built through
     * {@code Tasks}, using the worker pool only when locality is preferred.
     *
     * @return the planned partitions, in task order
     */
    public List<InputPartition<InternalRow>> planInputPartitions() {
        String expectedSchemaJson = SchemaParser.toJson(lazySchema());

        // Raw types are kept exactly as in the existing code for the broadcast and the partition array.
        Broadcast tableBroadcast = this.sparkContext.broadcast(SerializableTable.copyOf(this.table));
        List<CombinedScanTask> scanTasks = tasks();
        boolean caseSensitive = this.baseScan.isCaseSensitive();
        InputPartition[] readTasks = new InputPartition[scanTasks.size()];

        Tasks.range(readTasks.length)
                .stopOnFailure()
                .executeWith(this.localityPreferred ? ThreadPools.getWorkerPool() : null)
                .run(index -> {
                    int position = index.intValue();
                    readTasks[position] = new ReadTask(
                            (CombinedScanTask) scanTasks.get(position),
                            tableBroadcast,
                            expectedSchemaJson,
                            caseSensitive,
                            this.localityPreferred,
                            InternalRowReaderFactory.INSTANCE);
                });

        return Arrays.asList(readTasks);
    }

    /**
     * Records the Spark filters that can be converted to Iceberg expressions.
     *
     * <p>Cached tasks, schema and Spark type are invalidated because they depend on the filters.
     *
     * @param filterArr filters offered by Spark
     * @return {@code filterArr} unchanged, i.e. every filter is reported back to Spark
     */
    public Filter[] pushFilters(Filter[] filterArr) {
        // Planned tasks were computed for the previous filters.
        this.tasks = null;

        List<Expression> expressions = Lists.newArrayListWithExpectedSize(filterArr.length);
        List<Filter> convertible = Lists.newArrayListWithExpectedSize(filterArr.length);
        for (int i = 0; i < filterArr.length; i++) {
            Filter sparkFilter = filterArr[i];
            Expression icebergExpression = SparkFilters.convert(sparkFilter);
            if (icebergExpression == null) {
                // no Iceberg equivalent; not recorded as pushed
                continue;
            }
            expressions.add(icebergExpression);
            convertible.add(sparkFilter);
        }

        this.filterExpressions = expressions;
        this.pushedFilters = convertible.toArray(new Filter[0]);

        // The projection is derived from the filters, so it has to be rebuilt.
        this.schema = null;
        this.type = null;

        return filterArr;
    }

    /**
     * Returns the filters recorded by the last {@code pushFilters} call.
     *
     * @return the internal array itself (not a copy); an empty array if nothing was pushed
     */
    public Filter[] pushedFilters() {
        Filter[] recorded = this.pushedFilters;
        return recorded;
    }

    /**
     * Stores the schema requested by Spark and invalidates the cached projection.
     *
     * @param structType the columns Spark requires
     */
    public void pruneColumns(StructType structType) {
        // Drop the cached projection first; it is rebuilt lazily from the new request.
        this.type = null;
        this.schema = null;
        this.requestedSchema = structType;
    }

    /**
     * Estimates the size in bytes and the row count of this scan.
     *
     * <ul>
     *   <li>A table without a current snapshot yields zero rows and zero bytes.</li>
     *   <li>For a partitioned table with no effective filter, the row count is read from the current
     *       snapshot summary's total-records property, falling back to {@code Long.MAX_VALUE} when
     *       the property is absent.</li>
     *   <li>Otherwise the row count is summed over the planned file scan tasks, each contributing
     *       its file's record count scaled by the fraction of the file the task covers.</li>
     * </ul>
     *
     * @return the estimated statistics
     */
    public Statistics estimateStatistics() {
        if (this.table.currentSnapshot() == null) {
            return new Stats(0L, 0L);
        }

        if (!this.table.spec().isUnpartitioned() && filterExpression() == Expressions.alwaysTrue()) {
            long totalRecords = PropertyUtil.propertyAsLong(
                    this.table.currentSnapshot().summary(), SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
            return new Stats(SparkSchemaUtil.estimateSize(lazyType(), totalRecords), totalRecords);
        }

        long numRows = 0L;
        for (CombinedScanTask combinedTask : tasks()) {
            for (FileScanTask fileTask : combinedTask.files()) {
                // Divide as doubles: long/long division would truncate the fraction of a partially
                // scanned file to 0 and drop its rows from the estimate.
                double fractionOfFileScanned = ((double) fileTask.length()) / fileTask.file().fileSizeInBytes();
                numRows = (long) (numRows + fractionOfFileScanned * fileTask.file().recordCount());
            }
        }

        return new Stats(SparkSchemaUtil.estimateSize(lazyType(), numRows), numRows);
    }

    /**
     * Decides whether this scan is read with columnar batches; the decision is computed once and memoized.
     *
     * <p>Batch reads are used only when vectorization is enabled for the file format, no task has
     * delete files, and either every file is ORC, or every file is Parquet and the projection has
     * at least one column with all top-level columns primitive. When batch reads are chosen,
     * {@link #batchSize} is set as well.
     *
     * @return true if batch reads are used
     */
    public boolean enableBatchRead() {
        if (this.readUsingBatch != null) {
            return this.readUsingBatch.booleanValue();
        }

        List<CombinedScanTask> scanTasks = tasks();
        boolean onlyParquet = scanTasks.stream().allMatch(task ->
                !task.isDataTask()
                        && task.files().stream().allMatch(fileTask -> fileTask.file().format().equals(FileFormat.PARQUET)));
        boolean onlyOrc = scanTasks.stream().allMatch(task ->
                !task.isDataTask()
                        && task.files().stream().allMatch(fileTask -> fileTask.file().format().equals(FileFormat.ORC)));

        boolean useBatch = false;
        if (batchReadsEnabled(onlyParquet, onlyOrc) && scanTasks.stream().noneMatch(TableScanUtil::hasDeletes)) {
            if (onlyOrc) {
                useBatch = true;
            } else if (onlyParquet) {
                useBatch = lazySchema().columns().size() > 0
                        && lazySchema().columns().stream().allMatch(column -> column.type().isPrimitiveType());
            }
        }

        this.readUsingBatch = Boolean.valueOf(useBatch);
        if (useBatch) {
            this.batchSize = batchSize(onlyParquet, onlyOrc);
        }
        return useBatch;
    }

    /**
     * Looks up whether vectorization is enabled for the file format of the scan.
     *
     * @param allParquet whether every scanned file is Parquet; takes precedence over {@code allOrc}
     * @param allOrc whether every scanned file is ORC
     * @return the Parquet setting if {@code allParquet}, else the ORC setting if {@code allOrc}, else false
     */
    private boolean batchReadsEnabled(boolean allParquet, boolean allOrc) {
        return allParquet
                ? this.readConf.parquetVectorizationEnabled()
                : allOrc && this.readConf.orcVectorizationEnabled();
    }

    /**
     * Looks up the configured batch size for the file format of the scan.
     *
     * @param allParquet whether every scanned file is Parquet; takes precedence over {@code allOrc}
     * @param allOrc whether every scanned file is ORC
     * @return the Parquet batch size if {@code allParquet}, else the ORC batch size if {@code allOrc}, else 0
     */
    private int batchSize(boolean allParquet, boolean allOrc) {
        int size;
        if (allParquet) {
            size = this.readConf.parquetBatchSize();
        } else if (allOrc) {
            size = this.readConf.orcBatchSize();
        } else {
            size = 0;
        }
        return size;
    }

    /**
     * Copies every entry whose key starts with {@code "hadoop."} into the Hadoop configuration,
     * with that prefix removed from the key.
     *
     * @param configuration configuration to update in place
     * @param map source properties; entries without the prefix are ignored
     */
    private static void mergeIcebergHadoopConfs(Configuration configuration, Map<String, String> map) {
        for (String key : map.keySet()) {
            if (!key.startsWith("hadoop.")) {
                continue;
            }
            // The key is known to start with the prefix, so the first match is at index 0.
            String hadoopKey = key.replaceFirst("hadoop.", "");
            configuration.set(hadoopKey, map.get(key));
        }
    }

    /**
     * Returns the planned combined scan tasks, planning and caching them on first use.
     *
     * <p>The base scan is projected to {@code lazySchema()} and every pushed filter expression is
     * applied before planning. The planning iterable is copied into a list and then closed.
     *
     * @return the cached list of combined scan tasks
     * @throws RuntimeIOException if closing the planning iterable fails
     */
    private List<CombinedScanTask> tasks() {
        if (this.tasks == null) {
            TableScan scan = this.baseScan.project(lazySchema());
            if (this.filterExpressions != null) {
                for (Expression filter : this.filterExpressions) {
                    scan = scan.filter(filter);
                }
            }

            // try-with-resources closes the iterable even if copying it into the list throws.
            try (CloseableIterable<CombinedScanTask> plannedTasks = scan.planTasks()) {
                this.tasks = Lists.newArrayList(plannedTasks);
            } catch (IOException e) {
                throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
            }
        }
        return this.tasks;
    }

    /**
     * Describes this scan by table, projected struct, filter expressions and batch-read flag.
     *
     * <p>Calls {@code lazySchema()} and {@code enableBatchRead()}, so it can trigger scan planning
     * if tasks have not been planned yet.
     */
    public String toString() {
        Object projectedStruct = lazySchema().asStruct();
        Boolean batched = Boolean.valueOf(enableBatchRead());
        return String.format(
                "IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
                this.table, projectedStruct, this.filterExpressions, batched);
    }
}
