package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.class */
/**
 * Base class for stream operators that wrap a user-defined function.
 *
 * <p>The operator forwards its own lifecycle calls ({@link #setup}, {@link #open}, {@link #close})
 * to the wrapped function through {@link FunctionUtils}, and routes the state-related calls of
 * {@link StatefulStreamOperator} to the function when it implements {@link Checkpointed} or
 * {@link CheckpointCommitter}.
 *
 * @param <OUT> type of the elements emitted by the operator
 * @param <F> type of the wrapped user function
 */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
    private static final long serialVersionUID = 1;

    /** The user function wrapped by this operator. */
    protected final F userFunction;

    /**
     * Creates an operator around the given user function.
     *
     * @param f the user function to wrap; stored as-is, no validation is performed here
     */
    public AbstractUdfStreamOperator(F f) {
        this.userFunction = f;
    }

    /**
     * Sets up the operator, then hands the runtime context to the user function via
     * {@link FunctionUtils#setFunctionRuntimeContext}.
     *
     * @param output the output passed on to the superclass setup
     * @param runtimeContext the context passed to both the superclass and the user function
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void setup(Output<OUT> output, RuntimeContext runtimeContext) {
        super.setup(output, runtimeContext);
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, runtimeContext);
    }

    /**
     * Opens the operator first and the user function afterwards.
     *
     * @param configuration the configuration passed to both the superclass and the user function
     * @throws Exception if opening the operator or the user function fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        FunctionUtils.openFunction(this.userFunction, configuration);
    }

    /**
     * Closes the operator first and the user function afterwards.
     *
     * <p>The user function is closed even when closing the operator throws, so that a failure in
     * the superclass does not leave the function open. If both steps throw, the exception from
     * closing the user function is the one that propagates (regular {@code try}/{@code finally}
     * semantics).
     *
     * @throws Exception if closing the operator or the user function fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            FunctionUtils.closeFunction(this.userFunction);
        }
    }

    /**
     * Restores the given state on the user function.
     *
     * @param serializable the state object handed to the function's {@code restoreState}
     * @throws IllegalStateException if the user function does not implement {@link Checkpointed}
     * @throws Exception declared by the overridden interface method
     */
    @Override // org.apache.flink.streaming.api.operators.StatefulStreamOperator
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void restoreInitialState(Serializable serializable) throws Exception {
        if (!(this.userFunction instanceof Checkpointed)) {
            throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
        }
        // Raw cast: the concrete state type of the function is not known at this point.
        ((Checkpointed) this.userFunction).restoreState(serializable);
    }

    /**
     * Asks the user function for a snapshot of its state.
     *
     * @param j first argument forwarded to {@code snapshotState} (presumably the checkpoint id,
     *     since {@link #confirmCheckpointCompleted} reports the same position as the checkpoint;
     *     confirm against {@link StatefulStreamOperator})
     * @param j2 second argument forwarded to {@code snapshotState} (presumably the checkpoint
     *     timestamp; confirm against {@link StatefulStreamOperator})
     * @return the snapshot returned by the function, or {@code null} if the function does not
     *     implement {@link Checkpointed}
     * @throws Exception if the function fails while taking the snapshot
     */
    @Override // org.apache.flink.streaming.api.operators.StatefulStreamOperator
    @SuppressWarnings("rawtypes")
    public Serializable getStateSnapshotFromFunction(long j, long j2) throws Exception {
        if (this.userFunction instanceof Checkpointed) {
            return ((Checkpointed) this.userFunction).snapshotState(j, j2);
        }
        return null;
    }

    /**
     * Notifies the user function that a checkpoint has been completed, if the function implements
     * {@link CheckpointCommitter}; otherwise does nothing.
     *
     * @param j the checkpoint value passed to {@code commitCheckpoint} and named in the error message
     * @param j2 not used by this implementation
     * @throws Exception wrapping any exception thrown by the function's {@code commitCheckpoint}
     */
    @Override // org.apache.flink.streaming.api.operators.StatefulStreamOperator
    public void confirmCheckpointCompleted(long j, long j2) throws Exception {
        if (this.userFunction instanceof CheckpointCommitter) {
            try {
                ((CheckpointCommitter) this.userFunction).commitCheckpoint(j);
            } catch (Exception e) {
                // Keep the original failure as the cause so the function's stack trace is not lost.
                throw new Exception("Error while confirming checkpoint " + j + " to the stream function", e);
            }
        }
    }

    /**
     * Returns the user function wrapped by this operator.
     *
     * @return the wrapped user function
     */
    public F getUserFunction() {
        return this.userFunction;
    }
}
