package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.class */
/**
 * A {@link StreamExecutionEnvironment} that submits the streaming job to a
 * remote endpoint identified by a host name and a port, shipping a fixed set
 * of jar files along with the job graph.
 */
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);

    /** Highest valid TCP port number (inclusive). */
    private static final int MAX_PORT = 65535;

    private final String host;
    private final int port;
    private final List<File> jarFiles;

    /**
     * Creates a remote environment targeting {@code host:port}.
     *
     * @param host     host name of the remote endpoint; must not be {@code null}
     * @param port     port of the remote endpoint, in the range 1..65535 inclusive
     * @param jarFiles paths of the jar files to attach to every submitted job;
     *                 each one is validated with {@code JobWithJars.checkJarFile}.
     *                 A {@code null} array is treated as "no jar files".
     * @throws NullPointerException     if {@code host} is {@code null}
     * @throws IllegalArgumentException if {@code port} is outside 1..65535
     * @throws RuntimeException         if a jar file fails validation (the
     *                                  underlying {@link IOException} is the cause)
     */
    public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
        if (host == null) {
            throw new NullPointerException("Host must not be null.");
        }
        // 65535 is a legal port; only values above it are out of range.
        if (port < 1 || port > MAX_PORT) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = host;
        this.port = port;

        // An explicit null varargs array means "no jars" rather than an NPE.
        String[] jarPaths = jarFiles == null ? new String[0] : jarFiles;
        this.jarFiles = new ArrayList<File>(jarPaths.length);
        for (String jarPath : jarPaths) {
            File jarFile = new File(jarPath);
            try {
                JobWithJars.checkJarFile(jarFile);
            } catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarFile.getAbsolutePath(), e);
            }
            this.jarFiles.add(jarFile);
        }
    }

    /**
     * Builds the job graph from the current stream graph and runs it remotely.
     *
     * @return the result reported by the client
     * @throws ProgramInvocationException if submission or execution fails
     */
    @Override
    public JobExecutionResult execute() throws ProgramInvocationException {
        return executeRemotely(this.streamGraph.getJobGraph());
    }

    /**
     * Builds the job graph from the current stream graph, passing
     * {@code jobName} to the graph builder, and runs it remotely.
     *
     * @param jobName value forwarded to {@code streamGraph.getJobGraph(String)}
     * @return the result reported by the client
     * @throws ProgramInvocationException if submission or execution fails
     */
    @Override
    public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
        return executeRemotely(this.streamGraph.getJobGraph(jobName));
    }

    /**
     * Attaches the configured jar files to {@code jobGraph} and runs it through
     * a {@link Client} connected to {@code host:port}.
     *
     * @param jobGraph the job graph to submit; jar paths are added to it
     * @return the client's {@link JobExecutionResult}, or a placeholder result
     *         (net runtime {@code -1}, no accumulators) if the client returned
     *         a plain {@link JobSubmissionResult}
     * @throws ProgramInvocationException if the client cannot be created or
     *                                    the run fails
     */
    private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
        LOG.info("Running remotely at {}:{}", this.host, this.port);

        for (File jarFile : this.jarFiles) {
            jobGraph.addJar(new Path(jarFile.getAbsolutePath()));
        }

        try {
            ClassLoader userCodeClassLoader =
                    JobWithJars.buildUserCodeClassLoader(this.jarFiles, getClass().getClassLoader());
            // NOTE(review): the trailing -1 and the 'true' passed to run(...) are
            // taken over unchanged; their meaning is defined by Client, not here.
            Client client = new Client(
                    new InetSocketAddress(this.host, this.port),
                    jobGraph.getJobConfiguration(),
                    userCodeClassLoader,
                    -1);
            client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());

            JobSubmissionResult result = client.run(jobGraph, true);
            if (result instanceof JobExecutionResult) {
                return (JobExecutionResult) result;
            }
            LOG.warn("The Client didn't return a JobExecutionResult");
            return new JobExecutionResult(result.getJobID(), -1L, null);
        } catch (UnknownHostException e) {
            throw new ProgramInvocationException(e.getMessage(), e);
        } catch (ProgramInvocationException e) {
            // Already the right type: rethrow untouched instead of re-wrapping below.
            throw e;
        } catch (Exception e) {
            String detail = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + detail, e);
        }
    }

    /**
     * Describes this environment as
     * {@code Remote Environment (host:port - parallelism = N)}, printing
     * {@code default} in place of {@code N} when the parallelism is {@code -1}.
     */
    @Override
    public String toString() {
        int parallelism = getParallelism();
        String parallelismText = parallelism == -1 ? "default" : String.valueOf(parallelism);
        return "Remote Environment (" + this.host + ":" + this.port
                + " - parallelism = " + parallelismText + ")";
    }

    /**
     * @return the host name this environment submits jobs to
     */
    public String getHost() {
        return this.host;
    }

    /**
     * @return the port this environment submits jobs to
     */
    public int getPort() {
        return this.port;
    }
}
