/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.python.api.functions;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.python.util.InterpreterUtils;
import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
import org.apache.flink.util.FlinkException;
import org.python.core.PyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbstractPythonUDF<F extends Function>
extends AbstractRichFunction {
    protected Logger log = LoggerFactory.getLogger(AbstractPythonUDF.class);
    private final byte[] serFun;
    protected transient F fun;

    AbstractPythonUDF(F fun) throws IOException {
        this.serFun = SerializationUtils.serializeObject(fun);
    }

    public void open(Configuration parameters) throws Exception {
        this.fun = (Function)InterpreterUtils.deserializeFunction(this.getRuntimeContext(), this.serFun);
        if (this.fun instanceof RichFunction) {
            try {
                RichFunction rf = (RichFunction)this.fun;
                rf.setRuntimeContext(this.getRuntimeContext());
                rf.open(parameters);
            }
            catch (PyException pe) {
                throw this.createAndLogException(pe);
            }
        }
    }

    public void close() throws Exception {
        if (this.fun instanceof RichFunction) {
            try {
                ((RichFunction)this.fun).close();
            }
            catch (PyException pe) {
                throw this.createAndLogException(pe);
            }
        }
    }

    FlinkException createAndLogException(PyException pe) {
        return AbstractPythonUDF.createAndLogException(pe, this.log);
    }

    static FlinkException createAndLogException(PyException pe, Logger log2) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw);){
            pe.printStackTrace(pw);
        }
        String pythonStackTrace = sw.toString().trim();
        log2.error("Python function failed: " + System.lineSeparator() + pythonStackTrace);
        return new FlinkException("Python function failed: " + pythonStackTrace);
    }
}

