package org.apache.flink.runtime.operators.lifecycle.graph;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.InputEndedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorFinishedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.class */
class TwoInputTestStreamOperator extends AbstractStreamOperator<TestDataElement> implements TwoInputStreamOperator<TestDataElement, TestDataElement, TestDataElement>, BoundedMultiInput, ProcessingTimeService.ProcessingTimeCallback {
    private final String operatorID;
    private long lastDataSent;
    private final Map<String, OperatorFinishedEvent.LastVertexDataInfo> lastDataReceived = new HashMap();
    private boolean timerRegistered;
    private final TestEventQueue eventQueue;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TwoInputTestStreamOperator(String str, TestEventQueue testEventQueue) {
        this.operatorID = str;
        this.eventQueue = testEventQueue;
    }

    public void open() throws Exception {
        super.open();
        this.eventQueue.add(new OperatorStartedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        collectEvent(new CheckpointStartedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), stateSnapshotContext.getCheckpointId()));
        super.snapshotState(stateSnapshotContext);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        collectEvent(new CheckpointCompletedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), j));
        super.notifyCheckpointComplete(j);
    }

    public void finish() throws Exception {
        collectEvent(new OperatorFinishedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), this.lastDataSent, new OperatorFinishedEvent.LastReceivedVertexDataInfo(this.lastDataReceived)));
        super.finish();
    }

    private void processElement(StreamRecord<TestDataElement> streamRecord) {
        TestDataElement testDataElement = (TestDataElement) streamRecord.getValue();
        this.lastDataReceived.computeIfAbsent(testDataElement.operatorId, str -> {
            return new OperatorFinishedEvent.LastVertexDataInfo();
        }).bySubtask.put(Integer.valueOf(testDataElement.subtaskIndex), Long.valueOf(testDataElement.seq));
        Output output = this.output;
        String str2 = this.operatorID;
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        long j = this.lastDataSent + 1;
        this.lastDataSent = j;
        output.collect(new StreamRecord(new TestDataElement(str2, indexOfThisSubtask, j)));
        if (this.timerRegistered) {
            return;
        }
        registerTimer();
        this.timerRegistered = true;
    }

    public void processWatermark1(Watermark watermark) throws Exception {
        collectEvent(new WatermarkReceivedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), watermark.getTimestamp(), 1));
        super.processWatermark1(watermark);
    }

    public void processWatermark2(Watermark watermark) throws Exception {
        collectEvent(new WatermarkReceivedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), watermark.getTimestamp(), 2));
        super.processWatermark2(watermark);
    }

    public void onProcessingTime(long j) {
        registerTimer();
    }

    private void registerTimer() {
        getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + 1, this);
    }

    public void endInput(int i) throws Exception {
        collectEvent(new InputEndedEvent(this.operatorID, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber(), i));
    }

    public void processElement1(StreamRecord streamRecord) {
        processElement(streamRecord);
    }

    public void processElement2(StreamRecord streamRecord) {
        processElement(streamRecord);
    }

    private void collectEvent(TestEvent testEvent) {
        this.eventQueue.add(testEvent);
    }
}
