package org.apache.flink.storm.api;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.storm.util.StormConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/storm/api/FlinkClient.class */
/**
 * Client that submits and kills Storm topologies on a Flink JobManager.
 *
 * <p>The JobManager is addressed by the host and port given at construction time. An optional
 * timeout (in milliseconds) is applied as the {@code akka.ask.timeout} setting when the
 * JobManager is queried for running jobs.
 */
public class FlinkClient {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);

    /** Storm configuration handed to submitted topologies; may be {@code null}. */
    private final Map<?, ?> conf;
    /** Host name of the JobManager. */
    private final String jobManagerHost;
    /** RPC port of the JobManager. */
    private final int jobManagerPort;
    /** Ask timeout formatted as {@code "<n> ms"}, or {@code null} to keep the configured value. */
    private final String timeout;

    /**
     * Creates a client without an explicit ask timeout.
     *
     * @param map Storm configuration
     * @param str JobManager host
     * @param i JobManager port
     */
    public FlinkClient(Map map, String str, int i) {
        this(map, str, i, null);
    }

    /**
     * Creates a client.
     *
     * @param map Storm configuration
     * @param str JobManager host
     * @param i JobManager port
     * @param num ask timeout in milliseconds, or {@code null} to leave the timeout unset
     */
    public FlinkClient(Map map, String str, int i, Integer num) {
        this.conf = map;
        this.jobManagerHost = str;
        this.jobManagerPort = i;
        this.timeout = (num != null) ? num + " ms" : null;
    }

    /**
     * Builds a client from the {@code nimbus.host} and {@code nimbus.thrift.port} entries of the
     * given configuration.
     *
     * @param map Storm configuration containing both entries
     * @return a new client using {@code map} as its configuration
     */
    public static FlinkClient getConfiguredClient(Map map) {
        String host = (String) map.get("nimbus.host");
        int port = Utils.getInt(map.get("nimbus.thrift.port")).intValue();
        return new FlinkClient(map, host, port);
    }

    /**
     * Returns this instance.
     *
     * @return {@code this}
     */
    public FlinkClient getClient() {
        return this;
    }

    /**
     * Submits a topology; delegates to {@link #submitTopologyWithOpts}.
     *
     * @param str topology (job) name
     * @param str2 location of the jar file to ship with the job
     * @param flinkTopology topology to submit
     * @throws AlreadyAliveException if a running job with the same name exists
     * @throws InvalidTopologyException if a class listed for Kryo registration cannot be loaded
     */
    public void submitTopology(String str, String str2, FlinkTopology flinkTopology) throws AlreadyAliveException, InvalidTopologyException {
        submitTopologyWithOpts(str, str2, flinkTopology);
    }

    /**
     * Submits a topology to the JobManager in detached mode.
     *
     * @param str topology (job) name
     * @param str2 location of the jar file to ship with the job
     * @param flinkTopology topology to submit
     * @throws AlreadyAliveException if a running job with the same name exists
     * @throws InvalidTopologyException if a class listed for Kryo registration cannot be loaded
     * @throws RuntimeException if the jar file is unusable, the client cannot be created, or the
     *     job cannot be executed
     */
    public void submitTopologyWithOpts(String str, String str2, FlinkTopology flinkTopology) throws AlreadyAliveException, InvalidTopologyException {
        if (getTopologyJobId(str) != null) {
            throw new AlreadyAliveException();
        }

        // Step 1: resolve and validate the jar.
        final URI jarUri;
        final URL jarUrl;
        try {
            jarUri = new File(str2).getAbsoluteFile().toURI();
            jarUrl = jarUri.toURL();
            JobWithJars.checkJarFile(jarUrl);
        } catch (IOException e) {
            throw new RuntimeException("Problem with jar file " + str2, e);
        }

        // Step 2: transfer the Storm configuration (including Kryo registrations).
        try {
            addStormConfigToTopology(flinkTopology, this.conf);
        } catch (ClassNotFoundException e) {
            LOG.error("Could not register class for Kryo serialization.", e);
            throw new InvalidTopologyException("Could not register class for Kryo serialization.");
        }

        // Step 3: build the job graph and point it at the configured JobManager.
        JobGraph jobGraph = flinkTopology.getStreamGraph().getJobGraph(str);
        jobGraph.addJar(new Path(jarUri));
        Configuration jobConfiguration = jobGraph.getJobConfiguration();
        jobConfiguration.setString("jobmanager.rpc.address", this.jobManagerHost);
        jobConfiguration.setInteger("jobmanager.rpc.port", this.jobManagerPort);

        // Step 4: submit.
        Client client = createClient(jobConfiguration);
        try {
            ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(
                    Lists.newArrayList(jarUrl), Collections.<URL>emptyList(), getClass().getClassLoader());
            client.runDetached(jobGraph, userCodeClassLoader);
        } catch (ProgramInvocationException e) {
            throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
        }
    }

    /**
     * Kills a topology immediately; delegates to {@link #killTopologyWithOpts} without options.
     *
     * @param str topology (job) name
     * @throws NotAliveException if no running job with that name is found
     */
    public void killTopology(String str) throws NotAliveException {
        killTopologyWithOpts(str, null);
    }

    /**
     * Cancels the running job with the given name, optionally after waiting.
     *
     * @param str topology (job) name
     * @param killOptions if non-null, the calling thread sleeps for
     *     {@code get_wait_secs()} seconds before cancelling
     * @throws NotAliveException if no running job with that name is found
     * @throws RuntimeException if the wait is interrupted, the client cannot be created, or the
     *     cancel request fails
     */
    public void killTopologyWithOpts(String str, KillOptions killOptions) throws NotAliveException {
        JobID topologyJobId = getTopologyJobId(str);
        if (topologyJobId == null) {
            throw new NotAliveException("Storm topology with name " + str + " not found.");
        }
        if (killOptions != null) {
            try {
                // Multiply as long so large wait times cannot overflow int.
                Thread.sleep(1000L * killOptions.get_wait_secs());
            } catch (InterruptedException e) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        Configuration configuration = GlobalConfiguration.getConfiguration();
        configuration.setString("jobmanager.rpc.address", this.jobManagerHost);
        configuration.setInteger("jobmanager.rpc.port", this.jobManagerPort);

        // Client creation and cancellation are separate steps so each failure keeps its own message.
        Client client = createClient(configuration);
        try {
            client.cancel(topologyJobId);
        } catch (Exception e) {
            throw new RuntimeException("Cannot stop job.", e);
        }
    }

    /**
     * Looks up the ID of the running job whose name equals the given topology name.
     *
     * @param str topology (job) name
     * @return the job ID of the first matching running job, or {@code null} if none matches
     * @throws RuntimeException if the JobManager cannot be reached, the request fails, or the
     *     response has an unexpected type
     */
    public JobID getTopologyJobId(String str) {
        // NOTE(review): this local is not read again in this method; the assignment only matters
        // if GlobalConfiguration.getConfiguration() hands out a shared instance — confirm.
        Configuration configuration = GlobalConfiguration.getConfiguration();
        if (this.timeout != null) {
            configuration.setString("akka.ask.timeout", this.timeout);
        }

        final ActorRef jobManager;
        try {
            jobManager = getJobManager();
        } catch (IOException e) {
            throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost + ":" + this.jobManagerPort, e);
        }

        FiniteDuration askTimeout = getTimeout();
        final Object result;
        try {
            result = Await.result(Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout)), askTimeout);
        } catch (Exception e) {
            throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
        }

        // Checked outside the try above so this error is not re-wrapped as a retrieval failure.
        if (!(result instanceof JobManagerMessages.RunningJobsStatus)) {
            throw new RuntimeException("ReqeustRunningJobs requires a response of type RunningJobs. Instead the response is of type " + result.getClass() + ".");
        }
        for (JobStatusMessage jobStatusMessage : ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages()) {
            if (jobStatusMessage.getJobName().equals(str)) {
                return jobStatusMessage.getJobId();
            }
        }
        return null;
    }

    /**
     * Creates a Flink client for the given configuration.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the client cannot be created
     */
    private Client createClient(Configuration configuration) {
        try {
            return new Client(configuration);
        } catch (IOException e) {
            throw new RuntimeException("Could not establish a connection to the job manager", e);
        }
    }

    /**
     * Returns the Akka timeout derived from the global configuration, with
     * {@code akka.ask.timeout} overridden by this client's timeout when one was given.
     */
    private FiniteDuration getTimeout() {
        Configuration configuration = GlobalConfiguration.getConfiguration();
        if (this.timeout != null) {
            configuration.setString("akka.ask.timeout", this.timeout);
        }
        return AkkaUtils.getTimeout(configuration);
    }

    /**
     * Creates an actor system and resolves the JobManager actor at the configured host and port.
     *
     * @return reference to the JobManager actor
     * @throws IOException if raised while creating the actor system or resolving the actor;
     *     it is passed through unchanged so the caller can report a connection failure
     * @throws RuntimeException for any other failure
     */
    private ActorRef getJobManager() throws IOException {
        Configuration configuration = GlobalConfiguration.getConfiguration();
        try {
            return JobManager.getJobManagerActorRef(
                    new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
                    AkkaUtils.createActorSystem(configuration, new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0))),
                    AkkaUtils.getLookupTimeout(configuration));
        } catch (IOException e) {
            // Must precede the generic handler; otherwise the declared IOException never escapes.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
        }
    }

    /**
     * Stores the Storm configuration as global job parameters and registers the classes listed
     * under {@code topology.kryo.register} with Kryo. Does nothing when {@code map} is null.
     *
     * <p>Each list element is either a class name, or a map from class name to serializer class
     * name.
     *
     * @param flinkTopology topology being configured
     * @param map Storm configuration, may be {@code null}
     * @throws ClassNotFoundException if a listed class or serializer cannot be loaded
     */
    public static void addStormConfigToTopology(FlinkTopology flinkTopology, Map map) throws ClassNotFoundException {
        if (map == null) {
            return;
        }
        // NOTE(review): flinkTopology is not used; the environment comes from a static call.
        // Confirm this is the environment that backs the submitted topology.
        ExecutionConfig config = FlinkTopology.getExecutionEnvironment().getConfig();
        config.setGlobalJobParameters(new StormConfig(map));

        List<?> registrations = (List<?>) map.get("topology.kryo.register");
        if (registrations == null) {
            return;
        }
        for (Object registration : registrations) {
            if (registration instanceof String) {
                config.registerKryoType(Class.forName((String) registration));
            } else {
                // Wildcard types: iterating a raw entry set yields Object and does not compile.
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) registration).entrySet()) {
                    config.registerTypeWithKryoSerializer(Class.forName((String) entry.getKey()), Class.forName((String) entry.getValue()));
                }
            }
        }
    }
}
