package org.apache.zeppelin.flink;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.Some;
import scala.collection.JavaConversions;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.AbstractFunction0;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.IMain;
import scala.tools.nsc.interpreter.Results;
import scala.tools.nsc.interpreter.Results$Incomplete$;
import scala.tools.nsc.interpreter.Results$Success$;
import scala.tools.nsc.settings.MutableSettings;

/* loaded from: input_file:org/apache/zeppelin/flink/FlinkInterpreter.class */
public class FlinkInterpreter extends Interpreter {
    /** Logger for this interpreter instance; created in the constructor. */
    Logger logger;
    /** In-memory buffer given to the Flink REPL as its writer and installed as standard output while interpreting. */
    private ByteArrayOutputStream out;
    /** Flink configuration populated from the interpreter properties in open(). */
    private Configuration flinkConf;
    /** Embedded Flink mini cluster; created by startFlinkMiniCluster() and reset to null by stopFlinkMiniCluster(). */
    private LocalFlinkMiniCluster localFlinkCluster;
    /** Flink Scala REPL loop created in open(). */
    private FlinkILoop flinkIloop;
    /** NOTE(review): never assigned in this class; open() keeps its binder map in a local variable instead. */
    private Map<String, Object> binder;
    /** Scala interpreter obtained from flinkIloop.intp() in open(). */
    private IMain imain;

    /**
     * Creates the interpreter with the given properties and sets up its logger.
     * No Flink resources are created here; that happens in open().
     *
     * @param properties interpreter properties, passed through to the superclass constructor
     */
    public FlinkInterpreter(Properties properties) {
        super(properties);
        this.logger = LoggerFactory.getLogger(FlinkInterpreter.class);
    }

    /**
     * Prepares the interpreter for use.
     *
     * <p>Steps, in order: copy every interpreter property into a fresh Flink
     * {@link Configuration}; start the embedded mini cluster when running in local
     * mode; collect the absolute paths of the files in the directory named by the
     * {@code zeppelin.interpreter.localRepo} property; create the Flink Scala REPL
     * with those paths; and finally bind the batch and streaming environments into
     * the REPL as {@code benv} and {@code senv}.
     */
    public void open() {
        this.out = new ByteArrayOutputStream();
        this.flinkConf = new Configuration();

        Properties properties = getProperties();
        // Properties.keySet() is a Set<Object>, so iterate as Object and cast explicitly.
        for (Object key : properties.keySet()) {
            String name = (String) key;
            this.flinkConf.setString(name, toString(properties.get(name)));
        }

        if (localMode()) {
            startFlinkMiniCluster();
        }

        String[] userJars = new String[0];
        String localRepo = getProperty("zeppelin.interpreter.localRepo");
        if (localRepo != null) {
            // listFiles() returns null when the path is missing or not a directory,
            // so no separate exists() check is needed.
            File[] repoFiles = new File(localRepo).listFiles();
            if (repoFiles != null) {
                userJars = new String[repoFiles.length];
                for (int i = 0; i < repoFiles.length; i++) {
                    userJars[i] = repoFiles[i].getAbsolutePath();
                }
            }
        }

        this.flinkIloop = new FlinkILoop(getHost(), getPort(), this.flinkConf, new Some<String[]>(userJars), (BufferedReader) null, new PrintWriter(this.out));
        this.flinkIloop.settings_$eq(createSettings());
        this.flinkIloop.createInterpreter();
        this.imain = this.flinkIloop.intp();

        ExecutionEnvironment scalaBenv = this.flinkIloop.scalaBenv();
        StreamExecutionEnvironment scalaSenv = this.flinkIloop.scalaSenv();
        scalaSenv.getConfig().disableSysoutLogging();
        scalaBenv.getConfig().disableSysoutLogging();

        // Create a map inside the REPL and fetch it back: it is the bridge used below
        // to hand Java-side objects to Scala code.
        this.imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
        @SuppressWarnings("unchecked") // the REPL line above creates a HashMap[String, Object]
        Map<String, Object> replBinder = (Map<String, Object>) getLastObject();

        this.imain.interpret("import scala.tools.nsc.io._");
        this.imain.interpret("import Properties.userHome");
        this.imain.interpret("import scala.compat.Platform.EOL");
        this.imain.interpret("import org.apache.flink.api.scala._");
        this.imain.interpret("import org.apache.flink.api.common.functions._");

        replBinder.put("benv", scalaBenv);
        this.imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf[" + scalaBenv.getClass().getName() + "]");
        replBinder.put("senv", scalaSenv);
        this.imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf[" + scalaSenv.getClass().getName() + "]");
    }

