package org.apache.flink.client.program;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/client/program/Client.class */
public class Client {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private final Configuration configuration;
    private final InetSocketAddress jobManagerAddress;
    private final Optimizer compiler;
    private final ClassLoader userCodeClassLoader;
    private boolean printStatusDuringExecution;
    private int maxSlots;
    private JobID lastJobId;

    /* loaded from: input_file:org/apache/flink/client/program/Client$OptimizerPlanEnvironment.class */
    public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
        private final Optimizer compiler;
        private FlinkPlan optimizerPlan;

        private OptimizerPlanEnvironment(Optimizer optimizer) {
            this.compiler = optimizer;
        }

        public JobExecutionResult execute(String str) throws Exception {
            this.optimizerPlan = this.compiler.compile(createProgramPlan(str));
            throw new ProgramAbortException();
        }

        public String getExecutionPlan() throws Exception {
            this.optimizerPlan = this.compiler.compile(createProgramPlan(null, false));
            throw new ProgramAbortException();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setAsContext() {
            initializeContextEnvironment(new ExecutionEnvironmentFactory() { // from class: org.apache.flink.client.program.Client.OptimizerPlanEnvironment.1
                public ExecutionEnvironment createExecutionEnvironment() {
                    return OptimizerPlanEnvironment.this;
                }
            });
        }

        public void setPlan(FlinkPlan flinkPlan) {
            this.optimizerPlan = flinkPlan;
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/Client$ProgramAbortException.class */
    public static final class ProgramAbortException extends Error {
        private static final long serialVersionUID = 1;
    }

    public Client(InetSocketAddress inetSocketAddress, Configuration configuration, ClassLoader classLoader, int i) throws UnknownHostException {
        this.printStatusDuringExecution = true;
        this.maxSlots = -1;
        this.lastJobId = null;
        Preconditions.checkNotNull(inetSocketAddress, "JobManager address is null");
        Preconditions.checkNotNull(configuration, "Configuration is null");
        Preconditions.checkNotNull(classLoader, "User code ClassLoader is null");
        this.configuration = configuration;
        if (inetSocketAddress.isUnresolved()) {
            String hostName = inetSocketAddress.getHostName();
            if (hostName == null) {
                throw new IllegalArgumentException("Host in jobManagerAddress is null");
            }
            try {
                this.jobManagerAddress = new InetSocketAddress(InetAddress.getByName(hostName), inetSocketAddress.getPort());
            } catch (UnknownHostException e) {
                throw new UnknownHostException("Cannot resolve JobManager host name '" + hostName + "'.");
            }
        } else {
            this.jobManagerAddress = inetSocketAddress;
        }
        this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), this.configuration);
        this.userCodeClassLoader = classLoader;
        this.maxSlots = i;
    }

