package org.apache.flink.iteration.operator.headprocessor;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
import org.apache.flink.iteration.operator.headprocessor.HeadOperatorRecordProcessor;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/iteration/operator/headprocessor/RegularHeadOperatorRecordProcessor.class */
/**
 * A {@link HeadOperatorRecordProcessor} that forwards records through the head operator context,
 * counts the feedback records seen for each epoch and reports per-epoch record counts to the
 * coordinator (via the context) whenever an epoch watermark arrives.
 *
 * <p>The processor tracks two epochs: the latest epoch for which a watermark was received locally
 * ({@code latestRoundAligned}) and the latest epoch announced by a {@link GloballyAlignedEvent}
 * ({@code latestRoundGloballyAligned}). Both start at {@code -1}.
 *
 * <p>NOTE(review): the class holds plain mutable state without synchronization, so it presumably
 * relies on being called from a single thread — confirm against the owning operator.
 */
public class RegularHeadOperatorRecordProcessor implements HeadOperatorRecordProcessor {
    protected static final Logger LOG = LoggerFactory.getLogger(RegularHeadOperatorRecordProcessor.class);

    /** Context used to emit records and to report epoch progress to the coordinator. */
    private final HeadOperatorRecordProcessor.Context headOperatorContext;

    /** Sender id attached to the epoch watermarks broadcast by this processor. */
    private final String senderId;

    /** Number of feedback records of type {@code RECORD} received so far, keyed by epoch. */
    private final Map<Integer, Long> numFeedbackRecordsPerEpoch = new HashMap<>();

    /** Latest epoch whose watermark has been received by this processor; {@code -1} if none. */
    private int latestRoundAligned = -1;

    /** Latest epoch announced by a globally aligned event; {@code -1} if none. */
    private int latestRoundGloballyAligned = -1;

    /**
     * Creates a processor bound to the given head operator context.
     *
     * @param context the context used for output and coordinator updates; the sender id is
     *     derived from its operator id and subtask index
     */
    public RegularHeadOperatorRecordProcessor(HeadOperatorRecordProcessor.Context context) {
        this.headOperatorContext = context;
        this.senderId =
                OperatorUtils.getUniqueSenderId(
                        context.getStreamConfig().getOperatorID(),
                        context.getTaskInfo().getIndexOfThisSubtask());
    }

    /**
     * Restores the counters from the given state and re-reports to the coordinator every epoch
     * that was aligned locally but not yet globally.
     *
     * @param headOperatorState the state to restore from; must not be {@code null}
     * @param iterable raw state providers; not used by this implementation
     * @throws IllegalArgumentException if {@code headOperatorState} is {@code null}
     */
    @Override
    public void initializeState(HeadOperatorState headOperatorState, Iterable<StatePartitionStreamProvider> iterable) {
        Preconditions.checkArgument(headOperatorState != null, "The initialized state should not be null");

        numFeedbackRecordsPerEpoch.putAll(headOperatorState.getNumFeedbackRecordsEachRound());
        latestRoundAligned = headOperatorState.getLatestRoundAligned();
        latestRoundGloballyAligned = headOperatorState.getLatestRoundGloballyAligned();

        // When only epoch 0 is locally aligned and nothing is globally aligned yet, nothing is
        // re-reported here. NOTE(review): presumably epoch 0 is reported again once its
        // watermark is re-received (see processRecord) — confirm the intended recovery flow.
        boolean onlyRoundZeroPending = latestRoundAligned == 0 && latestRoundGloballyAligned == -1;
        if (onlyRoundZeroPending) {
            return;
        }

        for (int epoch = latestRoundGloballyAligned + 1; epoch <= latestRoundAligned; epoch++) {
            reportEpochToCoordinator(epoch);
        }
    }

    /**
     * Processes a record arriving on the regular (non-feedback) input.
     *
     * @param streamRecord the record to process
     */
    @Override
    public void processElement(StreamRecord<IterationRecord<?>> streamRecord) {
        processRecord(streamRecord);
    }

    /**
     * Processes a record arriving on the feedback input. Records of type {@code RECORD} are
     * counted against their epoch before being handled like any other record.
     *
     * @param streamRecord the feedback record to process
     * @return always {@code false}
     */
    @Override
    public boolean processFeedbackElement(StreamRecord<IterationRecord<?>> streamRecord) {
        IterationRecord<?> feedback = streamRecord.getValue();
        if (feedback.getType() == IterationRecord.Type.RECORD) {
            // Increment the per-epoch counter, starting at 1 for the first record of an epoch.
            numFeedbackRecordsPerEpoch.merge(feedback.getEpoch(), 1L, Long::sum);
        }
        processRecord(streamRecord);
        return false;
    }