    /**
     * Tells whether the interpreter should run against an embedded local cluster.
     *
     * @return {@code true} when the {@code host} property is unset, blank, or equal
     *         to {@code local} after trimming
     */
    private boolean localMode() {
        String host = getProperty("host");
        if (host == null) {
            return true;
        }
        String trimmed = host.trim();
        return trimmed.isEmpty() || "local".equals(trimmed);
    }

    /**
     * Returns the host name of the Flink job manager to connect to.
     *
     * @return {@code "localhost"} in local mode, otherwise the {@code host} property
     *         with surrounding whitespace removed
     */
    private String getHost() {
        if (localMode()) {
            return "localhost";
        }
        // localMode() only returns false for a non-null host; trim so the value used
        // for connecting matches the trimmed value localMode() based its decision on.
        return getProperty("host").trim();
    }

    /**
     * Returns the RPC port of the Flink job manager to connect to.
     *
     * @return the leader RPC port of the embedded cluster in local mode, otherwise
     *         the {@code port} property parsed as an integer
     * @throws NumberFormatException if a remote host is configured and the
     *         {@code port} property is missing, blank, or not an integer
     */
    private int getPort() {
        if (localMode()) {
            return this.localFlinkCluster.getLeaderRPCPort();
        }
        String port = getProperty("port");
        if (port == null || port.trim().isEmpty()) {
            // Same exception type Integer.parseInt(null) raised before, but with a
            // message that names the misconfigured property.
            throw new NumberFormatException("Property 'port' must be set to an integer when 'host' is not local");
        }
        return Integer.parseInt(port.trim());
    }

    /**
     * Builds the Scala compiler settings for the REPL.
     *
     * <p>The classpath is the current class path (see currentClassPath()) followed
     * by the interpreter class loader URLs, joined with the platform path separator.
     * The context class loader is installed as the explicit parent loader,
     * {@code usejavacp} is enabled and the maximum class file name length is set
     * to 128.
     *
     * @return the configured settings
     */
    private Settings createSettings() {
        URL[] interpreterUrls = getClassloaderUrls();
        Settings settings = new Settings();
        MutableSettings.PathSetting classpath = settings.classpath();

        StringBuilder joined = new StringBuilder();
        for (File entry : currentClassPath()) {
            appendClasspathEntry(joined, entry.getAbsolutePath());
        }
        if (interpreterUrls != null) {
            for (URL url : interpreterUrls) {
                appendClasspathEntry(joined, url.getFile());
            }
        }
        classpath.v_$eq(joined.toString());
        settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(classpath);

        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        settings.explicitParentLoader_$eq(new Some<ClassLoader>(contextLoader));

        MutableSettings.BooleanSetting usejavacp = settings.usejavacp();
        usejavacp.v_$eq(true);
        settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(usejavacp);

        MutableSettings.IntSetting maxClassfileName = settings.maxClassfileName();
        maxClassfileName.v_$eq(128);
        settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(maxClassfileName);

        return settings;
    }

    /** Appends one entry to a classpath string, inserting the path separator between entries. */
    private static void appendClasspathEntry(StringBuilder classpath, String entry) {
        if (classpath.length() > 0) {
            classpath.append(File.pathSeparator);
        }
        classpath.append(entry);
    }

    /**
     * Collects the class path visible to the current thread.
     *
     * @return the URLs of the context class loader (as files) followed by every
     *         entry of the {@code java.class.path} system property
     */
    private List<File> currentClassPath() {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        List<File> entries = classPath(contextLoader);

        // String.split never returns null, so the entries can be iterated directly.
        String javaClassPath = System.getProperty("java.class.path");
        for (String entry : javaClassPath.split(File.pathSeparator)) {
            entries.add(new File(entry));
        }
        return entries;
    }

    /**
     * Lists the URLs of a class loader as files.
     *
     * @param classLoader the loader to inspect; may be {@code null}
     * @return a new mutable list with one file per URL when the loader is a
     *         {@link URLClassLoader}; an empty list for {@code null} or any other
     *         kind of loader
     */
    private List<File> classPath(ClassLoader classLoader) {
        LinkedList<File> entries = new LinkedList<File>();
        // instanceof is false for null, so this guard also covers a missing loader.
        if (!(classLoader instanceof URLClassLoader)) {
            return entries;
        }
        URL[] urls = ((URLClassLoader) classLoader).getURLs();
        if (urls == null) {
            return entries;
        }
        for (URL url : urls) {
            entries.add(new File(url.getFile()));
        }
        return entries;
    }

