/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.graph;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.graph.TestDataElement;
import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestEventSource
extends RichSourceFunction<TestDataElement>
implements ParallelSourceFunction<TestDataElement> {
    private static final Logger LOG = LoggerFactory.getLogger(TestEventSource.class);
    private final String operatorID;
    private final TestCommandDispatcher commandQueue;
    private volatile transient Queue<TestCommand> scheduledCommands;
    private volatile transient boolean isRunning = true;
    private final TestEventQueue eventQueue;
    private volatile transient TestCommandDispatcher.CommandExecutor commandExecutor;

    public TestEventSource(String operatorID, TestEventQueue eventQueue, TestCommandDispatcher commandQueue) {
        this.operatorID = operatorID;
        this.eventQueue = eventQueue;
        this.commandQueue = commandQueue;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.isRunning = true;
        this.scheduledCommands = new LinkedBlockingQueue<TestCommand>();
        this.commandExecutor = cmd -> this.scheduledCommands.add(cmd);
        this.commandQueue.subscribe(this.commandExecutor, this.operatorID);
        this.eventQueue.add(new OperatorStartedEvent(this.operatorID, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<TestDataElement> ctx) {
        long lastSent = 0L;
        while (this.isRunning || !this.scheduledCommands.isEmpty()) {
            TestCommand cmd;
            TestCommand testCommand = cmd = lastSent == 0L ? null : this.scheduledCommands.poll();
            if (cmd == TestCommand.FINISH_SOURCES) {
                this.ack(cmd);
                this.stop();
                continue;
            }
            if (cmd == TestCommand.FAIL) {
                this.ack(cmd);
                throw new RuntimeException("requested to fail");
            }
            if (cmd == null) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)new TestDataElement(this.operatorID, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), ++lastSent));
                    continue;
                }
            }
            throw new RuntimeException("unknown command " + cmd);
        }
    }

    private void ack(TestCommand cmd) {
        this.eventQueue.add(new TestCommandAckEvent(this.operatorID, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), cmd));
    }

    public void cancel() {
        this.stop();
    }

    private void stop() {
        this.commandQueue.unsubscribe(this.operatorID, this.commandExecutor);
        this.isRunning = false;
        if (!this.scheduledCommands.isEmpty()) {
            LOG.info("Unsubscribed with remaining commands: {}", this.scheduledCommands);
        }
    }
}

