package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/IcebergFilesCommitter.class */
class IcebergFilesCommitter extends AbstractStreamOperator<Void> implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private static final long INITIAL_CHECKPOINT_ID = -1;
    private static final String FLINK_JOB_ID = "flink.job-id";
    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
    private final TableLoader tableLoader;
    private final boolean replacePartitions;
    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
    private transient String flinkJobId;
    private transient Table table;
    private transient ManifestOutputFileFactory manifestOutputFileFactory;
    private transient long maxCommittedCheckpointId;
    private transient int continuousEmptyCheckpoints;
    private transient int maxContinuousEmptyCommits;
    private transient ListState<String> jobIdState;
    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
    private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>("iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
    private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();

    /* JADX INFO: Access modifiers changed from: package-private */
    public IcebergFilesCommitter(TableLoader tableLoader, boolean z) {
        this.tableLoader = tableLoader;
        this.replacePartitions = z;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(this.table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
        Preconditions.checkArgument(this.maxContinuousEmptyCommits > 0, "flink.max-continuous-empty-commits must be positive");
        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber());
        this.maxCommittedCheckpointId = -1L;
        this.checkpointsState = stateInitializationContext.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        this.jobIdState = stateInitializationContext.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
        if (stateInitializationContext.isRestored()) {
            String str = (String) ((Iterable) this.jobIdState.get()).iterator().next();
            Preconditions.checkState(!Strings.isNullOrEmpty(str), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, str);
            NavigableMap<Long, byte[]> tailMap = Maps.newTreeMap((SortedMap) ((Iterable) this.checkpointsState.get()).iterator().next()).tailMap(Long.valueOf(this.maxCommittedCheckpointId), false);
            if (tailMap.isEmpty()) {
                return;
            }
            commitUpToCheckpoint(tailMap, str, tailMap.lastKey().longValue());
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        long checkpointId = stateSnapshotContext.getCheckpointId();
        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, Long.valueOf(checkpointId));
        this.dataFilesPerCheckpoint.put(Long.valueOf(checkpointId), writeToManifest(checkpointId));
        this.checkpointsState.clear();
        this.checkpointsState.add(this.dataFilesPerCheckpoint);
        this.jobIdState.clear();
        this.jobIdState.add(this.flinkJobId);
        this.writeResultsOfCurrentCkpt.clear();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        if (j > this.maxCommittedCheckpointId) {
            commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, j);
            this.maxCommittedCheckpointId = j;
        }
    }

    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> navigableMap, String str, long j) throws IOException {
        NavigableMap<Long, byte[]> headMap = navigableMap.headMap(Long.valueOf(j), true);
        ArrayList<ManifestFile> newArrayList = Lists.newArrayList();
        TreeMap newTreeMap = Maps.newTreeMap();
        for (Map.Entry<Long, byte[]> entry : headMap.entrySet()) {
            if (!Arrays.equals(EMPTY_MANIFEST_DATA, entry.getValue())) {
                DeltaManifests deltaManifests = (DeltaManifests) SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, entry.getValue());
                newTreeMap.put(entry.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
                newArrayList.addAll(deltaManifests.manifests());
            }
        }
        int sum = newTreeMap.values().stream().mapToInt(writeResult -> {
            return writeResult.dataFiles().length + writeResult.deleteFiles().length;
        }).sum();
        this.continuousEmptyCheckpoints = sum == 0 ? this.continuousEmptyCheckpoints + 1 : 0;
        if (sum != 0 || this.continuousEmptyCheckpoints % this.maxContinuousEmptyCommits == 0) {
            if (this.replacePartitions) {
                replacePartitions(newTreeMap, str, j);
            } else {
                commitDeltaTxn(newTreeMap, str, j);
            }
            this.continuousEmptyCheckpoints = 0;
        }
        headMap.clear();
        for (ManifestFile manifestFile : newArrayList) {
            try {
                this.table.io().deleteFile(manifestFile.path());
            } catch (Exception e) {
                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", MoreObjects.toStringHelper(this).add("flinkJobId", str).add("checkpointId", j).add("manifestPath", manifestFile.path()).toString(), e);
            }
        }
    }

    private void replacePartitions(NavigableMap<Long, WriteResult> navigableMap, String str, long j) {
        Preconditions.checkState(navigableMap.values().stream().mapToInt(writeResult -> {
            return writeResult.deleteFiles().length;
        }).sum() == 0, "Cannot overwrite partitions with delete files.");
        ReplacePartitions newReplacePartitions = this.table.newReplacePartitions();
        int i = 0;
        for (WriteResult writeResult2 : navigableMap.values()) {
            Preconditions.checkState(writeResult2.referencedDataFiles().length == 0, "Should have no referenced data files.");
            i += writeResult2.dataFiles().length;
            Stream stream = Arrays.stream(writeResult2.dataFiles());
            Objects.requireNonNull(newReplacePartitions);
            stream.forEach(newReplacePartitions::addFile);
        }
        commitOperation(newReplacePartitions, i, 0, "dynamic partition overwrite", str, j);
    }

    private void commitDeltaTxn(NavigableMap<Long, WriteResult> navigableMap, String str, long j) {
        if (navigableMap.values().stream().mapToInt(writeResult -> {
            return writeResult.deleteFiles().length;
        }).sum() == 0) {
            AppendFiles newAppend = this.table.newAppend();
            int i = 0;
            for (WriteResult writeResult2 : navigableMap.values()) {
                Preconditions.checkState(writeResult2.referencedDataFiles().length == 0, "Should have no referenced data files.");
                i += writeResult2.dataFiles().length;
                Stream stream = Arrays.stream(writeResult2.dataFiles());
                Objects.requireNonNull(newAppend);
                stream.forEach(newAppend::appendFile);
            }
            commitOperation(newAppend, i, 0, DataOperations.APPEND, str, j);
            return;
        }
        for (Map.Entry<Long, WriteResult> entry : navigableMap.entrySet()) {
            WriteResult value = entry.getValue();
            RowDelta newRowDelta = this.table.newRowDelta();
            int length = value.dataFiles().length;
            Stream stream2 = Arrays.stream(value.dataFiles());
            Objects.requireNonNull(newRowDelta);
            stream2.forEach(newRowDelta::addRows);
            int length2 = value.deleteFiles().length;
            Stream stream3 = Arrays.stream(value.deleteFiles());
            Objects.requireNonNull(newRowDelta);
            stream3.forEach(newRowDelta::addDeletes);
            commitOperation(newRowDelta, length, length2, "rowDelta", str, entry.getKey().longValue());
        }
    }

    private void commitOperation(SnapshotUpdate<?> snapshotUpdate, int i, int i2, String str, String str2, long j) {
        LOG.info("Committing {} with {} data files and {} delete files to table {}", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), this.table});
        snapshotUpdate.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(j));
        snapshotUpdate.set(FLINK_JOB_ID, str2);
        long currentTimeMillis = System.currentTimeMillis();
        snapshotUpdate.commit();
        LOG.info("Committed in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void processElement(StreamRecord<WriteResult> streamRecord) {
        this.writeResultsOfCurrentCkpt.add((WriteResult) streamRecord.getValue());
    }

    public void endInput() throws IOException {
        this.dataFilesPerCheckpoint.put(Long.valueOf(TableProperties.MAX_REF_AGE_MS_DEFAULT), writeToManifest(TableProperties.MAX_REF_AGE_MS_DEFAULT));
        this.writeResultsOfCurrentCkpt.clear();
        commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, TableProperties.MAX_REF_AGE_MS_DEFAULT);
    }

    private byte[] writeToManifest(long j) throws IOException {
        if (this.writeResultsOfCurrentCkpt.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        }
        return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, FlinkManifestUtil.writeCompletedFiles(WriteResult.builder().addAll(this.writeResultsOfCurrentCkpt).build(), () -> {
            return this.manifestOutputFileFactory.create(j);
        }, this.table.spec()));
    }

    public void dispose() throws Exception {
        if (this.tableLoader != null) {
            this.tableLoader.close();
        }
    }

    private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
        return new ListStateDescriptor<>("iceberg-files-committer-state", new SortedMapTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Comparators.forType(Types.LongType.get())));
    }

    static long getMaxCommittedCheckpointId(Table table, String str) {
        String str2;
        Snapshot currentSnapshot = table.currentSnapshot();
        long j = -1;
        while (true) {
            if (currentSnapshot == null) {
                break;
            }
            Map<String, String> summary = currentSnapshot.summary();
            if (str.equals(summary.get(FLINK_JOB_ID)) && (str2 = summary.get(MAX_COMMITTED_CHECKPOINT_ID)) != null) {
                j = Long.parseLong(str2);
                break;
            }
            Long parentId = currentSnapshot.parentId();
            currentSnapshot = parentId != null ? table.snapshot(parentId.longValue()) : null;
        }
        return j;
    }
}
