/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
public class RemoteStreamEnvironment
extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
    private final String host;
    private final int port;
    private final Configuration clientConfiguration;
    private final List<URL> jarFiles;
    private final List<URL> globalClasspaths;
    private final SavepointRestoreSettings savepointRestoreSettings;

    public RemoteStreamEnvironment(String host, int port, String ... jarFiles) {
        this(host, port, (Configuration)null, jarFiles);
    }

    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String ... jarFiles) {
        this(host, port, clientConfiguration, jarFiles, (URL[])null);
    }

    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
        this(host, port, clientConfiguration, jarFiles, globalClasspaths, null);
    }

    @PublicEvolving
    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) {
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.");
        }
        if (host == null) {
            throw new NullPointerException("Host must not be null.");
        }
        if (port < 1 || port >= 65535) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = host;
        this.port = port;
        this.clientConfiguration = clientConfiguration == null ? new Configuration() : clientConfiguration;
        this.jarFiles = new ArrayList<URL>(jarFiles.length);
        for (String jarFile : jarFiles) {
            try {
                URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
                this.jarFiles.add(jarFileUrl);
                JobWithJars.checkJarFile((URL)jarFileUrl);
            }
            catch (MalformedURLException e) {
                throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
            }
            catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarFile, e);
            }
        }
        this.globalClasspaths = globalClasspaths == null ? Collections.emptyList() : Arrays.asList(globalClasspaths);
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    @PublicEvolving
    public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, List<URL> jarFiles, String host, int port, Configuration clientConfiguration, List<URL> globalClasspaths, String jobName, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(jobName);
        return RemoteStreamEnvironment.executeRemotely(streamGraph, streamExecutionEnvironment.getClass().getClassLoader(), streamExecutionEnvironment.getConfig(), jarFiles, host, port, clientConfiguration, globalClasspaths, savepointRestoreSettings);
    }

    private static JobExecutionResult executeRemotely(StreamGraph streamGraph, ClassLoader envClassLoader, ExecutionConfig executionConfig, List<URL> jarFiles, String host, int port, Configuration clientConfiguration, List<URL> globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        RestClusterClient client;
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", (Object)host, (Object)port);
        }
        ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, (ClassLoader)envClassLoader, (Configuration)clientConfiguration);
        Configuration configuration = new Configuration();
        configuration.addAll(clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, host);
        configuration.setInteger(JobManagerOptions.PORT, port);
        configuration.setInteger(RestOptions.PORT, port);
        try {
            client = new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), streamGraph.getJobGraph().getJobID(), (Throwable)e);
        }
        client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());
        if (savepointRestoreSettings == null) {
            savepointRestoreSettings = SavepointRestoreSettings.none();
        }
        try {
            JobExecutionResult e = client.run((FlinkPlan)streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings).getJobExecutionResult();
            return e;
        }
        catch (ProgramInvocationException e) {
            throw e;
        }
        catch (Exception e) {
            String term = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + term, streamGraph.getJobGraph().getJobID(), (Throwable)e);
        }
        finally {
            try {
                client.shutdown();
            }
            catch (Exception e) {
                LOG.warn("Could not properly shut down the cluster client.", (Throwable)e);
            }
        }
    }

    @Override
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        this.transformations.clear();
        return this.executeRemotely(streamGraph, this.jarFiles);
    }

    @Deprecated
    protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
        return RemoteStreamEnvironment.executeRemotely(streamGraph, this.getClass().getClassLoader(), this.getConfig(), jarFiles, this.host, this.port, this.clientConfiguration, this.globalClasspaths, this.savepointRestoreSettings);
    }

    public String toString() {
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + (this.getParallelism() == -1 ? "default" : Integer.valueOf(this.getParallelism())) + ")";
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public Configuration getClientConfiguration() {
        return this.clientConfiguration;
    }
}

