/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

/**
 * Base class for stream operators that wrap a user-defined function (UDF).
 *
 * <p>The operator forwards its own lifecycle calls (setup, open, close, dispose) to the
 * wrapped function through {@link FunctionUtils}, and takes part in checkpointing on the
 * function's behalf when the function implements {@link Checkpointed} and/or
 * {@link CheckpointNotifier}.
 *
 * @param <OUT> the type of records emitted by the operator
 * @param <F> the type of the wrapped user function
 */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1L;

    /** The user function wrapped by this operator; never {@code null}. */
    protected final F userFunction;

    /**
     * Guards against closing the user function twice: set by whichever of {@link #close()}
     * or {@link #dispose()} reaches the function first.
     */
    private transient boolean functionsClosed = false;

    /**
     * Creates an operator around the given user function.
     *
     * @param userFunction the function to wrap
     * @throws NullPointerException if {@code userFunction} is {@code null}
     */
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = Objects.requireNonNull(userFunction);
    }

    /**
     * Returns the user function wrapped by this operator.
     *
     * @return the wrapped user function
     */
    public F getUserFunction() {
        return this.userFunction;
    }

    /**
     * Sets up the base operator and then hands the operator's runtime context to the user
     * function.
     */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, this.getRuntimeContext());
    }

    /**
     * Opens the base operator and then opens the user function with an empty
     * {@link Configuration}.
     *
     * @throws Exception if the base operator or the user function fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(this.userFunction, new Configuration());
    }

    /**
     * Closes the base operator and then the user function.
     *
     * <p>The function is flagged as closed before the close call is made, so a failing
     * close is not retried by {@link #dispose()}.
     *
     * @throws Exception if the base operator or the user function fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        this.functionsClosed = true;
        FunctionUtils.closeFunction(this.userFunction);
    }

    /**
     * Releases the operator's resources.
     *
     * <p>If the user function has not been closed yet, it is closed here on a best-effort
     * basis: any failure is logged rather than propagated. Afterwards the call is delegated
     * to the base operator so that whatever it releases in its own {@code dispose()} is not
     * skipped.
     */
    @Override
    public void dispose() {
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            try {
                FunctionUtils.closeFunction(this.userFunction);
            }
            catch (Throwable t) {
                LOG.error("Exception while closing user function while failing or canceling task", t);
            }
        }
        // Every other lifecycle hook in this class delegates to the base operator; do the
        // same here. The function close above never throws, so this call is always reached.
        super.dispose();
    }

    /**
     * Draws the operator state snapshot and, if the user function is {@link Checkpointed},
     * adds the function's state to it.
     *
     * <p>A {@code null} function state is not stored.
     *
     * @param checkpointId the ID of the checkpoint
     * @param timestamp the timestamp of the checkpoint
     * @return the state drawn by the base operator, possibly extended by the function state
     * @throws Exception if the function fails to produce its state or the state cannot be
     *         handed to the state backend
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
        if (this.userFunction instanceof Checkpointed) {
            // The state type parameter is erased at runtime; Serializable is its upper bound.
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) this.userFunction;

            Serializable udfState;
            try {
                udfState = chkFunction.snapshotState(checkpointId, timestamp);
            }
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }
            if (udfState != null) {
                try {
                    StateBackend<?> stateBackend = this.getStateBackend();
                    StateHandle<Serializable> handle =
                            stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
                    state.setFunctionState(handle);
                }
                catch (Exception e) {
                    throw new Exception("Failed to add the state snapshot of the function to the checkpoint: " + e.getMessage(), e);
                }
            }
        }
        return state;
    }

    /**
     * Restores the base operator state and, if the user function is {@link Checkpointed}
     * and the snapshot carries a function state, restores that state into the function.
     *
     * @param state the task state to restore from
     * @throws Exception if the state cannot be read or the function fails to restore it
     */
    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        super.restoreState(state);
        StateHandle<Serializable> stateHandle = state.getFunctionState();
        if (this.userFunction instanceof Checkpointed && stateHandle != null) {
            // The state type parameter is erased at runtime; Serializable is its upper bound.
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) this.userFunction;

            Serializable functionState = stateHandle.getState(this.getUserCodeClassloader());
            if (functionState != null) {
                try {
                    chkFunction.restoreState(functionState);
                }
                catch (Exception e) {
                    throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Notifies the base operator and, if the user function is a {@link CheckpointNotifier},
     * the function that the given checkpoint has completed.
     *
     * @param checkpointId the ID of the completed checkpoint
     * @throws Exception if the base operator or the function fails while handling the
     *         notification
     */
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        super.notifyOfCompletedCheckpoint(checkpointId);
        if (this.userFunction instanceof CheckpointNotifier) {
            ((CheckpointNotifier) this.userFunction).notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Forwards the output type to the user function if the function is itself
     * {@link OutputTypeConfigurable}; otherwise does nothing.
     *
     * @param outTypeInfo the type information of the operator's output
     * @param executionConfig the execution configuration
     */
    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        if (this.userFunction instanceof OutputTypeConfigurable) {
            // The type parameter is erased at runtime; the operator's OUT is what is forwarded.
            @SuppressWarnings("unchecked")
            OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) this.userFunction;
            outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
        }
    }

    /**
     * Returns the parameters for the user function.
     *
     * @return a new, empty {@link Configuration} on every call
     */
    public Configuration getUserFunctionParameters() {
        return new Configuration();
    }
}

