package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.Collection;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/IcebergWriteAggregator.class */
class IcebergWriteAggregator extends AbstractStreamOperator<CommittableMessage<IcebergCommittable>> implements OneInputStreamOperator<CommittableMessage<WriteResult>, CommittableMessage<IcebergCommittable>> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriteAggregator.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private final Collection<WriteResult> results = Sets.newHashSet();
    private transient ManifestOutputFileFactory icebergManifestOutputFileFactory;
    private transient Table table;
    private final TableLoader tableLoader;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IcebergWriteAggregator(TableLoader tableLoader) {
        this.tableLoader = tableLoader;
    }

    public void open() throws Exception {
        if (!this.tableLoader.isOpen()) {
            this.tableLoader.open();
        }
        String jobID = getContainingTask().getEnvironment().getJobID().toString();
        String operatorID = getOperatorID().toString();
        int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        Preconditions.checkArgument(indexOfThisSubtask == 0, "The subTaskId must be zero in the IcebergWriteAggregator");
        int attemptNumber = getRuntimeContext().getTaskInfo().getAttemptNumber();
        this.table = this.tableLoader.loadTable();
        this.icebergManifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(() -> {
            return this.table;
        }, this.table.properties(), jobID, operatorID, indexOfThisSubtask, attemptNumber);
    }

    public void finish() throws IOException {
        prepareSnapshotPreBarrier(TableProperties.MAX_REF_AGE_MS_DEFAULT);
    }

    public void prepareSnapshotPreBarrier(long j) throws IOException {
        IcebergCommittable icebergCommittable = new IcebergCommittable(writeToManifest(this.results, j), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), j);
        this.output.collect(new StreamRecord(new CommittableSummary(0, 1, Long.valueOf(j), 1, 1, 0)));
        this.output.collect(new StreamRecord(new CommittableWithLineage(icebergCommittable, Long.valueOf(j), 0)));
        LOG.info("Emitted commit message to downstream committer operator");
        this.results.clear();
    }

    public byte[] writeToManifest(Collection<WriteResult> collection, long j) throws IOException {
        if (collection.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        }
        return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, FlinkManifestUtil.writeCompletedFiles(WriteResult.builder().addAll(collection).build(), () -> {
            return this.icebergManifestOutputFileFactory.create(j);
        }, this.table.spec()));
    }

    public void processElement(StreamRecord<CommittableMessage<WriteResult>> streamRecord) throws Exception {
        if (streamRecord.isRecord() && (streamRecord.getValue() instanceof CommittableWithLineage)) {
            this.results.add((WriteResult) ((CommittableWithLineage) streamRecord.getValue()).getCommittable());
        }
    }
}
