package org.apache.iceberg;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.common.AcidMetaDataFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base batch data scan that can plan data manifests and delete manifests either locally or
 * remotely.
 *
 * <p>The planning mode is resolved independently for data and delete manifests from the table
 * properties {@link TableProperties#DATA_PLANNING_MODE} and {@link
 * TableProperties#DELETE_PLANNING_MODE}. Subclasses provide the remote implementations via {@link
 * #planDataRemotely(List, boolean)} and {@link #planDeletesRemotely(List)}. They also report the
 * remote capacity via {@link #remoteParallelism()}, which AUTO mode compares against the local
 * parallelism.
 */
abstract class BaseDistributedDataScan
        extends DataScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
    private static final Logger LOG = LoggerFactory.getLogger(BaseDistributedDataScan.class);

    // Per-thread budget (128 MiB of total manifest length) used by AUTO mode. Declared as a long
    // so the threshold product in the constructor is computed in 64-bit arithmetic.
    private static final long LOCAL_PLANNING_MAX_SLOT_SIZE = 128L * 1024 * 1024;
    private static final int MONITOR_POOL_SIZE = 2;

    private final int localParallelism;
    private final long localPlanningSizeThreshold;

    protected BaseDistributedDataScan(Table table, Schema schema, TableScanContext context) {
        super(table, schema, context);
        this.localParallelism =
                PLAN_SCANS_WITH_WORKER_POOL ? ThreadPools.WORKER_THREAD_POOL_SIZE : 1;
        // Multiplying by the long constant keeps this in long arithmetic; an int * int product
        // would overflow (to a negative or zero threshold) once localParallelism reaches 16.
        this.localPlanningSizeThreshold = localParallelism * LOCAL_PLANNING_MAX_SLOT_SIZE;
    }

    /**
     * Returns the parallelism available for remote planning. In AUTO mode, planning stays local
     * when this value does not exceed the local parallelism.
     */
    protected abstract int remoteParallelism();

    /** Returns the planning mode for data manifests, read from the table properties. */
    protected PlanningMode dataPlanningMode() {
        String modeName =
                table()
                        .properties()
                        .getOrDefault(
                                TableProperties.DATA_PLANNING_MODE,
                                TableProperties.PLANNING_MODE_DEFAULT);
        return PlanningMode.fromName(modeName);
    }

    /**
     * Returns whether data files produced by remote planning are copied before being placed into
     * scan tasks. Defaults to {@code true}; subclasses may override.
     */
    protected boolean shouldCopyRemotelyPlannedDataFiles() {
        return true;
    }

    /**
     * Plans the given data manifests remotely.
     *
     * @param dataManifests data manifests that survived partition filtering
     * @param withColumnStats whether column stats are needed for the planned data files
     * @return groups of data files; each group is consumed as a separate iterable
     */
    protected abstract Iterable<CloseableIterable<DataFile>> planDataRemotely(
            List<ManifestFile> dataManifests, boolean withColumnStats);

    /** Returns the planning mode for delete manifests, read from the table properties. */
    protected PlanningMode deletePlanningMode() {
        String modeName =
                table()
                        .properties()
                        .getOrDefault(
                                TableProperties.DELETE_PLANNING_MODE,
                                TableProperties.PLANNING_MODE_DEFAULT);
        return PlanningMode.fromName(modeName);
    }

    /**
     * Plans the given delete manifests remotely.
     *
     * @param deleteManifests delete manifests that survived partition filtering
     * @return an index used to look up the delete files applying to each data file
     */
    protected abstract DeleteFileIndex planDeletesRemotely(List<ManifestFile> deleteManifests);

    /**
     * Plans file scan tasks for the scan's snapshot.
     *
     * <p>If both data and deletes are planned locally, planning goes through the manifest group
     * directly. Otherwise the data and delete planning run asynchronously on a short-lived monitor
     * pool. Each side runs locally or remotely as decided. The results are combined into {@link
     * BaseFileScanTask}s.
     *
     * @throws RuntimeException wrapping a {@link CompletionException} if asynchronous data
     *     planning fails
     */
    @Override
    public CloseableIterable<ScanTask> doPlanFiles() {
        Snapshot snapshot = snapshot();

        List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
        boolean mayHaveEqualityDeletes =
                !deleteManifests.isEmpty() && mayHaveEqualityDeletes(snapshot);
        boolean planDeletesLocally =
                shouldPlanDeletesLocally(deleteManifests, mayHaveEqualityDeletes);

        List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
        boolean loadColumnStats = mayHaveEqualityDeletes || shouldReturnColumnStats();
        boolean planDataLocally = shouldPlanDataLocally(dataManifests, loadColumnStats);
        boolean copyDataFiles = shouldCopyDataFiles(planDataLocally, loadColumnStats);

        if (planDataLocally && planDeletesLocally) {
            return planFileTasksLocally(dataManifests, deleteManifests);
        }

        ExecutorService monitorPool = newMonitorPool();
        CompletableFuture<DeleteFileIndex> deletesFuture =
                newDeletesFuture(deleteManifests, planDeletesLocally, monitorPool);
        CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture =
                newDataFuture(dataManifests, planDataLocally, loadColumnStats, monitorPool);

        try {
            Iterable<CloseableIterable<ScanTask>> fileTasks =
                    toFileTasks(dataFuture, deletesFuture, copyDataFiles);

            // Consume the file groups in parallel only when planning with an executor and
            // either data was planned locally or equality deletes may be present.
            if (shouldPlanWithExecutor() && (planDataLocally || mayHaveEqualityDeletes)) {
                return new ParallelIterable<>(fileTasks, planExecutor());
            }
            return CloseableIterable.concat(fileTasks);
        } catch (CompletionException e) {
            deletesFuture.cancel(true /* may interrupt */);
            dataFuture.cancel(true /* may interrupt */);
            throw new RuntimeException("Failed to plan files", e);
        } finally {
            // shutdown() stops accepting new work; already-submitted futures keep running.
            monitorPool.shutdown();
        }
    }

    /** Groups planned files into tasks using the scan's split size, lookback and open cost. */
    @Override
    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
        return TableScanUtil.planTaskGroups(
                planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
    }

    /** Returns the snapshot's data manifests that match the scan filter, recording metrics. */
    private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
        List<ManifestFile> dataManifests = snapshot.dataManifests(io());
        scanMetrics().totalDataManifests().increment(dataManifests.size());

        List<ManifestFile> matchingDataManifests = filterManifests(dataManifests);
        int skipped = dataManifests.size() - matchingDataManifests.size();
        scanMetrics().skippedDataManifests().increment(skipped);

        return matchingDataManifests;
    }

    /** Returns the snapshot's delete manifests that match the scan filter, recording metrics. */
    private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
        List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
        scanMetrics().totalDeleteManifests().increment(deleteManifests.size());

        List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests);
        int skipped = deleteManifests.size() - matchingDeleteManifests.size();
        scanMetrics().skippedDeleteManifests().increment(skipped);

        return matchingDeleteManifests;
    }

    /**
     * Keeps manifests that have added or existing files and whose partition summaries may match
     * the scan filter under their partition spec.
     */
    private List<ManifestFile> filterManifests(List<ManifestFile> manifests) {
        Map<Integer, ManifestEvaluator> evalCache = specCache(this::newManifestEvaluator);

        return manifests.stream()
                .filter(manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles())
                .filter(manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest))
                .collect(Collectors.toList());
    }

    /** In AUTO mode, possible equality deletes force local delete planning. */
    private boolean shouldPlanDeletesLocally(
            List<ManifestFile> deleteManifests, boolean mayHaveEqualityDeletes) {
        PlanningMode mode = deletePlanningMode();
        return (mode == PlanningMode.AUTO && mayHaveEqualityDeletes)
                || shouldPlanLocally(mode, deleteManifests);
    }

    /** In AUTO mode, needing column stats forces local data planning. */
    private boolean shouldPlanDataLocally(
            List<ManifestFile> dataManifests, boolean loadColumnStats) {
        PlanningMode mode = dataPlanningMode();
        return (mode == PlanningMode.AUTO && loadColumnStats)
                || shouldPlanLocally(mode, dataManifests);
    }

    /**
     * Decides whether the given manifests should be planned locally under {@code mode}.
     *
     * <p>A customized plan executor always forces local planning. DISTRIBUTED plans locally only
     * when there is nothing to plan. AUTO plans locally when any of the following holds:
     *
     * <ul>
     *   <li>remote parallelism does not exceed local parallelism;
     *   <li>there are at most twice as many manifests as local threads;
     *   <li>their total length fits within the local size threshold.
     * </ul>
     *
     * @throws IllegalArgumentException for an unrecognized planning mode
     */
    private boolean shouldPlanLocally(PlanningMode mode, List<ManifestFile> manifests) {
        if (context().planWithCustomizedExecutor()) {
            return true;
        }

        switch (mode) {
            case LOCAL:
                return true;

            case DISTRIBUTED:
                return manifests.isEmpty();

            case AUTO:
                return remoteParallelism() <= localParallelism
                        || manifests.size() <= 2 * localParallelism
                        || totalSize(manifests) <= localPlanningSizeThreshold;

            default:
                throw new IllegalArgumentException("Unknown planning mode: " + mode);
        }
    }

    /** Sums {@link ManifestFile#length()} over the given manifests. */
    private long totalSize(List<ManifestFile> manifests) {
        return manifests.stream().mapToLong(ManifestFile::length).sum();
    }

    /**
     * Data files are copied when data is planned locally, or when the subclass asks for copies of
     * remotely planned files. They are also copied when stats were loaded only internally (for
     * equality deletes) and the caller did not ask for column stats.
     */
    private boolean shouldCopyDataFiles(boolean planDataLocally, boolean loadColumnStats) {
        return planDataLocally
                || shouldCopyRemotelyPlannedDataFiles()
                || (loadColumnStats && !shouldReturnColumnStats());
    }

    /** Plans data and deletes together through a local manifest group. */
    private CloseableIterable<ScanTask> planFileTasksLocally(
            List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
        LOG.info("Planning file tasks locally for table {}", table().name());
        return newManifestGroup(dataManifests, deleteManifests).planFiles();
    }

    /** Starts delete planning (local or remote) asynchronously on {@code monitorPool}. */
    private CompletableFuture<DeleteFileIndex> newDeletesFuture(
            List<ManifestFile> deleteManifests, boolean planLocally, ExecutorService monitorPool) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (planLocally) {
                        LOG.info("Planning deletes locally for table {}", table().name());
                        return planDeletesLocally(deleteManifests);
                    } else {
                        LOG.info("Planning deletes remotely for table {}", table().name());
                        return planDeletesRemotely(deleteManifests);
                    }
                },
                monitorPool);
    }

    /**
     * Builds a delete file index from the given manifests. The plan executor is used only when
     * planning with an executor and there is more than one manifest.
     */
    private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
        DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(io(), deleteManifests);

        if (shouldPlanWithExecutor() && deleteManifests.size() > 1) {
            builder.planWith(planExecutor());
        }

        return builder.specsById(table().specs())
                .filterData(filter())
                .caseSensitive(isCaseSensitive())
                .scanMetrics(scanMetrics())
                .build();
    }

    /** Starts data planning (local or remote) asynchronously on {@code monitorPool}. */
    private CompletableFuture<Iterable<CloseableIterable<DataFile>>> newDataFuture(
            List<ManifestFile> dataManifests,
            boolean planLocally,
            boolean withColumnStats,
            ExecutorService monitorPool) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (planLocally) {
                        LOG.info("Planning data locally for table {}", table().name());
                        return newManifestGroup(dataManifests, withColumnStats).fileGroups();
                    } else {
                        LOG.info("Planning data remotely for table {}", table().name());
                        return planDataRemotely(dataManifests, withColumnStats);
                    }
                },
                monitorPool);
    }

    /**
     * Waits for data planning and lazily maps every data file group to file scan tasks.
     *
     * <p>Joining {@code dataFuture} may throw {@link CompletionException}. The delete index is
     * joined lazily, per data file, while the tasks are iterated.
     */
    private Iterable<CloseableIterable<ScanTask>> toFileTasks(
            CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture,
            CompletableFuture<DeleteFileIndex> deletesFuture,
            boolean copyDataFiles) {
        String schemaString = SchemaParser.toJson(tableSchema());
        Map<Integer, String> specStringCache = specCache(PartitionSpecParser::toJson);
        Map<Integer, ResidualEvaluator> residualCache = specCache(this::newResidualEvaluator);

        Iterable<CloseableIterable<DataFile>> dataFileGroups = dataFuture.join();

        return Iterables.transform(
                dataFileGroups,
                dataFiles ->
                        toFileTasks(
                                dataFiles,
                                deletesFuture,
                                copyDataFiles,
                                schemaString,
                                specStringCache,
                                residualCache));
    }

    /** Maps each data file in a group to a {@link BaseFileScanTask}, recording task metrics. */
    private CloseableIterable<ScanTask> toFileTasks(
            CloseableIterable<DataFile> dataFiles,
            CompletableFuture<DeleteFileIndex> deletesFuture,
            boolean copyDataFiles,
            String schemaString,
            Map<Integer, String> specStringCache,
            Map<Integer, ResidualEvaluator> residualCache) {
        return CloseableIterable.transform(
                dataFiles,
                dataFile -> {
                    DeleteFile[] deleteFiles = deletesFuture.join().forDataFile(dataFile);

                    String specString = specStringCache.get(dataFile.specId());
                    ResidualEvaluator residuals = residualCache.get(dataFile.specId());

                    ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);

                    return new BaseFileScanTask(
                            copyDataFiles ? copy(dataFile) : dataFile,
                            deleteFiles,
                            schemaString,
                            specString,
                            residuals);
                });
    }

    /** Copies a content file, keeping stats according to the scan's column-stats settings. */
    private <F extends ContentFile<F>> F copy(F file) {
        return (F) ContentFileUtil.copy(file, shouldReturnColumnStats(), columnsToKeepStats());
    }

    /** Creates a manifest evaluator for the inclusive projection of the scan filter onto a spec. */
    private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
        return ManifestEvaluator.forPartitionFilter(
                Projections.inclusive(spec, isCaseSensitive()).project(filter()),
                spec,
                isCaseSensitive());
    }

    /** Creates a residual evaluator for the scan's residual filter under a spec. */
    private ResidualEvaluator newResidualEvaluator(PartitionSpec spec) {
        return ResidualEvaluator.of(spec, residualFilter(), isCaseSensitive());
    }

    /** Eagerly computes {@code load} for every partition spec of the table, keyed by spec ID. */
    private <R> Map<Integer, R> specCache(Function<PartitionSpec, R> load) {
        Map<Integer, R> cache = Maps.newHashMap();
        table().specs().forEach((specId, spec) -> cache.put(specId, load.apply(spec)));
        return cache;
    }

    /**
     * Returns {@code false} only when the snapshot summary records a total equality delete count
     * of exactly {@code "0"}. A missing count is treated as "may have equality deletes".
     */
    private boolean mayHaveEqualityDeletes(Snapshot snapshot) {
        String count = snapshot.summary().get("total-equality-deletes");
        return count == null || !count.equals("0");
    }

    /** Creates the small fixed pool that runs the data and delete planning futures. */
    private ExecutorService newMonitorPool() {
        return ThreadPools.newFixedThreadPool("iceberg-planning-monitor-service", MONITOR_POOL_SIZE);
    }

    // The overrides below only narrow the return type to BatchScan and delegate to the superclass.

    @Override
    public BatchScan asOfTime(long timestampMillis) {
        return (BatchScan) super.asOfTime(timestampMillis);
    }

    @Override
    public BatchScan useRef(String name) {
        return (BatchScan) super.useRef(name);
    }

    @Override
    public BatchScan useSnapshot(long snapshotId) {
        return (BatchScan) super.useSnapshot(snapshotId);
    }
}
