package org.apache.paimon.flink.sink;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

/* loaded from: input_file:org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.class */
/**
 * Operator that wraps a {@link CommitterOperator} and, when the input is finished, creates a tag
 * on the latest snapshot of the table.
 *
 * <p>Every operator callback is forwarded unchanged to the wrapped committer. The only added
 * behavior happens in {@link #finish()}: a tag named {@code batch-write-<yyyy-MM-dd>} is created
 * from the latest snapshot, and afterwards tags are deleted until at most
 * {@code tagNumRetainedMax} remain (when that option is set).
 *
 * @param <CommitT> type of the committables flowing through the operator
 * @param <GlobalCommitT> type of the global committable used by the wrapped committer
 */
public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT> implements OneInputStreamOperator<CommitT, CommitT>, SetupableStreamOperator, BoundedOneInput {
    private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
    private static final long serialVersionUID = 1;

    /** Formats the snapshot time into the date suffix of the tag name; thread-safe and reusable. */
    private static final DateTimeFormatter TAG_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Wrapped committer that receives every delegated operator call. */
    private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;

    /** Table whose snapshots are tagged. */
    protected final FileStoreTable table;

    /**
     * Creates the operator.
     *
     * @param committerOperator committer that all operator callbacks are delegated to
     * @param fileStoreTable table on which the tag is created
     */
    public BatchWriteGeneratorTagOperator(CommitterOperator<CommitT, GlobalCommitT> committerOperator, FileStoreTable fileStoreTable) {
        this.table = fileStoreTable;
        this.commitOperator = committerOperator;
    }

    /** Delegates state initialization to the wrapped committer. */
    public void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        this.commitOperator.initializeState(streamTaskStateInitializer);
    }

    /** Delegates state snapshotting to the wrapped committer and returns its result. */
    public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        return this.commitOperator.snapshotState(checkpointId, timestamp, checkpointOptions, checkpointStreamFactory);
    }

    /** Delegates the checkpoint-complete notification to the wrapped committer. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointComplete(checkpointId);
    }

    /** Delegates the checkpoint-aborted notification to the wrapped committer. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Creates (or replaces) the {@code batch-write-<date>} tag on the latest snapshot and then
     * expires surplus tags. Does nothing when the table has no snapshot.
     *
     * <p>The date part is derived from the snapshot's {@code timeMillis()} in the JVM's default
     * time zone.
     */
    private void createTag() {
        SnapshotManager snapshotManager = this.table.snapshotManager();
        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
        if (latestSnapshot == null) {
            return;
        }
        TagManager tagManager = this.table.tagManager();
        TagDeletion tagDeletion = this.table.store().newTagDeletion();
        LocalDateTime snapshotTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(latestSnapshot.timeMillis()), ZoneId.systemDefault());
        String tagName = BATCH_WRITE_TAG_PREFIX + snapshotTime.format(TAG_DATE_FORMATTER);
        try {
            // A tag with the same name is replaced rather than kept.
            if (tagManager.tagExists(tagName)) {
                tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
            }
            tagManager.createTag(latestSnapshot, tagName, this.table.store().createTagCallbacks());
            expireTag();
        } catch (Exception ignored) {
            // NOTE(review): the failure is swallowed, presumably so that a tagging problem does
            // not fail the job; no logger is available in this class - consider logging here.
            // Roll back the tag so a failed attempt does not leave it behind.
            if (tagManager.tagExists(tagName)) {
                tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
            }
        }
    }

    /**
     * Deletes tags until at most {@code tagNumRetainedMax} remain. Does nothing when that option
     * is unset or the table has no snapshot.
     *
     * <p>Tags are visited in the iteration order of {@code tagManager.tags().values()}. All tags
     * of a snapshot are removed in one call while that still leaves at least the retained
     * maximum; otherwise the snapshot's tags are removed one by one, in the order given by
     * {@code sortTagsOfOneSnapshot}, until the limit is reached.
     */
    private void expireTag() {
        Integer tagNumRetainedMax = this.table.coreOptions().tagNumRetainedMax();
        if (tagNumRetainedMax == null) {
            return;
        }
        SnapshotManager snapshotManager = this.table.snapshotManager();
        if (snapshotManager.latestSnapshot() == null) {
            return;
        }
        TagManager tagManager = this.table.tagManager();
        TagDeletion tagDeletion = this.table.store().newTagDeletion();
        final long retainedMax = tagNumRetainedMax.longValue();
        long tagCount = tagManager.tagCount();

        while (tagCount > retainedMax) {
            final long countBeforePass = tagCount;
            for (List<String> tagNames : tagManager.tags().values()) {
                // Stop as soon as the limit is met so no further snapshot loses a tag.
                if (tagCount <= retainedMax) {
                    break;
                }
                if (tagCount - tagNames.size() >= retainedMax) {
                    // Dropping every tag of this snapshot still keeps enough tags.
                    tagManager.deleteAllTagsOfOneSnapshot(tagNames, tagDeletion, snapshotManager);
                    tagCount -= tagNames.size();
                } else {
                    // Only part of this snapshot's tags may go: delete them one at a time.
                    for (String tagName : tagManager.sortTagsOfOneSnapshot(tagNames)) {
                        if (tagCount <= retainedMax) {
                            break;
                        }
                        tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
                        tagCount--;
                    }
                    break;
                }
            }
            // Guard against spinning forever if a pass could not delete anything
            // (e.g. the reported tag count and the listed tags disagree).
            if (tagCount == countBeforePass) {
                break;
            }
        }
    }

    /** Delegates to the wrapped committer. */
    public void open() throws Exception {
        this.commitOperator.open();
    }

    /** Delegates element processing to the wrapped committer. */
    public void processElement(StreamRecord<CommitT> streamRecord) throws Exception {
        this.commitOperator.processElement(streamRecord);
    }

    /** Delegates watermark processing to the wrapped committer. */
    public void processWatermark(Watermark watermark) throws Exception {
        this.commitOperator.processWatermark(watermark);
    }

    /** Delegates watermark-status processing to the wrapped committer. */
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.commitOperator.processWatermarkStatus(watermarkStatus);
    }

    /** Delegates latency-marker processing to the wrapped committer. */
    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.commitOperator.processLatencyMarker(latencyMarker);
    }

    /** Creates the batch-write tag first, then finishes the wrapped committer. */
    public void finish() throws Exception {
        createTag();
        this.commitOperator.finish();
    }

    /** Delegates to the wrapped committer. */
    public void close() throws Exception {
        this.commitOperator.close();
    }

    /** Delegates to the wrapped committer. */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.commitOperator.prepareSnapshotPreBarrier(checkpointId);
    }

    /** Delegates to the wrapped committer. */
    public void setKeyContextElement1(StreamRecord<?> streamRecord) throws Exception {
        this.commitOperator.setKeyContextElement1(streamRecord);
    }

    /** Delegates to the wrapped committer. */
    public void setKeyContextElement2(StreamRecord<?> streamRecord) throws Exception {
        this.commitOperator.setKeyContextElement2(streamRecord);
    }

    /** Returns the metric group of the wrapped committer. */
    public OperatorMetricGroup getMetricGroup() {
        return this.commitOperator.getMetricGroup();
    }

    /** Returns the operator ID of the wrapped committer. */
    public OperatorID getOperatorID() {
        return this.commitOperator.getOperatorID();
    }

    /** Delegates to the wrapped committer. */
    public void setCurrentKey(Object obj) {
        this.commitOperator.setCurrentKey(obj);
    }

    /** Returns the current key of the wrapped committer. */
    public Object getCurrentKey() {
        return this.commitOperator.getCurrentKey();
    }

    /** Delegates to the wrapped committer. */
    public void setKeyContextElement(StreamRecord<CommitT> streamRecord) throws Exception {
        this.commitOperator.setKeyContextElement(streamRecord);
    }

    /** Delegates the end-of-input signal to the wrapped committer. */
    public void endInput() throws Exception {
        this.commitOperator.endInput();
    }

    /** Delegates operator setup to the wrapped committer. */
    public void setup(StreamTask streamTask, StreamConfig streamConfig, Output output) {
        this.commitOperator.setup(streamTask, streamConfig, output);
    }

    /** Returns the chaining strategy of the wrapped committer. */
    public ChainingStrategy getChainingStrategy() {
        return this.commitOperator.getChainingStrategy();
    }

    /** Sets the chaining strategy on the wrapped committer. */
    public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        this.commitOperator.setChainingStrategy(chainingStrategy);
    }
}
