package org.apache.iceberg;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.actions.ManifestFileBean;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:org/apache/iceberg/SparkDistributedDataScan.class */
public class SparkDistributedDataScan extends BaseDistributedDataScan {
    private static final Joiner COMMA = Joiner.on(',');
    private static final String DELETE_PLANNING_JOB_GROUP_ID = "DELETE-PLANNING";
    private static final String DATA_PLANNING_JOB_GROUP_ID = "DATA-PLANNING";
    private final SparkSession spark;
    private final JavaSparkContext sparkContext;
    private final SparkReadConf readConf;
    private Broadcast<Table> tableBroadcast;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/SparkDistributedDataScan$ReadDataManifest.class */
    public static class ReadDataManifest implements FlatMapFunction<ManifestFileBean, DataFile> {
        private final Broadcast<Table> table;
        private final Expression filter;
        private final boolean withStats;
        private final boolean isCaseSensitive;

        ReadDataManifest(Broadcast<Table> broadcast, TableScanContext tableScanContext, boolean z) {
            this.table = broadcast;
            this.filter = tableScanContext.rowFilter();
            this.withStats = z;
            this.isCaseSensitive = tableScanContext.caseSensitive();
        }

        public Iterator<DataFile> call(ManifestFileBean manifestFileBean) throws Exception {
            return new ClosingIterator(ManifestFiles.read(manifestFileBean, ((Table) this.table.value()).io(), ((Table) this.table.value()).specs()).select(this.withStats ? BaseScan.SCAN_WITH_STATS_COLUMNS : BaseScan.SCAN_COLUMNS).filterRows(this.filter).caseSensitive(this.isCaseSensitive).iterator());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/SparkDistributedDataScan$ReadDeleteManifest.class */
    public static class ReadDeleteManifest implements FlatMapFunction<ManifestFileBean, DeleteFile> {
        private final Broadcast<Table> table;
        private final Expression filter;
        private final boolean isCaseSensitive;

        ReadDeleteManifest(Broadcast<Table> broadcast, TableScanContext tableScanContext) {
            this.table = broadcast;
            this.filter = tableScanContext.rowFilter();
            this.isCaseSensitive = tableScanContext.caseSensitive();
        }

        public Iterator<DeleteFile> call(ManifestFileBean manifestFileBean) throws Exception {
            return new ClosingIterator(ManifestFiles.readDeleteManifest(manifestFileBean, ((Table) this.table.value()).io(), ((Table) this.table.value()).specs()).select(BaseScan.DELETE_SCAN_WITH_STATS_COLUMNS).filterRows(this.filter).caseSensitive(this.isCaseSensitive).iterator());
        }
    }

    /**
     * Creates a distributed scan over {@code table}, using the table's current schema and a scan
     * context derived from the table via {@code newTableScanContext}.
     *
     * @param sparkSession Spark session used to run the planning jobs
     * @param table table to scan
     * @param sparkReadConf read configuration consulted for parallelism and planning modes
     */
    public SparkDistributedDataScan(SparkSession sparkSession, Table table, SparkReadConf sparkReadConf) {
        this(
            sparkSession,
            table,
            sparkReadConf,
            table.schema(),
            newTableScanContext(table));
    }

    /**
     * Internal constructor that takes an explicit schema and scan context.
     *
     * <p>The table broadcast field is not assigned here and keeps its default {@code null}; it is
     * populated on first use by {@code tableBroadcast()}.
     */
    private SparkDistributedDataScan(
            SparkSession sparkSession,
            Table table,
            SparkReadConf sparkReadConf,
            Schema schema,
            TableScanContext tableScanContext) {
        super(table, schema, tableScanContext);
        this.readConf = sparkReadConf;
        this.spark = sparkSession;
        this.sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
    }

    /**
     * Builds a new scan of this type for the given table, schema and scan context, reusing this
     * scan's Spark session and read configuration.
     *
     * <p>NOTE(review): the decompiler recorded that this method was renamed from
     * {@code newRefinedScan} (merged with a bridge method) and that its access modifier was changed
     * from {@code protected}. The decompiled name and modifier are kept as-is here; verify against
     * the superclass before relying on them.
     */
    public BatchScan m1newRefinedScan(Table table, Schema schema, TableScanContext tableScanContext) {
        SparkDistributedDataScan refinedScan =
            new SparkDistributedDataScan(spark, table, readConf, schema, tableScanContext);
        return refinedScan;
    }

    /** Returns the parallelism reported by the read configuration. */
    protected int remoteParallelism() {
        int parallelism = readConf.parallelism();
        return parallelism;
    }

    /** Returns the planning mode for data files, as reported by the read configuration. */
    protected PlanningMode dataPlanningMode() {
        PlanningMode mode = readConf.dataPlanningMode();
        return mode;
    }

    /**
     * Always returns {@code false}.
     *
     * <p>NOTE(review): presumably tells the base scan that data files produced by remote planning
     * do not need to be copied — confirm against the superclass contract.
     */
    protected boolean shouldCopyRemotelyPlannedDataFiles() {
        return false;
    }

    /**
     * Plans data files for the given manifests inside a Spark job group labelled
     * {@code DATA-PLANNING}, delegating the actual work to {@code doPlanDataRemotely}.
     *
     * @param list data manifests to read
     * @param z whether column stats should be included when reading the manifests
     * @return one closeable iterable of data files per collected partition
     */
    protected Iterable<CloseableIterable<DataFile>> planDataRemotely(List<ManifestFile> list, boolean z) {
        JobGroupInfo jobGroup = new JobGroupInfo(DATA_PLANNING_JOB_GROUP_ID, jobDesc("data"));
        // No cast needed: the helper's type parameter is inferred from the return type, which
        // avoids going through a raw Iterable and the resulting unchecked conversion.
        return withJobGroupInfo(jobGroup, () -> doPlanDataRemotely(list, z));
    }

    /**
     * Reads the given data manifests with Spark and returns the matching data files grouped by
     * partition.
     *
     * <p>The manifests are parallelized into one slice per manifest, read via
     * {@link ReadDataManifest}, and collected partition by partition. Scan metrics are updated with
     * the number of scanned manifests and with the number of skipped data files (live files
     * recorded in the manifests minus the files actually returned).
     *
     * @param dataManifests data manifests to read
     * @param withColumnStats whether column stats should be included when reading the manifests
     * @return one iterable per collected partition, each wrapped with a no-op close
     */
    private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
            List<ManifestFile> dataManifests, boolean withColumnStats) {
        scanMetrics().scannedDataManifests().increment(dataManifests.size());

        JavaRDD<DataFile> dataFileRdd =
            sparkContext
                .parallelize(toBeans(dataManifests), dataManifests.size())
                .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
        // Typed (not raw) so the element type is known to the stream and transform calls below.
        List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRdd);

