package org.apache.flink.api.scala;

import java.io.BufferedReader;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.scala.FlinkShell;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.sys.package$;
import scala.tools.nsc.Settings;
import scopt.OptionDef;
import scopt.OptionParser;
import scopt.Read$;

/* compiled from: FlinkShell.scala */
/* loaded from: input_file:org/apache/flink/api/scala/FlinkShell$.class */
public final class FlinkShell$ {
    public static final FlinkShell$ MODULE$ = null;
    private Option<BufferedReader> bufferedReader;

    static {
        new FlinkShell$();
    }

    public Option<BufferedReader> bufferedReader() {
        return this.bufferedReader;
    }

    public void bufferedReader_$eq(Option<BufferedReader> option) {
        this.bufferedReader = option;
    }

    public void main(String[] strArr) {
        Some parse = new OptionParser<FlinkShell.Config>() { // from class: org.apache.flink.api.scala.FlinkShell$$anon$1
            {
                head(Predef$.MODULE$.wrapRefArray(new String[]{"Flink Scala Shell"}));
                cmd("local").action(new FlinkShell$$anon$1$$anonfun$1(this)).text("Starts Flink scala shell with a local Flink cluster").children(Predef$.MODULE$.wrapRefArray(new OptionDef[]{opt("addclasspath", Read$.MODULE$.stringRead()).abbr("a").valueName("<path/to/jar>").action(new FlinkShell$$anon$1$$anonfun$2(this)).text("Specifies additional jars to be used in Flink")}));
                cmd("remote").action(new FlinkShell$$anon$1$$anonfun$3(this)).text("Starts Flink scala shell connecting to a remote cluster").children(Predef$.MODULE$.wrapRefArray(new OptionDef[]{arg("<host>", Read$.MODULE$.stringRead()).action(new FlinkShell$$anon$1$$anonfun$4(this)).text("Remote host name as string"), arg("<port>", Read$.MODULE$.intRead()).action(new FlinkShell$$anon$1$$anonfun$5(this)).text("Remote port as integer\n"), opt("addclasspath", Read$.MODULE$.stringRead()).abbr("a").valueName("<path/to/jar>").action(new FlinkShell$$anon$1$$anonfun$6(this)).text("Specifies additional jars to be used in Flink")}));
                cmd("yarn").action(new FlinkShell$$anon$1$$anonfun$7(this)).text("Starts Flink scala shell connecting to a yarn cluster").children(Predef$.MODULE$.wrapRefArray(new OptionDef[]{opt("jobManagerMemory", Read$.MODULE$.stringRead()).abbr("jm").valueName("arg").action(new FlinkShell$$anon$1$$anonfun$8(this)).text("Memory for JobManager container"), opt("name", Read$.MODULE$.stringRead()).abbr("nm").action(new FlinkShell$$anon$1$$anonfun$9(this)).text("Set a custom name for the application on YARN"), opt("queue", Read$.MODULE$.stringRead()).abbr("qu").valueName("<arg>").action(new FlinkShell$$anon$1$$anonfun$10(this)).text("Specifies YARN queue"), opt("slots", Read$.MODULE$.intRead()).abbr("s").valueName("<arg>").action(new FlinkShell$$anon$1$$anonfun$11(this)).text("Number of slots per TaskManager"), opt("taskManagerMemory", Read$.MODULE$.stringRead()).abbr("tm").valueName("<arg>").action(new FlinkShell$$anon$1$$anonfun$12(this)).text("Memory per TaskManager container"), opt("addclasspath", Read$.MODULE$.stringRead()).abbr("a").valueName("<path/to/jar>").action(new FlinkShell$$anon$1$$anonfun$13(this)).text("Specifies additional jars to be used in Flink")}));
                opt("configDir", Read$.MODULE$.stringRead()).optional().action(new FlinkShell$$anon$1$$anonfun$14(this)).text("The configuration directory.");
                help("help").abbr("h").text("Prints this usage text");
            }
        }.parse(Predef$.MODULE$.wrapRefArray(strArr), new FlinkShell.Config(FlinkShell$Config$.MODULE$.apply$default$1(), FlinkShell$Config$.MODULE$.apply$default$2(), FlinkShell$Config$.MODULE$.apply$default$3(), FlinkShell$Config$.MODULE$.apply$default$4(), FlinkShell$Config$.MODULE$.apply$default$5(), FlinkShell$Config$.MODULE$.apply$default$6()));
        if (parse instanceof Some) {
            startShell((FlinkShell.Config) parse.x());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            Predef$.MODULE$.println("Could not parse program arguments");
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    @Internal
    public FlinkShell.YarnConfig ensureYarnConfig(FlinkShell.Config config) {
        FlinkShell.YarnConfig yarnConfig;
        Some yarnConfig2 = config.yarnConfig();
        if (yarnConfig2 instanceof Some) {
            yarnConfig = (FlinkShell.YarnConfig) yarnConfig2.x();
        } else {
            if (!None$.MODULE$.equals(yarnConfig2)) {
                throw new MatchError(yarnConfig2);
            }
            yarnConfig = new FlinkShell.YarnConfig(FlinkShell$YarnConfig$.MODULE$.apply$default$1(), FlinkShell$YarnConfig$.MODULE$.apply$default$2(), FlinkShell$YarnConfig$.MODULE$.apply$default$3(), FlinkShell$YarnConfig$.MODULE$.apply$default$4(), FlinkShell$YarnConfig$.MODULE$.apply$default$5());
        }
        return yarnConfig;
    }

    private String getConfigDir(FlinkShell.Config config) {
        String configurationDirectoryFromEnv;
        Some configDir = config.configDir();
        if (configDir instanceof Some) {
            configurationDirectoryFromEnv = (String) configDir.x();
        } else {
            if (!None$.MODULE$.equals(configDir)) {
                throw new MatchError(configDir);
            }
            configurationDirectoryFromEnv = CliFrontend.getConfigurationDirectoryFromEnv();
        }
        return configurationDirectoryFromEnv;
    }

    private Configuration getGlobalConfig(FlinkShell.Config config) {
        return GlobalConfiguration.loadConfiguration(new File(getConfigDir(config)).getAbsolutePath());
    }

    public void startShell(FlinkShell.Config config) {
        Predef$.MODULE$.println("Starting Flink Shell:");
        Tuple2 liftedTree1$1 = liftedTree1$1(config, getGlobalConfig(config));
        if (liftedTree1$1 == null) {
            throw new MatchError(liftedTree1$1);
        }
        Tuple2 tuple2 = new Tuple2((FlinkILoop) liftedTree1$1._1(), (Option) liftedTree1$1._2());
        FlinkILoop flinkILoop = (FlinkILoop) tuple2._1();
        Some some = (Option) tuple2._2();
        Settings settings = new Settings();
        settings.usejavacp().value_$eq(BoxesRunTime.boxToBoolean(true));
        settings.Yreplsync().value_$eq(BoxesRunTime.boxToBoolean(true));
        try {
            flinkILoop.process(settings);
            Predef$.MODULE$.println(" good bye ..");
        } finally {
            flinkILoop.closeInterpreter();
            if (some instanceof Some) {
                ClusterClient clusterClient = (ClusterClient) some.x();
                clusterClient.shutDownCluster();
                clusterClient.close();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
    }

    @Internal
    public Tuple2<Configuration, Option<ClusterClient<?>>> fetchConnectionInfo(FlinkShell.Config config, Configuration configuration) {
        Tuple2<Configuration, Some<MiniClusterClient>> createYarnClusterIfNeededAndGetConfig;
        Enumeration.Value executionMode = config.executionMode();
        Enumeration.Value LOCAL = FlinkShell$ExecutionMode$.MODULE$.LOCAL();
        if (LOCAL != null ? !LOCAL.equals(executionMode) : executionMode != null) {
            Enumeration.Value REMOTE = FlinkShell$ExecutionMode$.MODULE$.REMOTE();
            if (REMOTE != null ? !REMOTE.equals(executionMode) : executionMode != null) {
                Enumeration.Value YARN = FlinkShell$ExecutionMode$.MODULE$.YARN();
                if (YARN != null ? !YARN.equals(executionMode) : executionMode != null) {
                    Enumeration.Value UNDEFINED = FlinkShell$ExecutionMode$.MODULE$.UNDEFINED();
                    if (UNDEFINED != null ? !UNDEFINED.equals(executionMode) : executionMode != null) {
                        throw new MatchError(executionMode);
                    }
                    throw new IllegalArgumentException("please specify execution mode:\n[local | remote <host> <port> | yarn]");
                }
                createYarnClusterIfNeededAndGetConfig = createYarnClusterIfNeededAndGetConfig(config, configuration);
            } else {
                createYarnClusterIfNeededAndGetConfig = createRemoteConfig(config, configuration);
            }
        } else {
            createYarnClusterIfNeededAndGetConfig = createLocalClusterAndConfig(configuration);
        }
        return createYarnClusterIfNeededAndGetConfig;
    }

    private Tuple2<Configuration, Option<ClusterClient<Nothing$>>> createYarnClusterIfNeededAndGetConfig(FlinkShell.Config config, Configuration configuration) {
        Tuple2<Configuration, Some<ClusterClient<Nothing$>>> tuple2;
        Tuple2<Configuration, None$> fetchDeployedYarnClusterInfo;
        configuration.setBoolean(DeploymentOptions.ATTACHED, true);
        Option<FlinkShell.YarnConfig> yarnConfig = config.yarnConfig();
        if (yarnConfig instanceof Some) {
            tuple2 = deployNewYarnCluster(config, configuration);
        } else {
            if (!None$.MODULE$.equals(yarnConfig)) {
                throw new MatchError(yarnConfig);
            }
            tuple2 = new Tuple2<>(configuration, None$.MODULE$);
        }
        Tuple2<Configuration, Some<ClusterClient<Nothing$>>> tuple22 = tuple2;
        if (tuple22 == null) {
            throw new MatchError(tuple22);
        }
        Tuple2 tuple23 = new Tuple2((Configuration) tuple22._1(), (Option) tuple22._2());
        Configuration configuration2 = (Configuration) tuple23._1();
        Option option = (Option) tuple23._2();
        if (option instanceof Some) {
            fetchDeployedYarnClusterInfo = fetchDeployedYarnClusterInfo(config, configuration2, "yarn-cluster");
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            fetchDeployedYarnClusterInfo = fetchDeployedYarnClusterInfo(config, configuration2, "default");
        }
        Tuple2<Configuration, None$> tuple24 = fetchDeployedYarnClusterInfo;
        if (tuple24 == null) {
            throw new MatchError(tuple24);
        }
        Configuration configuration3 = (Configuration) tuple24._1();
        Predef$.MODULE$.println(new StringBuilder().append("Configuration: ").append(configuration3).toString());
        return new Tuple2<>(configuration3, option);
    }

    private Tuple2<Configuration, Some<ClusterClient<Nothing$>>> deployNewYarnCluster(FlinkShell.Config config, Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        String[] parseArgList = parseArgList(config, "yarn-cluster");
        CliFrontend cliFrontend = new CliFrontend(configuration2, CliFrontend.loadCustomCommandLines(configuration2, (String) config.configDir().getOrElse(new FlinkShell$$anonfun$15())));
        CommandLine parse = CliFrontendParser.parse(CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), cliFrontend.getCustomCommandLineOptions()), parseArgList, true);
        Configuration applyCommandLineOptionsToConfiguration = cliFrontend.validateAndGetActiveCommandLine(parse).applyCommandLineOptionsToConfiguration(parse);
        ClusterClientFactory clusterClientFactory = new DefaultClusterClientServiceLoader().getClusterClientFactory(applyCommandLineOptionsToConfiguration);
        ClusterDescriptor createClusterDescriptor = clusterClientFactory.createClusterDescriptor(applyCommandLineOptionsToConfiguration);
        try {
            ClusterClient clusterClient = createClusterDescriptor.deploySessionCluster(clusterClientFactory.getClusterSpecification(applyCommandLineOptionsToConfiguration)).getClusterClient();
            applyCommandLineOptionsToConfiguration.set(DeploymentOptions.TARGET, "yarn-session");
            createClusterDescriptor.close();
            return new Tuple2<>(applyCommandLineOptionsToConfiguration, new Some(clusterClient));
        } catch (Throwable th) {
            applyCommandLineOptionsToConfiguration.set(DeploymentOptions.TARGET, "yarn-session");
            createClusterDescriptor.close();
            throw th;
        }
    }

    private Tuple2<Configuration, None$> fetchDeployedYarnClusterInfo(FlinkShell.Config config, Configuration configuration, String str) {
        Configuration configuration2 = new Configuration(configuration);
        String[] parseArgList = parseArgList(config, str);
        CliFrontend cliFrontend = new CliFrontend(configuration2, CliFrontend.loadCustomCommandLines(configuration2, (String) config.configDir().getOrElse(new FlinkShell$$anonfun$16())));
        CommandLine parse = CliFrontendParser.parse(CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), cliFrontend.getCustomCommandLineOptions()), parseArgList, true);
        return new Tuple2<>(cliFrontend.validateAndGetActiveCommandLine(parse).applyCommandLineOptionsToConfiguration(parse), None$.MODULE$);
    }