    /**
     * Fetches the value produced by the most recent REPL request.
     *
     * @return the result of calling {@code $result} with no arguments on the line
     *         representation of the interpreter's last request
     */
    public Object getLastObject() {
        LinkedList<Object> noArguments = new LinkedList<Object>();
        return this.imain.lastRequest().lineRep().call("$result", JavaConversions.asScalaBuffer(noArguments));
    }

    /**
     * Releases the resources created by open().
     *
     * <p>Closes the Scala REPL if one was created, and in local mode stops the
     * embedded mini cluster. The cluster is stopped even when closing the REPL
     * throws, so a REPL failure cannot leave the cluster running.
     */
    public void close() {
        try {
            // open() may never have run, or may have failed before creating the REPL.
            if (this.flinkIloop != null) {
                this.flinkIloop.closeInterpreter();
            }
        } finally {
            if (localMode()) {
                stopFlinkMiniCluster();
            }
        }
    }

    /**
     * Interprets a paragraph of Scala code.
     *
     * @param str the code to run; {@code null} or blank input succeeds without
     *        running anything
     * @param interpreterContext passed through to the line-based overload
     * @return a SUCCESS result for empty input, otherwise the result of interpreting
     *         the input split on {@code "\n"}
     */
    public InterpreterResult interpret(String str, InterpreterContext interpreterContext) {
        boolean nothingToRun = str == null || str.trim().isEmpty();
        if (nothingToRun) {
            return new InterpreterResult(InterpreterResult.Code.SUCCESS);
        }
        String[] lines = str.split("\n");
        return interpret(lines, interpreterContext);
    }

    /**
     * Interprets the given lines one statement at a time.
     *
     * <p>A line is buffered rather than run when the line that follows it marks a
     * continuation: a blank line, a {@code //} comment, a line starting with
     * {@code }} or {@code object}, part of a block comment, or a chained call
     * starting with a single {@code .}. Buffered text is prepended to the next line
     * that is actually submitted. A line the REPL reports as incomplete is buffered
     * as well and retried together with the following line.
     *
     * <p>Standard output is redirected to this interpreter's buffer for the call and
     * is not restored afterwards.
     *
     * @param strArr the source lines to run
     * @param interpreterContext not used by this method
     * @return an ERROR result carrying the captured output (or the most relevant
     *         exception message) as soon as a statement fails; an INCOMPLETE result
     *         when the input ends inside an unfinished expression; otherwise a result
     *         with the last status code and the captured output
     */
    public InterpreterResult interpret(String[] strArr, InterpreterContext interpreterContext) {
        final IMain intp = this.flinkIloop.intp();

        // Append a no-op statement. The last element has no successor, so it is always
        // submitted, together with anything still buffered from the lines before it.
        String[] lines = new String[strArr.length + 1];
        System.arraycopy(strArr, 0, lines, 0, strArr.length);
        lines[strArr.length] = "print(\"\")";

        System.setOut(new PrintStream(this.out));
        this.out.reset();

        InterpreterResult.Code code = null;
        String pending = "";
        boolean inBlockComment = false;

        for (int i = 0; i < lines.length; i++) {
            final String line = lines[i];

            if (i + 1 < lines.length) {
                String next = lines[i + 1].trim();
                boolean continued = false;
                if (next.isEmpty() || next.startsWith("//") || next.startsWith("}") || next.startsWith("object")) {
                    // Blank line, line comment, closing brace, or an "object" definition
                    // (presumably a companion object that must stay with its class).
                    continued = true;
                } else if (!inBlockComment && next.startsWith("/*")) {
                    inBlockComment = true;
                    continued = true;
                } else if (inBlockComment && next.lastIndexOf("*/") >= 0) {
                    inBlockComment = false;
                    continued = true;
                } else if (next.length() > 1 && next.charAt(0) == '.' && next.charAt(1) != '.' && next.charAt(1) != '/') {
                    // ".method(...)" continues the expression; ".." and "./" do not.
                    continued = true;
                } else if (inBlockComment) {
                    continued = true;
                }
                if (continued) {
                    // Only buffer the line; it is run later as part of a larger statement.
                    pending = pending + line + "\n";
                    continue;
                }
            }

            // Effectively constant copy of the statement so the callback can capture it.
            final String command = pending + line;
            Results.Result result;
            try {
                result = (Results.Result) Console.withOut(System.out, new AbstractFunction0<Results.Result>() {
                    // Implements Function0.apply(): runs the statement inside the REPL.
                    public Results.Result apply() {
                        return intp.interpret(command);
                    }
                });
            } catch (Exception e) {
                this.logger.info("Interpreter exception", e);
                return new InterpreterResult(InterpreterResult.Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
            }

            code = getResultCode(result);
            if (code == InterpreterResult.Code.ERROR) {
                return new InterpreterResult(code, this.out.toString());
            }
            // Keep an unfinished statement for the next round; otherwise start afresh.
            pending = code == InterpreterResult.Code.INCOMPLETE ? pending + line + "\n" : "";
        }

        if (code == InterpreterResult.Code.INCOMPLETE) {
            return new InterpreterResult(code, "Incomplete expression");
        }
        return new InterpreterResult(code, this.out.toString());
    }

    /**
     * Maps a Scala REPL result onto a Zeppelin result code.
     *
     * @param result the REPL result to classify
     * @return SUCCESS for a success result, INCOMPLETE for an incomplete result, and
     *         ERROR for anything else
     */
    private InterpreterResult.Code getResultCode(Results.Result result) {
        if (result instanceof Results$Success$) {
            return InterpreterResult.Code.SUCCESS;
        }
        if (result instanceof Results$Incomplete$) {
            return InterpreterResult.Code.INCOMPLETE;
        }
        return InterpreterResult.Code.ERROR;
    }

    /**
     * Cancels every job currently running on the embedded mini cluster.
     *
     * <p>Does nothing outside local mode, or when no mini cluster exists (before
     * open() or after close()).
     *
     * @param interpreterContext not used by this method
     */
    public void cancel(InterpreterContext interpreterContext) {
        // The cluster field is null until open() starts it and again after close().
        if (!localMode() || this.localFlinkCluster == null) {
            return;
        }
        for (JobID jobID : this.localFlinkCluster.getCurrentlyRunningJobsJava()) {
            this.logger.info("Stop job: " + jobID);
            cancelJobLocalMode(jobID);
        }
    }

    /**
     * Asks the leader of the embedded mini cluster to cancel one job.
     *
     * <p>The timeout derived from the cluster configuration is used both to look up
     * the leader gateway and for the request itself. The reply is not inspected.
     *
     * @param jobID the job to cancel
     */
    private void cancelJobLocalMode(JobID jobID) {
        FiniteDuration askTimeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration());
        JobManagerMessages.CancelJob cancelRequest = new JobManagerMessages.CancelJob(jobID);
        this.localFlinkCluster.getLeaderGateway(askTimeout).ask(cancelRequest, askTimeout);
    }