        int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
        int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
        scanMetrics().skippedDataFiles().increment(skippedFilesCount);

        return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
    }

    /** Returns the planning mode for delete files, as reported by the read configuration. */
    protected PlanningMode deletePlanningMode() {
        PlanningMode mode = readConf.deletePlanningMode();
        return mode;
    }

    /**
     * Plans delete files for the given manifests inside a Spark job group labelled
     * {@code DELETE-PLANNING}, delegating the actual work to {@code doPlanDeletesRemotely}.
     *
     * @param list delete manifests to read
     * @return the delete file index built from the matching delete files
     */
    protected DeleteFileIndex planDeletesRemotely(List<ManifestFile> list) {
        JobGroupInfo jobGroup = new JobGroupInfo(DELETE_PLANNING_JOB_GROUP_ID, jobDesc("deletes"));
        Supplier<DeleteFileIndex> planner = () -> doPlanDeletesRemotely(list);
        return withJobGroupInfo(jobGroup, planner);
    }

    /**
     * Reads the given delete manifests with Spark and builds a delete file index from the result.
     *
     * <p>The manifests are parallelized into one slice per manifest, read via
     * {@link ReadDeleteManifest}, and collected to the driver. Scan metrics are updated with the
     * number of scanned manifests and with the number of skipped delete files (live files recorded
     * in the manifests minus the files actually returned).
     *
     * @param deleteManifests delete manifests to read
     * @return index over the collected delete files, configured with the table's specs, this
     *     scan's case sensitivity and its scan metrics
     */
    private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests) {
        scanMetrics().scannedDeleteManifests().increment(deleteManifests.size());

        // Typed (not raw) so the builder receives delete files without an unchecked conversion.
        List<DeleteFile> deleteFiles =
            sparkContext
                .parallelize(toBeans(deleteManifests), deleteManifests.size())
                .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
                .collect();

        int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
        scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);

        return DeleteFileIndex.builderFor(deleteFiles)
            .specsById(table().specs())
            .caseSensitive(isCaseSensitive())
            .scanMetrics(scanMetrics())
            .build();
    }

    /**
     * Runs {@code supplier} through {@code JobGroupUtils.withJobGroupInfo} using this scan's Spark
     * context and the given job group info, and returns the supplier's result.
     */
    private <T> T withJobGroupInfo(JobGroupInfo jobGroupInfo, Supplier<T> supplier) {
        T result = JobGroupUtils.withJobGroupInfo(sparkContext, jobGroupInfo, supplier);
        return result;
    }

    /**
     * Builds the human-readable description used for a planning job group.
     *
     * @param str what is being planned (callers in this class pass "data" or "deletes")
     * @return a string of the form {@code Planning <str> (snapshot_id=<id>) for <table name>}
     */
    private String jobDesc(String str) {
        // List<String> rather than a raw ArrayList, so add() is type-checked.
        List<String> options = Lists.newArrayList();
        options.add("snapshot_id=" + snapshot().snapshotId());
        String optionsAsString = COMMA.join(options);
        return String.format("Planning %s (%s) for %s", str, optionsAsString, table().name());
    }

    /**
     * Converts each manifest to a {@code ManifestFileBean} via
     * {@code ManifestFileBean.fromManifest}, preserving order.
     */
    private List<ManifestFileBean> toBeans(List<ManifestFile> list) {
        List<ManifestFileBean> beans = new ArrayList<>(list.size());
        for (ManifestFile manifest : list) {
            beans.add(ManifestFileBean.fromManifest(manifest));
        }
        return beans;
    }

    /**
     * Returns the broadcast of this scan's table, creating it on first use from
     * {@code SerializableTableWithSize.copyOf(table())} and reusing it on later calls.
     */
    private Broadcast<Table> tableBroadcast() {
        if (tableBroadcast != null) {
            return tableBroadcast;
        }
        Table serializableTable = SerializableTableWithSize.copyOf(table());
        tableBroadcast = sparkContext.broadcast(serializableTable);
        return tableBroadcast;
    }

    /**
     * Collects every partition of the RDD (ids {@code 0} to {@code getNumPartitions() - 1}) and
     * returns the per-partition lists as a fixed-size list view over the collected array.
     */
    private <T> List<List<T>> collectPartitions(JavaRDD<T> javaRDD) {
        int[] partitionIds = IntStream.range(0, javaRDD.getNumPartitions()).toArray();
        List<T>[] partitions = javaRDD.collectPartitions(partitionIds);
        return Arrays.asList(partitions);
    }

    /** Sums the live file counts (existing plus added) over all given manifests. */
    private int liveFilesCount(List<ManifestFile> list) {
        int total = 0;
        for (ManifestFile manifest : list) {
            total += liveFilesCount(manifest);
        }
        return total;
    }

    /**
     * Returns the manifest's existing file count plus its added file count.
     *
     * <p>NOTE(review): both counts are unboxed unconditionally, so a manifest reporting a
     * {@code null} count would raise a {@code NullPointerException} here — confirm whether callers
     * can pass such manifests.
     */
    private int liveFilesCount(ManifestFile manifestFile) {
        int existing = manifestFile.existingFilesCount().intValue();
        int added = manifestFile.addedFilesCount().intValue();
        return existing + added;
    }

    /**
     * Creates the initial scan context for a table: a context carrying the table's metrics
     * reporter when the table is a {@code BaseTable}, otherwise an empty context.
     */
    private static TableScanContext newTableScanContext(Table table) {
        if (table instanceof BaseTable) {
            BaseTable baseTable = (BaseTable) table;
            return ImmutableTableScanContext.builder()
                .metricsReporter(baseTable.reporter())
                .build();
        }
        return TableScanContext.empty();
    }

    // Bridge method as emitted by the decompiler (marked bridge/synthetic): it only forwards to
    // the superclass implementation of planTasks(). The raw return type is part of the
    // decompiled signature and is intentionally left untouched.
    public /* bridge */ /* synthetic */ CloseableIterable planTasks() {
        return super.planTasks();
    }
}
