package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/IcebergCommitter.class */
class IcebergCommitter implements Committer<IcebergCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    public static final WriteResult EMPTY_WRITE_RESULT = WriteResult.builder().addDataFiles(Lists.newArrayList()).addDeleteFiles(Lists.newArrayList()).build();

    @VisibleForTesting
    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
    private final String branch;
    private final Map<String, String> snapshotProperties;
    private final boolean replacePartitions;
    private IcebergFilesCommitterMetrics committerMetrics;
    private Table table;
    private final TableLoader tableLoader;
    private int maxContinuousEmptyCommits;
    private ExecutorService workerPool;
    private int continuousEmptyCheckpoints;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IcebergCommitter(TableLoader tableLoader, String str, Map<String, String> map, boolean z, int i, String str2, IcebergFilesCommitterMetrics icebergFilesCommitterMetrics) {
        this.continuousEmptyCheckpoints = 0;
        this.branch = str;
        this.snapshotProperties = map;
        this.replacePartitions = z;
        this.committerMetrics = icebergFilesCommitterMetrics;
        this.tableLoader = tableLoader;
        if (!tableLoader.isOpen()) {
            tableLoader.open();
        }
        this.table = tableLoader.loadTable();
        this.maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(this.table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
        Preconditions.checkArgument(this.maxContinuousEmptyCommits > 0, "flink.max-continuous-empty-commits must be positive");
        this.workerPool = ThreadPools.newWorkerPool("iceberg-committer-pool-" + this.table.name() + "-" + str2, i);
        this.continuousEmptyCheckpoints = 0;
    }

    public void commit(Collection<Committer.CommitRequest<IcebergCommittable>> collection) throws IOException, InterruptedException {
        if (collection.isEmpty()) {
            return;
        }
        TreeMap newTreeMap = Maps.newTreeMap();
        for (Committer.CommitRequest<IcebergCommittable> commitRequest : collection) {
            newTreeMap.put(((IcebergCommittable) commitRequest.getCommittable()).checkpointId(), commitRequest);
        }
        IcebergCommittable icebergCommittable = (IcebergCommittable) ((Committer.CommitRequest) newTreeMap.lastEntry().getValue()).getCommittable();
        long maxCommittedCheckpointId = SinkUtil.getMaxCommittedCheckpointId(this.table, icebergCommittable.jobId(), icebergCommittable.operatorId(), this.branch);
        newTreeMap.headMap(Long.valueOf(maxCommittedCheckpointId), true).values().forEach((v0) -> {
            v0.signalAlreadyCommitted();
        });
        NavigableMap<Long, Committer.CommitRequest<IcebergCommittable>> tailMap = newTreeMap.tailMap(Long.valueOf(maxCommittedCheckpointId), false);
        if (tailMap.isEmpty()) {
            return;
        }
        commitPendingRequests(tailMap, icebergCommittable.jobId(), icebergCommittable.operatorId());
    }

    private void commitPendingRequests(NavigableMap<Long, Committer.CommitRequest<IcebergCommittable>> navigableMap, String str, String str2) throws IOException {
        long longValue = navigableMap.lastKey().longValue();
        ArrayList newArrayList = Lists.newArrayList();
        TreeMap newTreeMap = Maps.newTreeMap();
        for (Map.Entry<Long, Committer.CommitRequest<IcebergCommittable>> entry : navigableMap.entrySet()) {
            if (Arrays.equals(EMPTY_MANIFEST_DATA, ((IcebergCommittable) entry.getValue().getCommittable()).manifest())) {
                newTreeMap.put(entry.getKey(), EMPTY_WRITE_RESULT);
            } else {
                DeltaManifests deltaManifests = (DeltaManifests) SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, ((IcebergCommittable) entry.getValue().getCommittable()).manifest());
                newTreeMap.put(entry.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io(), this.table.specs()));
                newArrayList.addAll(deltaManifests.manifests());
            }
        }
        CommitSummary commitSummary = new CommitSummary(newTreeMap);
        commitPendingResult(newTreeMap, commitSummary, str, str2);
        if (this.committerMetrics != null) {
            this.committerMetrics.updateCommitSummary(commitSummary);
        }
        FlinkManifestUtil.deleteCommittedManifests(this.table, newArrayList, str, longValue);
    }

    private void logCommitSummary(CommitSummary commitSummary, String str) {
        LOG.info("Preparing for commit: {} on table: {} branch: {} with summary: {}.", new Object[]{str, this.table, this.branch, commitSummary});
    }

    private void commitPendingResult(NavigableMap<Long, WriteResult> navigableMap, CommitSummary commitSummary, String str, String str2) {
        long dataFilesCount = commitSummary.dataFilesCount() + commitSummary.deleteFilesCount();
        this.continuousEmptyCheckpoints = dataFilesCount == 0 ? this.continuousEmptyCheckpoints + 1 : 0;
        if (dataFilesCount == 0 && this.continuousEmptyCheckpoints % this.maxContinuousEmptyCommits != 0) {
            LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", Long.valueOf(navigableMap.lastKey().longValue()));
            return;
        }
        if (this.replacePartitions) {
            replacePartitions(navigableMap, commitSummary, str, str2);
        } else {
            commitDeltaTxn(navigableMap, commitSummary, str, str2);
        }
        this.continuousEmptyCheckpoints = 0;
    }

    private void replacePartitions(NavigableMap<Long, WriteResult> navigableMap, CommitSummary commitSummary, String str, String str2) {
        long longValue = navigableMap.lastKey().longValue();
        Preconditions.checkState(commitSummary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
        ReplacePartitions scanManifestsWith = this.table.newReplacePartitions().scanManifestsWith(this.workerPool);
        for (WriteResult writeResult : navigableMap.values()) {
            Preconditions.checkState(writeResult.referencedDataFiles().length == 0, "Should have no referenced data files.");
            Stream stream = Arrays.stream(writeResult.dataFiles());
            Objects.requireNonNull(scanManifestsWith);
            stream.forEach(scanManifestsWith::addFile);
        }
        logCommitSummary(commitSummary, "dynamic partition overwrite");
        commitOperation(scanManifestsWith, "dynamic partition overwrite", str, str2, longValue);
    }

    private void commitDeltaTxn(NavigableMap<Long, WriteResult> navigableMap, CommitSummary commitSummary, String str, String str2) {
        long longValue = navigableMap.lastKey().longValue();
        if (commitSummary.deleteFilesCount() == 0) {
            AppendFiles scanManifestsWith = this.table.newAppend().scanManifestsWith(this.workerPool);
            for (WriteResult writeResult : navigableMap.values()) {
                Preconditions.checkState(writeResult.referencedDataFiles().length == 0, "Should have no referenced data files for append.");
                Stream stream = Arrays.stream(writeResult.dataFiles());
                Objects.requireNonNull(scanManifestsWith);
                stream.forEach(scanManifestsWith::appendFile);
            }
            logCommitSummary(commitSummary, DataOperations.APPEND);
            commitOperation(scanManifestsWith, DataOperations.APPEND, str, str2, longValue);
            return;
        }
        for (Map.Entry<Long, WriteResult> entry : navigableMap.entrySet()) {
            WriteResult value = entry.getValue();
            RowDelta scanManifestsWith2 = this.table.newRowDelta().scanManifestsWith(this.workerPool);
            Stream stream2 = Arrays.stream(value.dataFiles());
            Objects.requireNonNull(scanManifestsWith2);
            stream2.forEach(scanManifestsWith2::addRows);
            Stream stream3 = Arrays.stream(value.deleteFiles());
            Objects.requireNonNull(scanManifestsWith2);
            stream3.forEach(scanManifestsWith2::addDeletes);
            logCommitSummary(commitSummary, "rowDelta");
            commitOperation(scanManifestsWith2, "rowDelta", str, str2, entry.getKey().longValue());
        }
    }

    private void commitOperation(SnapshotUpdate<?> snapshotUpdate, String str, String str2, String str3, long j) {
        Map<String, String> map = this.snapshotProperties;
        Objects.requireNonNull(snapshotUpdate);
        map.forEach(snapshotUpdate::set);
        snapshotUpdate.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(j));
        snapshotUpdate.set(SinkUtil.FLINK_JOB_ID, str2);
        snapshotUpdate.set(SinkUtil.OPERATOR_ID, str3);
        snapshotUpdate.toBranch2(this.branch);
        long nanoTime = System.nanoTime();
        snapshotUpdate.commit();
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
        LOG.info("Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", new Object[]{str, this.table.name(), this.branch, Long.valueOf(j), Long.valueOf(millis)});
        if (this.committerMetrics != null) {
            this.committerMetrics.commitDuration(millis);
        }
    }

    public void close() throws IOException {
        this.tableLoader.close();
    }
}