    /**
     * Reports the form type of this interpreter.
     *
     * @return always {@code Interpreter.FormType.NATIVE}
     */
    public Interpreter.FormType getFormType() {
        return Interpreter.FormType.NATIVE;
    }

    /**
     * Reports execution progress. No progress is tracked by this interpreter.
     *
     * @param interpreterContext not used by this method
     * @return always {@code 0}
     */
    public int getProgress(InterpreterContext interpreterContext) {
        return 0;
    }

    /**
     * Returns code completion candidates. This interpreter offers none.
     *
     * @param str not used by this method
     * @param i not used by this method
     * @param interpreterContext not used by this method
     * @return a new, empty, mutable list
     */
    public List<InterpreterCompletion> completion(String str, int i, InterpreterContext interpreterContext) {
        List<InterpreterCompletion> noCandidates = new LinkedList<InterpreterCompletion>();
        return noCandidates;
    }

    /**
     * Creates the embedded Flink mini cluster from the current configuration and
     * starts it.
     *
     * <p>The cluster field is assigned before the start attempt, so it refers to
     * the new cluster even when starting fails.
     *
     * @throws RuntimeException wrapping any exception raised while starting
     */
    private void startFlinkMiniCluster() {
        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(this.flinkConf, false);
        this.localFlinkCluster = cluster;
        try {
            cluster.start(true);
        } catch (Exception startFailure) {
            throw new RuntimeException("Could not start Flink mini cluster.", startFailure);
        }
    }

    /**
     * Stops the embedded Flink mini cluster, if there is one, and clears the field
     * so that a repeated call does nothing.
     */
    private void stopFlinkMiniCluster() {
        if (this.localFlinkCluster == null) {
            return;
        }
        this.localFlinkCluster.stop();
        this.localFlinkCluster = null;
    }

    /**
     * Converts a property value to a string.
     *
     * @param obj the value to convert; may be {@code null}
     * @return {@code obj} itself when it is a {@link String}; the empty string for
     *         {@code null} and for values of any other type
     */
    static final String toString(Object obj) {
        if (obj instanceof String) {
            return (String) obj;
        }
        return "";
    }
}
