package org.apache.iceberg.actions;

import java.io.Closeable;
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/actions/RewriteDataFilesCommitManager.class */
/**
 * Commits the results of data file rewrites to a table, either directly through
 * {@link #commitFileGroups(Set)} / {@link #commitOrClean(Set)} or incrementally through a
 * background {@link CommitService} obtained from {@link #service(int)}.
 */
public class RewriteDataFilesCommitManager {
    private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesCommitManager.class);

    /** How long the committer thread sleeps when no batch is ready to be committed. */
    private static final long COMMIT_POLL_INTERVAL_MS = 100L;

    private final Table table;
    private final long startingSnapshotId;
    private final boolean useStartingSequenceNumber;

    /**
     * Background service that batches completed file groups and commits them on a single
     * dedicated thread. Commit failures are logged and ignored so that the remaining groups can
     * still be committed.
     *
     * <p>Lifecycle: {@link #start()}, any number of {@link #offer(RewriteFileGroup)} calls,
     * {@link #close()}, then {@link #results()}.
     */
    public class CommitService implements Closeable {
        private final ExecutorService committerService;
        private final ConcurrentLinkedQueue<RewriteFileGroup> completedRewrites;
        // Only appended to by the committer thread.
        private final List<RewriteFileGroup> committedRewrites;
        private final int rewritesPerCommit;
        private final AtomicBoolean running = new AtomicBoolean(false);

        /**
         * @param rewritesPerCommit number of file groups to include in each commit; must be positive
         * @throws IllegalArgumentException if {@code rewritesPerCommit} is not positive
         */
        CommitService(int rewritesPerCommit) {
            // A non-positive batch size would make the commit loop submit empty rewrites forever.
            Preconditions.checkArgument(rewritesPerCommit > 0, "Invalid number of rewrites per commit: %s (must be positive)", rewritesPerCommit);
            LOG.info("Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
            this.rewritesPerCommit = rewritesPerCommit;
            this.committerService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
            this.completedRewrites = Queues.newConcurrentLinkedQueue();
            this.committedRewrites = Lists.newArrayList();
        }

        /**
         * Starts the background commit loop.
         *
         * @throws IllegalStateException if the service has already been started
         */
        public void start() {
            Preconditions.checkState(running.compareAndSet(false, true), "Rewrite Commit service already started");
            LOG.info("Starting commit service for {}", table);
            committerService.execute(this::commitLoop);
        }

        /**
         * Queues a completed file group for commit.
         *
         * @param rewriteFileGroup the group to commit
         * @throws IllegalStateException if the service is not currently running
         */
        public void offer(RewriteFileGroup rewriteFileGroup) {
            LOG.debug("Offered to commit service: {}", rewriteFileGroup);
            Preconditions.checkState(running.get(), "Cannot add rewrites to a service which has already been closed");
            completedRewrites.add(rewriteFileGroup);
        }

        /**
         * Returns the file groups whose commit call completed without throwing.
         *
         * @return the internal list of committed groups
         * @throws IllegalStateException if the underlying executor has not been shut down,
         *     which {@link #close()} does
         */
        public List<RewriteFileGroup> results() {
            Preconditions.checkState(committerService.isShutdown(), "Cannot get results from a service which has not been closed");
            return committedRewrites;
        }

        /**
         * Stops accepting new groups, then waits up to 10 minutes for the queued groups to be
         * committed. A timeout is only logged as a warning.
         *
         * @throws IllegalStateException if the service is not running, or if groups are still
         *     queued once the wait is over
         * @throws RuntimeException if the calling thread is interrupted while waiting
         */
        @Override
        public void close() {
            Preconditions.checkState(running.compareAndSet(true, false), "Cannot close already closed RewriteService");
            LOG.info("Closing commit service for {}", table);
            committerService.shutdown();
            try {
                if (!committerService.awaitTermination(10L, TimeUnit.MINUTES)) {
                    LOG.warn("Commit operation did not complete within 10 minutes of the files being written. This may mean that changes were not successfully committed to the the Iceberg table.");
                }
                Preconditions.checkState(completedRewrites.isEmpty(), "File groups offered after service was closed, they were not successfully committed.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Cannot complete commit for rewrite, commit service interrupted", e);
            }
        }

        /** Runs on the committer thread until the service is closed and the queue is drained. */
        private void commitLoop() {
            while (running.get() || !completedRewrites.isEmpty()) {
                if (!isBatchReady()) {
                    // Sleep instead of spinning while waiting for a full batch or for close().
                    pause();
                    continue;
                }
                Set<RewriteFileGroup> batch = pollBatch();
                try {
                    commitOrClean(batch);
                    committedRewrites.addAll(batch);
                } catch (Exception e) {
                    LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e);
                }
            }
        }

        /** True when a full batch is queued, or when the service was closed with groups left over. */
        private boolean isBatchReady() {
            if (completedRewrites.size() >= rewritesPerCommit) {
                return true;
            }
            return !running.get() && !completedRewrites.isEmpty();
        }

        /** Removes up to {@code rewritesPerCommit} groups from the queue. */
        private Set<RewriteFileGroup> pollBatch() {
            Set<RewriteFileGroup> batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
            for (int taken = 0; taken < rewritesPerCommit && !completedRewrites.isEmpty(); taken++) {
                batch.add(completedRewrites.poll());
            }
            return batch;
        }

        /** Sleeps for one poll interval, converting an interrupt into a {@link RuntimeException}. */
        private void pause() {
            try {
                Thread.sleep(COMMIT_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while processing commits", e);
            }
        }
    }

    /**
     * Creates a manager that validates from the table's current snapshot and uses that snapshot's
     * sequence number for the rewrite.
     *
     * @param table the table to commit to; its {@code currentSnapshot()} result is dereferenced
     *     without a null check
     */
    public RewriteDataFilesCommitManager(Table table) {
        this(table, table.currentSnapshot().snapshotId());
    }

    /**
     * Creates a manager that validates from the given snapshot and uses that snapshot's sequence
     * number for the rewrite.
     *
     * @param table the table to commit to
     * @param startingSnapshotId id of the snapshot the rewrite is validated from
     */
    public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
        this(table, startingSnapshotId, true);
    }

    /**
     * @param table the table to commit to
     * @param startingSnapshotId id of the snapshot the rewrite is validated from
     * @param useStartingSequenceNumber whether to pass the starting snapshot's sequence number to
     *     the rewrite operation
     */
    public RewriteDataFilesCommitManager(Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
        this.table = table;
        this.startingSnapshotId = startingSnapshotId;
        this.useStartingSequenceNumber = useStartingSequenceNumber;
    }

    /**
     * Commits all of the given file groups in a single rewrite operation.
     *
     * @param fileGroups the groups whose rewritten files are replaced by their added files
     */
    public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
        RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(startingSnapshotId);
        if (useStartingSequenceNumber) {
            long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
            rewrite.rewriteFiles(
                    collectFiles(fileGroups, RewriteFileGroup::rewrittenFiles),
                    collectFiles(fileGroups, RewriteFileGroup::addedFiles),
                    sequenceNumber);
        } else {
            rewrite.rewriteFiles(
                    collectFiles(fileGroups, RewriteFileGroup::rewrittenFiles),
                    collectFiles(fileGroups, RewriteFileGroup::addedFiles));
        }
        rewrite.commit();
    }

    /**
     * Deletes the files that were written for a file group, without retries. Individual delete
     * failures are logged as warnings.
     *
     * @param rewriteFileGroup the group whose added files are deleted
     * @throws IllegalStateException if the group has no added files set
     */
    public void abortFileGroup(RewriteFileGroup rewriteFileGroup) {
        Preconditions.checkState(rewriteFileGroup.addedFiles() != null, "Cannot abort a fileGroup that was not rewritten");
        Tasks.foreach(rewriteFileGroup.addedFiles())
                .noRetry()
                .suppressFailureWhenFinished()
                .onFailure((dataFile, exc) -> LOG.warn("Failed to delete: {}", dataFile.path(), exc))
                .run(dataFile -> table.io().deleteFile(dataFile.path().toString()));
    }

    /**
     * Commits the given file groups and, if the commit fails with a known state, deletes the files
     * written for them. The failure is rethrown in every case.
     *
     * @param rewriteGroups the groups to commit
     * @throws CommitStateUnknownException if the commit outcome is unknown; nothing is deleted
     *     because the files may have been committed
     */
    public void commitOrClean(Set<RewriteFileGroup> rewriteGroups) {
        try {
            commitFileGroups(rewriteGroups);
        } catch (CommitStateUnknownException e) {
            LOG.error("Commit state unknown for {}, cannot clean up files because they may have been committed successfully.", rewriteGroups, e);
            throw e;
        } catch (Exception e) {
            LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
            rewriteGroups.forEach(this::abortFileGroup);
            throw e;
        }
    }

    /**
     * Creates a commit service bound to this manager. The caller is responsible for starting and
     * closing it.
     *
     * @param rewritesPerCommit number of file groups to include in each commit; must be positive
     * @return a new, not yet started commit service
     */
    public CommitService service(int rewritesPerCommit) {
        return new CommitService(rewritesPerCommit);
    }

    /** Gathers the files selected by {@code extractor} from every group into one concrete set. */
    private static <F> Set<F> collectFiles(Set<RewriteFileGroup> fileGroups, Function<RewriteFileGroup, Set<F>> extractor) {
        Set<F> files = Sets.newHashSet();
        for (RewriteFileGroup group : fileGroups) {
            files.addAll(extractor.apply(group));
        }
        return files;
    }
}
