package org.apache.flink.streaming.api.invokable;

import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/StreamInvokable.class */
/**
 * Base class for stream invokables: wraps a user {@link Function} together with the
 * input iterator, input serializer and output collector it operates on.
 *
 * <p>Subclasses supply the record-processing loops ({@link #mutableInvoke()} and
 * {@link #immutableInvoke()}) and the actual user-function call
 * ({@link #callUserFunction()}). {@link #invoke()} picks one of the two loops based on
 * the {@code isMutable} flag passed to {@link #initialize}.
 *
 * @param <IN> type of the records read from the input iterator
 * @param <OUT> type of the records emitted to the collector
 */
public abstract class StreamInvokable<IN, OUT> implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);

    /** Source of input records; assigned in {@link #initialize}. */
    protected MutableObjectIterator<StreamRecord<IN>> recordIterator;

    /** Serializer used to create {@link #reuse} instances; may be {@code null}. */
    protected StreamRecordSerializer<IN> inSerializer;

    /** Record instance created from {@link #inSerializer}; replaced by {@link #resetReuse()}. */
    protected StreamRecord<IN> reuse;

    /** Selects {@link #mutableInvoke()} (true) or {@link #immutableInvoke()} (false) in {@link #invoke()}. */
    protected boolean isMutable;

    /** Output collector; assigned in {@link #initialize}. */
    protected Collector<OUT> collector;

    /** The wrapped user function. */
    protected Function userFunction;

    /** Set to {@code true} by {@link #open} and to {@code false} by {@link #close()}. */
    protected volatile boolean isRunning;

    /**
     * Creates an invokable wrapping the given user function.
     *
     * @param function the user function to wrap
     */
    public StreamInvokable(Function function) {
        this.userFunction = function;
    }

    /**
     * Wires this invokable to its output collector, input iterator and input serializer.
     *
     * <p>If a serializer is supplied, an initial {@link #reuse} record is created from it;
     * with a {@code null} serializer {@link #reuse} is left unchanged.
     *
     * @param collector collector that receives emitted records
     * @param mutableObjectIterator iterator supplying the input records
     * @param streamRecordSerializer serializer for input records, or {@code null}
     * @param z whether {@link #invoke()} should run the mutable code path
     */
    public void initialize(Collector<OUT> collector, MutableObjectIterator<StreamRecord<IN>> mutableObjectIterator, StreamRecordSerializer<IN> streamRecordSerializer, boolean z) {
        this.collector = collector;
        this.recordIterator = mutableObjectIterator;
        this.inSerializer = streamRecordSerializer;
        if (this.inSerializer != null) {
            // createInstance() is the source-level name; the decompiler's m11-prefixed
            // alias for the bridge method does not exist in source.
            this.reuse = this.inSerializer.createInstance();
        }
        this.isMutable = z;
    }

    /**
     * Replaces {@link #reuse} with a fresh instance created by {@link #inSerializer}.
     *
     * <p>Requires a non-null serializer; calling this after {@link #initialize} was given
     * a {@code null} serializer throws a {@link NullPointerException}.
     */
    public void resetReuse() {
        this.reuse = this.inSerializer.createInstance();
    }

    /**
     * Processing loop used when the invokable was initialized as immutable.
     *
     * @throws Exception if processing fails
     */
    protected abstract void immutableInvoke() throws Exception;

    /**
     * Processing loop used when the invokable was initialized as mutable.
     *
     * @throws Exception if processing fails
     */
    protected abstract void mutableInvoke() throws Exception;

    /**
     * Performs the actual call into the user function.
     *
     * @throws Exception if the user function fails
     */
    protected abstract void callUserFunction() throws Exception;

    /**
     * Calls {@link #callUserFunction()} and logs, rather than propagates, any
     * {@link Exception} it throws.
     */
    public void callUserFunctionAndLogException() {
        try {
            callUserFunction();
        } catch (Exception e) {
            // Guarded because the stack trace is stringified eagerly as a log argument.
            if (LOG.isErrorEnabled()) {
                LOG.error("Calling user function failed due to: {}", StringUtils.stringifyException(e));
            }
        }
    }

    /**
     * Runs {@link #mutableInvoke()} or {@link #immutableInvoke()} depending on
     * {@link #isMutable}.
     *
     * @throws Exception if the selected processing loop fails
     */
    public void invoke() throws Exception {
        if (this.isMutable) {
            mutableInvoke();
        } else {
            immutableInvoke();
        }
    }

    /**
     * Marks this invokable as running and, if the user function is a {@link RichFunction},
     * forwards the configuration to its {@code open} method.
     *
     * @param configuration configuration passed on to a rich user function
     * @throws Exception if the user function's {@code open} fails
     */
    public void open(Configuration configuration) throws Exception {
        this.isRunning = true;
        if (this.userFunction instanceof RichFunction) {
            // The field is typed as Function, so the call has to go through a RichFunction cast.
            ((RichFunction) this.userFunction).open(configuration);
        }
    }

    /**
     * Marks this invokable as no longer running and, if the user function is a
     * {@link RichFunction}, calls its {@code close} method.
     *
     * @throws Exception if the user function's {@code close} fails
     */
    public void close() throws Exception {
        this.isRunning = false;
        if (this.userFunction instanceof RichFunction) {
            ((RichFunction) this.userFunction).close();
        }
    }
}
