package org.apache.flink.runtime.operators.lifecycle.graph;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.class */
public class TestEventSource extends RichSourceFunction<TestDataElement> implements ParallelSourceFunction<TestDataElement> {
    private static final Logger LOG = LoggerFactory.getLogger(TestEventSource.class);
    private final String operatorID;
    private final TestCommandDispatcher commandQueue;
    private volatile transient Queue<TestCommand> scheduledCommands;
    private volatile transient boolean isRunning = true;
    private final TestEventQueue eventQueue;
    private volatile transient TestCommandDispatcher.CommandExecutor commandExecutor;

    public TestEventSource(String str, TestEventQueue testEventQueue, TestCommandDispatcher testCommandDispatcher) {
        this.operatorID = str;
        this.eventQueue = testEventQueue;
        this.commandQueue = testCommandDispatcher;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.isRunning = true;
        this.scheduledCommands = new LinkedBlockingQueue();
        this.commandExecutor = testCommand -> {
            this.scheduledCommands.add(testCommand);
        };
        this.commandQueue.subscribe(this.commandExecutor, this.operatorID);
        this.eventQueue.add(new OperatorStartedEvent(this.operatorID, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getAttemptNumber()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void run(SourceFunction.SourceContext<TestDataElement> sourceContext) {
        long j = 0;
        while (true) {
            if (!this.isRunning && this.scheduledCommands.isEmpty()) {
                return;
            }
            TestCommand poll = j == 0 ? null : this.scheduledCommands.poll();
            if (poll == TestCommand.FINISH_SOURCES) {
                ack(poll);
                stop();
            } else {
                if (poll == TestCommand.FAIL) {
                    ack(poll);
                    throw new RuntimeException("requested to fail");
                }
                if (poll != null) {
                    throw new RuntimeException("unknown command " + poll);
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    long j2 = j + 1;
                    j = sourceContext;
                    sourceContext.collect(new TestDataElement(this.operatorID, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), j2));
                }
            }
        }
    }

    private void ack(TestCommand testCommand) {
        this.eventQueue.add(new TestCommandAckEvent(this.operatorID, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getAttemptNumber(), testCommand));
    }

    public void cancel() {
        stop();
    }

    private void stop() {
        this.commandQueue.unsubscribe(this.operatorID, this.commandExecutor);
        this.isRunning = false;
        if (this.scheduledCommands.isEmpty()) {
            return;
        }
        LOG.info("Unsubscribed with remaining commands: {}", this.scheduledCommands);
    }
}
