package org.apache.flink.streaming.api.invokable;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
import org.apache.flink.streaming.io.IndexedReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/StreamInvokable.class */
public abstract class StreamInvokable<IN, OUT> implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
    protected StreamTaskContext<OUT> taskContext;
    protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
    protected StreamRecordSerializer<IN> inSerializer;
    protected TypeSerializer<IN> objectSerializer;
    protected StreamRecord<IN> nextRecord;
    protected IN nextObject;
    protected boolean isMutable;
    public Collector<OUT> collector;
    protected Function userFunction;
    protected volatile boolean isRunning;
    protected ExecutionConfig executionConfig = null;
    private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/StreamInvokable$ChainingStrategy.class */
    public enum ChainingStrategy {
        ALWAYS,
        NEVER,
        HEAD
    }

    public StreamInvokable(Function function) {
        this.userFunction = function;
    }

    public void setup(StreamTaskContext<OUT> streamTaskContext) {
        this.collector = streamTaskContext.getOutputCollector();
        this.recordIterator = (IndexedReaderIterator<StreamRecord<IN>>) streamTaskContext.getIndexedInput(0);
        this.inSerializer = (StreamRecordSerializer<IN>) streamTaskContext.getInputSerializer(0);
        if (this.inSerializer != null) {
            this.nextRecord = this.inSerializer.m21createInstance();
            this.objectSerializer = this.inSerializer.getObjectSerializer();
        }
        this.taskContext = streamTaskContext;
        this.executionConfig = streamTaskContext.getExecutionConfig();
    }

    public abstract void invoke() throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamRecord<IN> readNext() throws IOException {
        this.nextRecord = this.inSerializer.m21createInstance();
        try {
            this.nextRecord = (StreamRecord) this.recordIterator.next(this.nextRecord);
            try {
                this.nextObject = this.nextRecord.getObject();
            } catch (NullPointerException e) {
            }
            return this.nextRecord;
        } catch (IOException e2) {
            if (this.isRunning) {
                throw new RuntimeException("Could not read next record due to: " + StringUtils.stringifyException(e2));
            }
            return null;
        } catch (IllegalStateException e3) {
            if (this.isRunning) {
                throw new RuntimeException("Could not read next record due to: " + StringUtils.stringifyException(e3));
            }
            return null;
        }
    }

    protected void callUserFunction() throws Exception {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void callUserFunctionAndLogException() {
        try {
            callUserFunction();
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Calling user function failed due to: {}", StringUtils.stringifyException(e));
            }
            throw new RuntimeException(e);
        }
    }

    public void open(Configuration configuration) throws Exception {
        this.isRunning = true;
        FunctionUtils.openFunction(this.userFunction, configuration);
    }

    public void close() {
        this.isRunning = false;
        this.collector.close();
        try {
            FunctionUtils.closeFunction(this.userFunction);
        } catch (Exception e) {
            throw new RuntimeException("Error when closing the function: " + e.getMessage());
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, runtimeContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public IN copy(IN in) {
        return (IN) this.objectSerializer.copy(in);
    }

    public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        if (chainingStrategy == ChainingStrategy.ALWAYS && !(this instanceof ChainableInvokable)) {
            throw new RuntimeException("Invokable needs to extend ChainableInvokable to be chained");
        }
        this.chainingStrategy = chainingStrategy;
    }

    public ChainingStrategy getChainingStrategy() {
        return this.chainingStrategy;
    }

    public Function getUserFunction() {
        return this.userFunction;
    }
}