    public String[] parseArgList(FlinkShell.Config config, String str) {
        String[] strArr;
        ArrayBuffer apply = (str != null ? !str.equals("default") : "default" != 0) ? ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"-m", str})) : (ArrayBuffer) ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        Some yarnConfig = config.yarnConfig();
        if (yarnConfig instanceof Some) {
            FlinkShell.YarnConfig yarnConfig2 = (FlinkShell.YarnConfig) yarnConfig.x();
            yarnConfig2.jobManagerMemory().foreach(new FlinkShell$$anonfun$parseArgList$1(apply));
            yarnConfig2.taskManagerMemory().foreach(new FlinkShell$$anonfun$parseArgList$2(apply));
            yarnConfig2.name().foreach(new FlinkShell$$anonfun$parseArgList$3(apply));
            yarnConfig2.queue().foreach(new FlinkShell$$anonfun$parseArgList$4(apply));
            yarnConfig2.slots().foreach(new FlinkShell$$anonfun$parseArgList$5(apply));
            strArr = (String[]) apply.toArray(ClassTag$.MODULE$.apply(String.class));
        } else {
            if (!None$.MODULE$.equals(yarnConfig)) {
                throw new MatchError(yarnConfig);
            }
            strArr = (String[]) apply.toArray(ClassTag$.MODULE$.apply(String.class));
        }
        return strArr;
    }

    private Tuple2<Configuration, None$> createRemoteConfig(FlinkShell.Config config, Configuration configuration) {
        if (config.host().isEmpty() || config.port().isEmpty()) {
            throw new IllegalArgumentException("<host> or <port> is not specified!");
        }
        Configuration configuration2 = new Configuration(configuration);
        setJobManagerInfoToConfig(configuration2, (String) config.host().get(), Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt(config.port().get())));
        configuration2.set(DeploymentOptions.TARGET, "remote");
        configuration2.setBoolean(DeploymentOptions.ATTACHED, true);
        return new Tuple2<>(configuration2, None$.MODULE$);
    }

    private Tuple2<Configuration, Some<MiniClusterClient>> createLocalClusterAndConfig(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        configuration2.setInteger(JobManagerOptions.PORT, 0);
        MiniCluster createLocalCluster = createLocalCluster(configuration2);
        int port = ((URI) createLocalCluster.getRestAddress().get()).getPort();
        setJobManagerInfoToConfig(configuration2, "localhost", Predef$.MODULE$.int2Integer(port));
        configuration2.set(DeploymentOptions.TARGET, "remote");
        configuration2.setBoolean(DeploymentOptions.ATTACHED, true);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\\nStarting local Flink cluster (host: localhost, port: ", ").\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(port)})));
        return new Tuple2<>(configuration2, new Some(new MiniClusterClient(configuration2, createLocalCluster)));
    }

    private MiniCluster createLocalCluster(Configuration configuration) {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS)).setNumTaskManagers(configuration.getInteger("local.number-taskmanager", 1)).build());
        miniCluster.start();
        return miniCluster;
    }

    private void setJobManagerInfoToConfig(Configuration configuration, String str, Integer num) {
        configuration.setString(JobManagerOptions.ADDRESS, str);
        configuration.setInteger(JobManagerOptions.PORT, Predef$.MODULE$.Integer2int(num));
        configuration.setString(RestOptions.ADDRESS, str);
        configuration.setInteger(RestOptions.PORT, Predef$.MODULE$.Integer2int(num));
    }

    private final Tuple2 liftedTree1$1(FlinkShell.Config config, Configuration configuration) {
        FlinkILoop flinkILoop;
        try {
            Tuple2<Configuration, Option<ClusterClient<?>>> fetchConnectionInfo = fetchConnectionInfo(config, configuration);
            if (fetchConnectionInfo == null) {
                throw new MatchError(fetchConnectionInfo);
            }
            Tuple2 tuple2 = new Tuple2((Configuration) fetchConnectionInfo._1(), (Option) fetchConnectionInfo._2());
            Configuration configuration2 = (Configuration) tuple2._1();
            Option option = (Option) tuple2._2();
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\\nConnecting to Flink cluster (host: ", ", port: ", ").\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{configuration2.getString(JobManagerOptions.ADDRESS), BoxesRunTime.boxToInteger(configuration2.getInteger(JobManagerOptions.PORT))})));
            Some bufferedReader = bufferedReader();
            if (bufferedReader instanceof Some) {
                flinkILoop = new FlinkILoop(configuration2, config.externalJars(), (BufferedReader) bufferedReader.x(), new PrintWriter(new StringWriter()));
            } else {
                if (!None$.MODULE$.equals(bufferedReader)) {
                    throw new MatchError(bufferedReader);
                }
                flinkILoop = new FlinkILoop(configuration2, config.externalJars());
            }
            return new Tuple2(flinkILoop, option);
        } catch (IllegalArgumentException e) {
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{e.getMessage()})));
            throw package$.MODULE$.exit();
        }
    }

    private FlinkShell$() {
        MODULE$ = this;
        this.bufferedReader = None$.MODULE$;
    }
}