    public Client(Configuration configuration, ClassLoader classLoader) throws UnknownHostException {
        this.printStatusDuringExecution = true;
        this.maxSlots = -1;
        this.lastJobId = null;
        Preconditions.checkNotNull(configuration, "Configuration is null");
        Preconditions.checkNotNull(classLoader, "User code ClassLoader is null");
        this.configuration = configuration;
        this.userCodeClassLoader = classLoader;
        String string = configuration.getString("jobmanager.rpc.address", (String) null);
        if (string == null) {
            throw new IllegalConfigurationException("Cannot find address to job manager's RPC service in the global configuration.");
        }
        int integer = configuration.getInteger("jobmanager.rpc.port", 6123);
        if (integer < 0) {
            throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
        }
        try {
            this.jobManagerAddress = new InetSocketAddress(InetAddress.getByName(string), integer);
            this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), this.configuration);
        } catch (UnknownHostException e) {
            throw new UnknownHostException("Cannot resolve the JobManager hostname '" + string + "' specified in the configuration");
        }
    }

    public void setPrintStatusDuringExecution(boolean z) {
        this.printStatusDuringExecution = z;
    }

    public int getMaxSlots() {
        return this.maxSlots;
    }

    public String getOptimizedPlanAsJson(PackagedProgram packagedProgram, int i) throws CompilerException, ProgramInvocationException {
        return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(getOptimizedPlan(packagedProgram, i));
    }

    public FlinkPlan getOptimizedPlan(PackagedProgram packagedProgram, int i) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return getOptimizedPlan(packagedProgram.getPlanWithJars(), i);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException();
        }
        OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(this.compiler);
        if (i > 0) {
            optimizerPlanEnvironment.setParallelism(i);
        }
        optimizerPlanEnvironment.setAsContext();
        PrintStream printStream = System.out;
        PrintStream printStream2 = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setOut(new PrintStream(byteArrayOutputStream));
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        System.setErr(new PrintStream(byteArrayOutputStream2));
        try {
            try {
                ContextEnvironment.enableLocalExecution(false);
                packagedProgram.invokeInteractiveModeForExecution();
                ContextEnvironment.enableLocalExecution(true);
                System.setOut(printStream);
                System.setErr(printStream2);
                System.err.println(byteArrayOutputStream2);
                System.out.println(byteArrayOutputStream);
                throw new ProgramInvocationException("The program plan could not be fetched - the program aborted pre-maturely.\nSystem.err: " + byteArrayOutputStream2.toString() + "\nSystem.out: " + byteArrayOutputStream.toString() + '\n');
            } catch (ProgramInvocationException e) {
                throw e;
            } catch (Throwable th) {
                if (optimizerPlanEnvironment.optimizerPlan == null) {
                    throw new ProgramInvocationException("The program caused an error: ", th);
                }
                FlinkPlan flinkPlan = optimizerPlanEnvironment.optimizerPlan;
                ContextEnvironment.enableLocalExecution(true);
                System.setOut(printStream);
                System.setErr(printStream2);
                System.err.println(byteArrayOutputStream2);
                System.out.println(byteArrayOutputStream);
                return flinkPlan;
            }
        } catch (Throwable th2) {
            ContextEnvironment.enableLocalExecution(true);
            System.setOut(printStream);
            System.setErr(printStream2);
            System.err.println(byteArrayOutputStream2);
            System.out.println(byteArrayOutputStream);
            throw th2;
        }
    }

    public FlinkPlan getOptimizedPlan(Plan plan, int i) throws CompilerException {
        if (i > 0 && plan.getDefaultParallelism() <= 0) {
            LOG.debug("Changing plan default parallelism from {} to {}", Integer.valueOf(plan.getDefaultParallelism()), Integer.valueOf(i));
            plan.setDefaultParallelism(i);
        }
        LOG.debug("Set parallelism {}, plan default parallelism {}", Integer.valueOf(i), Integer.valueOf(plan.getDefaultParallelism()));
        return this.compiler.compile(plan);
    }

    public FlinkPlan getOptimizedPlan(JobWithJars jobWithJars, int i) throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(jobWithJars.getPlan(), i);
    }

    public JobGraph getJobGraph(PackagedProgram packagedProgram, FlinkPlan flinkPlan) throws ProgramInvocationException {
        return getJobGraph(flinkPlan, packagedProgram.getAllLibraries());
    }

    private JobGraph getJobGraph(FlinkPlan flinkPlan, List<File> list) {
        JobGraph jobGraph = flinkPlan instanceof StreamingPlan ? ((StreamingPlan) flinkPlan).getJobGraph() : new JobGraphGenerator().compileJobGraph((OptimizedPlan) flinkPlan);
        Iterator<File> it = list.iterator();
        while (it.hasNext()) {
            jobGraph.addJar(new Path(it.next().getAbsolutePath()));
        }
        return jobGraph;
    }

    public JobSubmissionResult run(PackagedProgram packagedProgram, int i, boolean z) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return run(packagedProgram.getPlanWithJars(), i, z);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException();
        }
        LOG.info("Starting program in interactive mode");
        ContextEnvironment.setAsContext(this, packagedProgram.getAllLibraries(), packagedProgram.getUserCodeClassLoader(), i, z);
        ContextEnvironment.enableLocalExecution(false);
        try {
            packagedProgram.invokeInteractiveModeForExecution();
            ContextEnvironment.enableLocalExecution(true);
            return new JobSubmissionResult(this.lastJobId);
        } catch (Throwable th) {
            ContextEnvironment.enableLocalExecution(true);
            throw th;
        }
    }

    public JobSubmissionResult run(PackagedProgram packagedProgram, OptimizedPlan optimizedPlan, boolean z) throws ProgramInvocationException {
        return run(optimizedPlan, packagedProgram.getAllLibraries(), z);
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int i, boolean z) throws CompilerException, ProgramInvocationException {
        return run((OptimizedPlan) getOptimizedPlan(jobWithJars, i), jobWithJars.getJarFiles(), z);
    }

    public JobSubmissionResult run(OptimizedPlan optimizedPlan, List<File> list, boolean z) throws ProgramInvocationException {
        JobGraph jobGraph = getJobGraph((FlinkPlan) optimizedPlan, list);
        this.lastJobId = jobGraph.getJobID();
        return run(jobGraph, z);
    }

    public JobSubmissionResult run(JobGraph jobGraph, boolean z) throws ProgramInvocationException {
        this.lastJobId = jobGraph.getJobID();
        LOG.info("JobManager actor system address is " + this.jobManagerAddress);
        LOG.info("Starting client actor system");
        try {
            ActorSystem startJobClientActorSystem = JobClient.startJobClientActorSystem(this.configuration);
            LOG.info("Looking up JobManager");
            try {
                ActorRef jobManagerRemoteReference = JobManager.getJobManagerRemoteReference(this.jobManagerAddress, startJobClientActorSystem, this.configuration);
                LOG.info("JobManager runs at " + jobManagerRemoteReference.path());
                FiniteDuration timeout = AkkaUtils.getTimeout(this.configuration);
                LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
                LOG.info("Checking and uploading JAR files");
                try {
                    try {
                        JobClient.uploadJarFiles(jobGraph, jobManagerRemoteReference, timeout);
                        try {
                            if (!z) {
                                JobClient.submitJobDetached(jobManagerRemoteReference, jobGraph, timeout);
                                JobSubmissionResult jobSubmissionResult = new JobSubmissionResult(jobGraph.getJobID());
                                startJobClientActorSystem.shutdown();
                                startJobClientActorSystem.awaitTermination();
                                return jobSubmissionResult;
                            }
                            try {
                                JobExecutionResult jobExecutionResult = JobClient.submitJobAndWait(startJobClientActorSystem, jobManagerRemoteReference, jobGraph, timeout, this.printStatusDuringExecution).toJobExecutionResult(this.userCodeClassLoader);
                                startJobClientActorSystem.shutdown();
                                startJobClientActorSystem.awaitTermination();
                                return jobExecutionResult;
                            } catch (Exception e) {
                                throw new ProgramInvocationException("Failed to deserialize the accumulator result after the job execution", e);
                            }
                        } catch (Exception e2) {
                            throw new ProgramInvocationException("Exception during program execution.", e2);
                        } catch (JobExecutionException e3) {
                            throw new ProgramInvocationException("The program execution failed: " + e3.getMessage(), e3);
                        }
                    } catch (IOException e4) {
                        throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e4);
                    }
                } catch (Throwable th) {
                    startJobClientActorSystem.shutdown();
                    startJobClientActorSystem.awaitTermination();
                    throw th;
                }
            } catch (IOException e5) {
                throw new ProgramInvocationException("Failed to resolve JobManager", e5);
            }
        } catch (Exception e6) {
            throw new ProgramInvocationException("Could start client actor system.", e6);
        }
    }
}