    /**
     * Handles a globally aligned event by broadcasting an epoch watermark downstream. If the
     * event is marked as terminated the watermark carries {@link Integer#MAX_VALUE} as epoch,
     * otherwise the event's own epoch.
     *
     * @param globallyAlignedEvent the event received
     * @return {@code true} if the event is marked as terminated
     * @throws IllegalStateException if the event's epoch is not greater than the latest globally
     *     aligned epoch, except for a repeated event of epoch 0
     */
    @Override
    public boolean onGloballyAligned(GloballyAlignedEvent globallyAlignedEvent) {
        LOG.info("Received global event {}", globallyAlignedEvent);

        int eventEpoch = globallyAlignedEvent.getEpoch();
        // Epoch 0 may be announced again after it was already globally aligned; every other
        // epoch has to be strictly newer than the latest one seen.
        boolean repeatedRoundZero = eventEpoch == 0 && latestRoundGloballyAligned == 0;
        Preconditions.checkState(
                repeatedRoundZero || eventEpoch > latestRoundGloballyAligned,
                String.format(
                        "Receive unexpected global aligned event, latest = %d, this one = %d",
                        latestRoundGloballyAligned, eventEpoch));

        boolean terminated = globallyAlignedEvent.isTerminated();
        int watermarkEpoch = terminated ? Integer.MAX_VALUE : eventEpoch;
        StreamRecord<IterationRecord<?>> watermark =
                new StreamRecord<>(IterationRecord.newEpochWatermark(watermarkEpoch, senderId), 0L);
        headOperatorContext.broadcastOutput(watermark);

        latestRoundGloballyAligned = Math.max(eventEpoch, latestRoundGloballyAligned);
        return globallyAlignedEvent.isTerminated();
    }

    /**
     * Captures the current counters.
     *
     * @return a new state holding a copy of the per-epoch feedback counts together with the
     *     latest locally and globally aligned epochs
     */
    @Override
    public HeadOperatorState snapshotState() {
        return new HeadOperatorState(
                new HashMap<>(numFeedbackRecordsPerEpoch), latestRoundAligned, latestRoundGloballyAligned);
    }

    /** Returns the live (not copied) map of feedback record counts per epoch. */
    @VisibleForTesting
    public Map<Integer, Long> getNumFeedbackRecordsPerEpoch() {
        return numFeedbackRecordsPerEpoch;
    }

    /** Returns the latest epoch whose watermark has been received, or {@code -1} if none. */
    @VisibleForTesting
    public int getLatestRoundAligned() {
        return latestRoundAligned;
    }

    /** Returns the latest globally aligned epoch, or {@code -1} if none. */
    @VisibleForTesting
    public int getLatestRoundGloballyAligned() {
        return latestRoundGloballyAligned;
    }

    /**
     * Dispatches a record by type: {@code RECORD}s are forwarded to the output, epoch watermarks
     * update the local alignment and are reported to the coordinator, all other types are
     * ignored.
     */
    private void processRecord(StreamRecord<IterationRecord<?>> streamRecord) {
        IterationRecord<?> record = streamRecord.getValue();
        switch (record.getType()) {
            case RECORD:
                headOperatorContext.output(streamRecord);
                break;
            case EPOCH_WATERMARK: {
                int epoch = record.getEpoch();
                LOG.info("Head Received epoch watermark {}", epoch);

                boolean notifyCoordinator;
                if (epoch == 0) {
                    // A watermark for epoch 0 is tolerated repeatedly, but it is only reported
                    // while no later epoch has been aligned.
                    notifyCoordinator = latestRoundAligned <= 0;
                } else {
                    Preconditions.checkState(
                            epoch > latestRoundAligned,
                            String.format(
                                    "Unexpected epoch watermark: latest = %d, this one = %d",
                                    latestRoundAligned, epoch));
                    notifyCoordinator = true;
                }

                if (notifyCoordinator) {
                    reportEpochToCoordinator(epoch);
                }
                latestRoundAligned = Math.max(epoch, latestRoundAligned);
                break;
            }
            default:
                break;
        }
    }

    /** Reports the feedback record count of the given epoch (0 if none) to the coordinator. */
    private void reportEpochToCoordinator(int epoch) {
        headOperatorContext.updateEpochToCoordinator(epoch, numFeedbackRecordsPerEpoch.getOrDefault(epoch, 0L));
    }
}
