package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
import org.apache.iceberg.actions.RewritePositionDeletesGroup;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark action that rewrites (compacts) the position delete files of an Iceberg table.
 *
 * <p>Scan tasks are planned through the {@link MetadataTableType#POSITION_DELETES} metadata table,
 * grouped by partition (coerced to the metadata table's partition type), split into file groups by
 * a {@link SparkBinPackPositionDeletesRewriter}, and rewritten concurrently on a fixed-size thread
 * pool. Results are committed either in a single commit or, when {@code partial-progress.enabled}
 * is set, through a commit service that commits groups in batches.
 */
public class RewritePositionDeleteFilesSparkAction
        extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteFilesSparkAction>
        implements RewritePositionDeleteFiles {

    private static final Logger LOG = LoggerFactory.getLogger(RewritePositionDeleteFilesSparkAction.class);

    /** Options understood by the action itself; rewriter-specific options are added at validation time. */
    private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
            "max-concurrent-file-group-rewrites",
            "partial-progress.enabled",
            "partial-progress.max-commits",
            "rewrite-job-order");

    private static final RewritePositionDeleteFiles.Result EMPTY_RESULT =
            ImmutableRewritePositionDeleteFiles.Result.builder().build();

    private final Table table;
    private final SparkBinPackPositionDeletesRewriter rewriter;

    // Conjunction of every expression passed to filter(); starts as alwaysTrue().
    private Expression filter;

    // The following are populated by validateAndInitOptions() at the start of execute().
    private int maxConcurrentFileGroupRewrites;
    private int maxCommits;
    private boolean partialProgressEnabled;
    private RewriteJobOrder rewriteJobOrder;

    /**
     * Per-execution bookkeeping used to number file groups and to build job descriptions.
     *
     * <p>Global and per-partition counters are backed by an {@link AtomicInteger} and a concurrent
     * map so they can be advanced from the stream that produces the groups.
     */
    public static class RewriteExecutionContext {
        private final StructLikeMap<Integer> numGroupsByPartition;
        private final int totalGroupCount;
        private final Map<StructLike, Integer> partitionIndexMap = Maps.newConcurrentMap();
        private final AtomicInteger groupIndex = new AtomicInteger(1);

        RewriteExecutionContext(StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition) {
            this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
            this.totalGroupCount =
                    numGroupsByPartition.values().stream().mapToInt(Integer::intValue).sum();
        }

        /** Returns the next 1-based global group index (post-increments the counter). */
        public int currentGlobalIndex() {
            return groupIndex.getAndIncrement();
        }

        /** Returns the next 1-based group index within {@code partition}. */
        public int currentPartitionIndex(StructLike partition) {
            return partitionIndexMap.merge(partition, 1, Integer::sum);
        }

        /**
         * Returns the number of file groups planned for {@code partition}.
         *
         * @throws NullPointerException if the partition was not part of the plan
         */
        public int groupsInPartition(StructLike partition) {
            return numGroupsByPartition.get(partition);
        }

        /** Returns the total number of file groups across all partitions. */
        public int totalGroupCount() {
            return totalGroupCount;
        }
    }

    public RewritePositionDeleteFilesSparkAction(SparkSession sparkSession, Table table) {
        super(sparkSession);
        this.filter = Expressions.alwaysTrue();
        this.table = table;
        this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
    }

    @Override
    public RewritePositionDeleteFilesSparkAction self() {
        return this;
    }

    /**
     * Restricts the rewrite to delete files matching {@code expression}, AND-ed with any filter
     * already applied. The combined filter is passed to the scan as a base-table filter.
     *
     * @param expression additional filter expression
     * @return this action, for chaining
     */
    @Override
    public RewritePositionDeleteFilesSparkAction filter(Expression expression) {
        filter = Expressions.and(filter, expression);
        return this;
    }

    /**
     * Alias of {@link #filter(Expression)} kept for callers compiled against the decompiled name.
     *
     * @deprecated use {@link #filter(Expression)}
     */
    @Deprecated
    public RewritePositionDeleteFilesSparkAction m130filter(Expression expression) {
        return filter(expression);
    }

    /**
     * Plans, rewrites and commits position delete file groups.
     *
     * @return the per-group rewrite results; empty when the table has no snapshot or nothing was
     *     planned
     * @throws IllegalArgumentException if the configured options are invalid
     */
    @Override
    public RewritePositionDeleteFiles.Result execute() {
        if (table.currentSnapshot() == null) {
            LOG.info("Nothing found to rewrite in empty table {}", table.name());
            return EMPTY_RESULT;
        }

        validateAndInitOptions();

        StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
        RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);

        if (ctx.totalGroupCount() == 0) {
            LOG.info("Nothing found to rewrite in {}", table.name());
            return EMPTY_RESULT;
        }

        Stream<RewritePositionDeletesGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
        RewritePositionDeletesCommitManager commitManager = commitManager();
        if (partialProgressEnabled) {
            return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
        }
        return doExecute(ctx, groupStream, commitManager);
    }

    /**
     * Alias of {@link #execute()} kept for callers compiled against the decompiled name.
     *
     * @deprecated use {@link #execute()}
     */
    @Deprecated
    public RewritePositionDeleteFiles.Result m131execute() {
        return execute();
    }

    /** Plans scan tasks from the position-deletes metadata table and splits them into file groups per partition. */
    private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
        Table deletesTable =
                MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles(deletesTable);
        try {
            Types.StructType partitionType = Partitioning.partitionType(deletesTable);
            return fileGroupsByPartition(groupByPartition(partitionType, fileTasks));
        } finally {
            // Close failures are logged rather than thrown so they do not mask the planning result.
            try {
                fileTasks.close();
            } catch (IOException e) {
                LOG.error("Cannot properly close file iterable while planning for rewrite", e);
            }
        }
    }

    /** Plans the position-delete scan tasks of {@code deletesTable} restricted by the action's filter. */
    private CloseableIterable<PositionDeletesScanTask> planFiles(Table deletesTable) {
        BatchScan scan = (BatchScan) deletesTable.newBatchScan().baseTableFilter(filter).ignoreResiduals();
        return CloseableIterable.transform(scan.planFiles(), task -> (PositionDeletesScanTask) task);
    }

    /** Buckets scan tasks by their partition coerced to {@code partitionType}. */
    private StructLikeMap<List<PositionDeletesScanTask>> groupByPartition(
            Types.StructType partitionType, Iterable<PositionDeletesScanTask> tasks) {
        StructLikeMap<List<PositionDeletesScanTask>> tasksByPartition = StructLikeMap.create(partitionType);
        for (PositionDeletesScanTask task : tasks) {
            StructLike partition = coercePartition(task, partitionType);
            List<PositionDeletesScanTask> partitionTasks = tasksByPartition.get(partition);
            if (partitionTasks == null) {
                partitionTasks = Lists.newArrayList();
                tasksByPartition.put(partition, partitionTasks);
            }
            partitionTasks.add(task);
        }
        return tasksByPartition;
    }

    private StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition(
            StructLikeMap<List<PositionDeletesScanTask>> tasksByPartition) {
        return tasksByPartition.transformValues(this::planFileGroups);
    }

    /** Asks the rewriter to split one partition's tasks into file groups. */
    private List<List<PositionDeletesScanTask>> planFileGroups(List<PositionDeletesScanTask> tasks) {
        return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
    }

    /** Rewrites one group under a Spark job group description and records its output files on the group. */
    private RewritePositionDeletesGroup rewriteDeleteFiles(
            RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) {
        String desc = jobDesc(fileGroup, ctx);
        fileGroup.setOutputFiles(withJobGroupInfo(
                newJobGroupInfo("REWRITE-POSITION-DELETES", desc),
                () -> rewriter.rewrite(fileGroup.tasks())));
        LOG.info("Rewrite position deletes ready to be committed - {}", desc);
        return fileGroup;
    }

    /** Creates a fixed-size pool (max-concurrent-file-group-rewrites threads) wrapped as an exiting executor. */
    private ExecutorService rewriteService() {
        return MoreExecutors.getExitingExecutorService(
                (ThreadPoolExecutor) Executors.newFixedThreadPool(
                        maxConcurrentFileGroupRewrites,
                        new ThreadFactoryBuilder().setNameFormat("Rewrite-Position-Delete-Service-%d").build()));
    }

    private RewritePositionDeletesCommitManager commitManager() {
        return new RewritePositionDeletesCommitManager(table);
    }

    /**
     * Rewrites every group, then commits all of them in a single commit.
     *
     * <p>The write phase and the commit phase are handled separately: a failure while writing aborts
     * the groups written so far, while a failure while committing is left to {@code commitOrClean}
     * and, for validation/commit conflicts, rethrown with guidance about partial progress.
     */
    private RewritePositionDeleteFiles.Result doExecute(
            RewriteExecutionContext ctx,
            Stream<RewritePositionDeletesGroup> groupStream,
            RewritePositionDeletesCommitManager commitManager) {
        ExecutorService rewriteService = rewriteService();
        ConcurrentLinkedQueue<RewritePositionDeletesGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();

        // Write phase: stop at the first failed group and abort everything written so far.
        try {
            Tasks.foreach(groupStream)
                    .executeWith(rewriteService)
                    .stopOnFailure()
                    .noRetry()
                    .onFailure((group, exc) ->
                            LOG.warn("Failure during rewrite process for group {}", group.info(), exc))
                    .run(group -> rewrittenGroups.add(rewriteDeleteFiles(ctx, group)));
        } catch (Exception e) {
            LOG.error(
                    "Cannot complete rewrite, {} is not enabled and one of the file set groups failed to be rewritten. "
                            + "This error occurred during the writing of new files, not during the commit process. "
                            + "This indicates something is wrong that doesn't involve conflicts with other Iceberg operations. "
                            + "Enabling {} may help in this case but the root cause should be investigated. "
                            + "Cleaning up {} groups which finished being written.",
                    "partial-progress.enabled",
                    "partial-progress.enabled",
                    rewrittenGroups.size(),
                    e);
            Tasks.foreach(rewrittenGroups).suppressFailureWhenFinished().run(commitManager::abort);
            throw e;
        } finally {
            rewriteService.shutdown();
        }

        // Commit phase: kept outside the write-phase handler so a commit failure is neither reported
        // as a write failure nor followed by a second abort of the rewritten groups.
        try {
            commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
        } catch (ValidationException | CommitFailedException e) {
            throw new RuntimeException(
                    String.format(
                            "Cannot commit rewrite because of a ValidationException or CommitFailedException. "
                                    + "This usually means that this rewrite has conflicted with another concurrent Iceberg operation. "
                                    + "To reduce the likelihood of conflicts, set %s which will break up the rewrite into multiple smaller "
                                    + "commits controlled by %s. "
                                    + "Separate smaller rewrite commits can succeed independently while any commits that conflict with "
                                    + "another Iceberg operation will be ignored. "
                                    + "This mode will create additional snapshots in the table history, one for each commit.",
                            "partial-progress.enabled",
                            "partial-progress.max-commits"),
                    e);
        }

        return ImmutableRewritePositionDeleteFiles.Result.builder()
                .rewriteResults(rewrittenGroups.stream()
                        .map(RewritePositionDeletesGroup::asResult)
                        .collect(Collectors.toList()))
                .build();
    }

    /**
     * Rewrites groups and hands each finished group to a commit service that commits in batches of
     * ceil(totalGroupCount / maxCommits) groups. Individual group failures are logged and skipped.
     */
    private RewritePositionDeleteFiles.Result doExecuteWithPartialProgress(
            RewriteExecutionContext ctx,
            Stream<RewritePositionDeletesGroup> groupStream,
            RewritePositionDeletesCommitManager commitManager) {
        ExecutorService rewriteService = rewriteService();

        int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
        RewritePositionDeletesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
        commitService.start();

        try {
            Tasks.foreach(groupStream)
                    .suppressFailureWhenFinished()
                    .executeWith(rewriteService)
                    .noRetry()
                    .onFailure((group, exc) -> LOG.error("Failure during rewrite group {}", group.info(), exc))
                    .run(group -> commitService.offer(rewriteDeleteFiles(ctx, group)));
        } finally {
            rewriteService.shutdown();
        }

        commitService.close();
        List<RewritePositionDeletesGroup> committedGroups = commitService.results();
        if (committedGroups.isEmpty()) {
            LOG.error(
                    "{} is true but no rewrite commits succeeded. "
                            + "Check the logs to determine why the individual commits failed. "
                            + "If this is persistent it may help to increase {} which will break the rewrite operation into smaller commits.",
                    "partial-progress.enabled",
                    "partial-progress.max-commits");
        }

        return ImmutableRewritePositionDeleteFiles.Result.builder()
                .rewriteResults(committedGroups.stream()
                        .map(RewritePositionDeletesGroup::asResult)
                        .collect(Collectors.toList()))
                .build();
    }

    /**
     * Turns the planned groups into a stream of numbered {@link RewritePositionDeletesGroup}s sorted
     * by the configured job order. Indexes are assigned before sorting.
     */
    private Stream<RewritePositionDeletesGroup> toGroupStream(
            RewriteExecutionContext ctx, Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
        return groupsByPartition.entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .flatMap(entry -> {
                    StructLike partition = entry.getKey();
                    return entry.getValue().stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
                })
                .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
    }

    private RewritePositionDeletesGroup newRewriteGroup(
            RewriteExecutionContext ctx, StructLike partition, List<PositionDeletesScanTask> tasks) {
        // Global index is drawn before the partition index, matching the original evaluation order.
        int globalIndex = ctx.currentGlobalIndex();
        int partitionIndex = ctx.currentPartitionIndex(partition);
        return new RewritePositionDeletesGroup(
                ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
                        .globalIndex(globalIndex)
                        .partitionIndex(partitionIndex)
                        .partition(partition)
                        .build(),
                tasks);
    }

    /**
     * Validates option keys against the action's and the rewriter's supported options, initializes
     * the rewriter, and reads the action-level settings.
     *
     * @throws IllegalArgumentException on unknown options, a non-positive concurrency, or a
     *     non-positive max-commits while partial progress is enabled
     */
    private void validateAndInitOptions() {
        Set<String> supportedOptions = Sets.newHashSet(rewriter.validOptions());
        supportedOptions.addAll(VALID_OPTIONS);

        Set<String> unsupportedOptions = Sets.newHashSet(options().keySet());
        unsupportedOptions.removeAll(supportedOptions);
        Preconditions.checkArgument(
                unsupportedOptions.isEmpty(),
                "Cannot use options %s, they are not supported by the action or the rewriter %s",
                unsupportedOptions,
                rewriter.description());

        rewriter.init(options());

        maxConcurrentFileGroupRewrites =
                PropertyUtil.propertyAsInt(options(), "max-concurrent-file-group-rewrites", 5);
        maxCommits = PropertyUtil.propertyAsInt(options(), "partial-progress.max-commits", 10);
        partialProgressEnabled = PropertyUtil.propertyAsBoolean(options(), "partial-progress.enabled", false);
        rewriteJobOrder = RewriteJobOrder.fromName(
                PropertyUtil.propertyAsString(options(), "rewrite-job-order", REWRITE_JOB_ORDER_DEFAULT));

        Preconditions.checkArgument(
                maxConcurrentFileGroupRewrites >= 1,
                "Cannot set %s to %s, the value must be positive.",
                "max-concurrent-file-group-rewrites",
                maxConcurrentFileGroupRewrites);
        Preconditions.checkArgument(
                !partialProgressEnabled || maxCommits > 0,
                "Cannot set %s to %s, the value must be positive when %s is true",
                "partial-progress.max-commits",
                Integer.valueOf(maxCommits),
                "partial-progress.enabled");
    }

    /** Builds the Spark job description for a group; partition details are included only for partitioned tables. */
    private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) {
        StructLike partition = group.info().partition();
        if (partition.size() > 0) {
            return String.format(
                    "Rewriting %d position delete files (%s, file group %d/%d, %s (%d/%d)) in %s",
                    group.rewrittenDeleteFiles().size(),
                    rewriter.description(),
                    group.info().globalIndex(),
                    ctx.totalGroupCount(),
                    partition,
                    group.info().partitionIndex(),
                    ctx.groupsInPartition(partition),
                    table.name());
        }
        // Same wording as the partitioned case ("position delete files") for consistent job names.
        return String.format(
                "Rewriting %d position delete files (%s, file group %d/%d) in %s",
                group.rewrittenDeleteFiles().size(),
                rewriter.description(),
                group.info().globalIndex(),
                ctx.totalGroupCount(),
                table.name());
    }

    /** Projects a task's partition onto the common partition type of the position-deletes table. */
    private StructLike coercePartition(PositionDeletesScanTask task, Types.StructType partitionType) {
        return PartitionUtil.coercePartition(partitionType, task.spec(), task.partition());
    }
}
