package org.apache.flink.streaming.api.streamvertex;

import akka.actor.ActorRef;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.messages.CheckpointingMessages;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.IndexedReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/StreamVertex.class */
/**
 * Streaming task that hosts a single user {@link StreamInvokable} together with its input and
 * output handlers.
 *
 * <p>Life cycle as visible in this class: {@link #registerInputOutput()} builds the configuration,
 * the runtime context, the handlers and the user invokable; {@link #invoke()} opens the operators,
 * runs the user invokable, closes the operators and finally flushes the outputs and clears the
 * buffers.
 *
 * <p>Barriers arrive either through {@link #broadcastBarrierFromSource(long)} or through the
 * listener returned by {@link #getSuperstepListener()}. Both paths end in
 * {@code actOnBarrier(long)}, which does nothing unless the task is currently running.
 *
 * @param <IN> type handled by the input handler and consumed by the user invokable
 * @param <OUT> type emitted by the user invokable through the output handler
 */
public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>, BarrierTransceiver, OperatorStateCarrier {
    // Written by the constructor with the most recently assigned vertex number; not read in this class.
    private static int numTasks;
    protected StreamConfig configuration;
    protected int instanceID;
    private InputHandler<IN> inputHandler;
    protected OutputHandler<OUT> outputHandler;
    private StreamingRuntimeContext context;
    // Operator states by name; shared with the runtime context and shipped with state barrier acks.
    private Map<String, OperatorState<?>> states;
    protected ClassLoader userClassLoader;
    private EventListener<TaskEvent> superstepListener;
    private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
    // Counter behind newVertex(); incremented once per constructed vertex.
    private static int numVertices = 0;
    // True only while invoke() is executing; barriers are ignored otherwise.
    protected volatile boolean isRunning = false;
    private StreamInvokable<IN, OUT> userInvokable = null;

    /**
     * Forwards incoming task events to the enclosing vertex as barriers. The event is cast to
     * {@code StreamingSuperstep} without a type check.
     */
    private class SuperstepEventListener implements EventListener<TaskEvent> {
        private SuperstepEventListener() {
        }

        public void onEvent(TaskEvent taskEvent) {
            StreamVertex.this.actOnBarrier(((StreamingSuperstep) taskEvent).getId());
        }
    }

    /** Takes the next vertex number as this instance's id and creates the barrier listener. */
    public StreamVertex() {
        numTasks = newVertex();
        this.instanceID = numTasks;
        this.superstepListener = new SuperstepEventListener();
    }

    /**
     * Increments the shared vertex counter and returns the new value.
     *
     * <p>NOTE(review): the increment is not synchronized; confirm that vertices are never
     * constructed concurrently. The decompiler also reports that this method's access modifier
     * was changed from {@code protected}.
     *
     * @return the counter value after the increment
     */
    public static int newVertex() {
        numVertices++;
        return numVertices;
    }

    /** Runs the three set-up steps in order: initialize, create handlers, create the user invokable. */
    public void registerInputOutput() {
        initialize();
        setInputsOutputs();
        setInvokable();
    }

    /** Captures the user class loader and builds the configuration, state map and runtime context. */
    protected void initialize() {
        this.userClassLoader = getUserCodeClassLoader();
        this.configuration = new StreamConfig(getTaskConfiguration());
        this.states = new HashMap<>();
        this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
    }

    /**
     * Handles a barrier that was sent to this task directly. Ignored when the task is not running.
     *
     * @param j barrier id
     */
    public void broadcastBarrierFromSource(long j) {
        if (this.isRunning) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received barrier from jobmanager: " + j);
            }
            actOnBarrier(j);
        }
    }

    /**
     * Acknowledges a barrier to the job manager. A plain {@code BarrierAck} is sent when state
     * monitoring is disabled or no state is registered; otherwise a {@code StateBarrierAck}
     * carrying a {@link LocalStateHandle} over the current state map is sent.
     *
     * @param j barrier id
     * @throws IOException declared by the signature; no statement visible in this method throws it
     */
    public void confirmBarrier(long j) throws IOException {
        if (!this.configuration.getStateMonitoring() || this.states.isEmpty()) {
            getEnvironment().getJobManager().tell(new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), this.context.getIndexOfThisSubtask(), j), ActorRef.noSender());
        } else {
            getEnvironment().getJobManager().tell(new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), Integer.valueOf(this.context.getIndexOfThisSubtask()), Long.valueOf(j), new LocalStateHandle(this.states)), ActorRef.noSender());
        }
    }

    /** Creates the input and output handlers for this vertex. */
    public void setInputsOutputs() {
        this.inputHandler = new InputHandler<>(this);
        this.outputHandler = new OutputHandler<>(this);
    }

    /** Loads the user invokable from the configuration and hands it this vertex for set-up. */
    protected void setInvokable() {
        this.userInvokable = (StreamInvokable) this.configuration.getUserInvokable(this.userClassLoader);
        this.userInvokable.setup(this);
    }

    /** @return the task name reported by the environment */
    public String getName() {
        return getEnvironment().getTaskName();
    }

    /** @return the vertex number assigned to this instance at construction */
    public int getInstanceID() {
        return this.instanceID;
    }

    /**
     * Builds a runtime context for this task.
     *
     * @param str task name passed to the context
     * @param map operator state map passed to the context
     * @return a new {@link StreamingRuntimeContext}
     */
    public StreamingRuntimeContext createRuntimeContext(String str, Map<String, OperatorState<?>> map) {
        return new StreamingRuntimeContext(str, getEnvironment(), getUserCodeClassLoader(), getExecutionConfig(), map);
    }

    /**
     * Runs the task: opens the operators, invokes the user invokable, and closes the operators.
     *
     * <p>If anything fails after opening has started, the operators are closed on a best-effort
     * basis, the failure is logged and then rethrown. Outputs are flushed, buffers cleared and the
     * running flag reset in every case.
     *
     * @throws Exception whatever the operators or the user invokable throw
     */
    public void invoke() throws Exception {
        this.isRunning = true;
        boolean operatorOpen = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Task {} invoked with instance id {}", getName(), Integer.valueOf(getInstanceID()));
        }
        try {
            this.userInvokable.setRuntimeContext(this.context);
            // Raised before opening so that a partially opened operator chain is still closed on failure.
            operatorOpen = true;
            openOperator();
            this.userInvokable.invoke();
            closeOperator();
            operatorOpen = false;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task {} invoke finished instance id {}", getName(), Integer.valueOf(getInstanceID()));
            }
        } catch (Exception e) {
            if (operatorOpen) {
                try {
                    closeOperator();
                } catch (Throwable closeFailure) {
                    // Best effort: the primary failure is what gets rethrown, but keep the
                    // secondary one attached instead of dropping it.
                    e.addSuppressed(closeFailure);
                }
            }
            if (LOG.isErrorEnabled()) {
                LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
            }
            throw e;
        } finally {
            // Nested so that a failing flush cannot skip the buffer clean-up or leave the task
            // marked as running.
            try {
                try {
                    if (this.outputHandler != null) {
                        this.outputHandler.flushOutputs();
                    }
                } finally {
                    clearBuffers();
                }
            } finally {
                this.isRunning = false;
            }
        }
    }

    /**
     * Opens the user invokable, then gives each chained invokable the runtime context and opens it.
     *
     * @throws Exception if any invokable fails to open
     */
    protected void openOperator() throws Exception {
        this.userInvokable.open(getTaskConfiguration());
        for (ChainableInvokable<?, ?> chainableInvokable : this.outputHandler.chainedInvokables) {
            chainableInvokable.setRuntimeContext(this.context);
            chainableInvokable.open(getTaskConfiguration());
        }
    }

    /**
     * Closes the user invokable and then every chained invokable, stopping at the first failure.
     *
     * @throws Exception if any invokable fails to close
     */
    protected void closeOperator() throws Exception {
        this.userInvokable.close();
        for (ChainableInvokable<?, ?> chainableInvokable : this.outputHandler.chainedInvokables) {
            chainableInvokable.close();
        }
    }

    /**
     * Clears the writers of the output handler and the readers of the input handler, skipping
     * whichever handler has not been created.
     *
     * <p>NOTE(review): the decompiler reports that this method's access modifier was changed from
     * {@code protected}.
     *
     * @throws IOException if clearing a handler fails
     */
    public void clearBuffers() throws IOException {
        if (this.outputHandler != null) {
            this.outputHandler.clearWriters();
        }
        if (this.inputHandler != null) {
            this.inputHandler.clearReaders();
        }
    }

    /** Forwards cancellation to the user invokable, if one has been set. */
    public void cancel() {
        if (this.userInvokable != null) {
            this.userInvokable.cancel();
        }
    }

    /** @return the stream configuration built during initialization */
    @Override // org.apache.flink.streaming.api.streamvertex.StreamTaskContext
    public StreamConfig getConfig() {
        return this.configuration;
    }

    /**
     * @param i input index; only {@code 0} is accepted
     * @return the input iterator of the input handler
     * @throws IllegalArgumentException if {@code i} is not {@code 0}
     */
    public <X> MutableObjectIterator<X> getInput(int i) {
        if (i == 0) {
            return this.inputHandler.getInputIter();
        }
        throw new IllegalArgumentException("There is only 1 input");
    }

    /**
     * @param i input index; only {@code 0} is accepted
     * @return the input iterator of the input handler
     * @throws IllegalArgumentException if {@code i} is not {@code 0}
     */
    public <X> IndexedReaderIterator<X> getIndexedInput(int i) {
        if (i == 0) {
            return this.inputHandler.getInputIter();
        }
        throw new IllegalArgumentException("There is only 1 input");
    }

    /**
     * @param i input index; only {@code 0} is accepted
     * @return the serializer of the input handler
     * @throws IllegalArgumentException if {@code i} is not {@code 0}
     */
    public <X> StreamRecordSerializer<X> getInputSerializer(int i) {
        if (i == 0) {
            return this.inputHandler.getInputSerializer();
        }
        throw new IllegalArgumentException("There is only 1 input");
    }

    /** @return the collector of the output handler */
    @Override // org.apache.flink.streaming.api.streamvertex.StreamTaskContext
    public Collector<OUT> getOutputCollector() {
        return this.outputHandler.getCollector();
    }

    /**
     * Always fails for this vertex type.
     *
     * @throws IllegalArgumentException always
     */
    public <X, Y> CoReaderIterator<X, Y> getCoReader() {
        throw new IllegalArgumentException("CoReader not available");
    }

    /** @return the listener that turns incoming task events into barriers for this vertex */
    public EventListener<TaskEvent> getSuperstepListener() {
        return this.superstepListener;
    }

    /**
     * Broadcasts the barrier through the output handler and then acknowledges it. Does nothing
     * when the task is not running. Any failure is rethrown wrapped in a {@link RuntimeException}.
     *
     * <p>NOTE(review): the decompiler reports that this method's access modifier was changed from
     * {@code private}.
     *
     * @param j barrier id
     */
    public synchronized void actOnBarrier(long j) {
        if (!this.isRunning) {
            return;
        }
        try {
            this.outputHandler.broadcastBarrier(j);
            confirmBarrier(j);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Superstep " + j + " processed: " + this);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** @return the operator name followed by the subtask index in parentheses */
    public String toString() {
        return this.configuration.getOperatorName() + " (" + this.context.getIndexOfThisSubtask() + ")";
    }

    /**
     * Adds every entry of the given state handle, resolved with the user class loader, to this
     * vertex's state map.
     *
     * @param stateHandle handle to read the state from
     */
    public void injectState(StateHandle stateHandle) {
        this.states.putAll(stateHandle.getState(this.userClassLoader));
    }
}
