/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.api;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class FlinkClient {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);
    private final Map<?, ?> conf;
    private final String jobManagerHost;
    private final int jobManagerPort;
    private final String timeout;

    public FlinkClient(Map conf, String host, int port) {
        this(conf, host, port, null);
    }

    public FlinkClient(Map conf, String host, int port, Integer timeout) {
        this.conf = conf;
        this.jobManagerHost = host;
        this.jobManagerPort = port;
        this.timeout = timeout != null ? timeout + " ms" : null;
    }

    public static FlinkClient getConfiguredClient(Map conf) {
        String nimbusHost = (String)conf.get("nimbus.host");
        int nimbusPort = Utils.getInt(conf.get("nimbus.thrift.port"));
        return new FlinkClient(conf, nimbusHost, nimbusPort);
    }

    public FlinkClient getClient() {
        return this;
    }

    public void submitTopology(String name, String uploadedJarLocation, FlinkTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
    }

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, FlinkTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        StandaloneClusterClient client;
        URL uploadedJarUrl;
        URI uploadedJarUri;
        if (this.getTopologyJobId(name) != null) {
            throw new AlreadyAliveException();
        }
        try {
            uploadedJarUri = new File(uploadedJarLocation).getAbsoluteFile().toURI();
            uploadedJarUrl = uploadedJarUri.toURL();
            JobWithJars.checkJarFile((URL)uploadedJarUrl);
        }
        catch (IOException e) {
            throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
        }
        try {
            FlinkClient.addStormConfigToTopology(topology, this.conf);
        }
        catch (ClassNotFoundException e) {
            LOG.error("Could not register class for Kryo serialization.", (Throwable)e);
            throw new InvalidTopologyException("Could not register class for Kryo serialization.");
        }
        StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
        streamGraph.setJobName(name);
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.addJar(new Path(uploadedJarUri));
        Configuration configuration = jobGraph.getJobConfiguration();
        configuration.setString("jobmanager.rpc.address", this.jobManagerHost);
        configuration.setInteger("jobmanager.rpc.port", this.jobManagerPort);
        try {
            client = new StandaloneClusterClient(configuration);
        }
        catch (IOException e) {
            throw new RuntimeException("Could not establish a connection to the job manager", e);
        }
        try {
            ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader((List)Lists.newArrayList((Object[])new URL[]{uploadedJarUrl}), Collections.emptyList(), (ClassLoader)this.getClass().getClassLoader());
            client.runDetached(jobGraph, classLoader);
        }
        catch (ProgramInvocationException e) {
            throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
        }
    }

    public void killTopology(String name) throws NotAliveException {
        this.killTopologyWithOpts(name, null);
    }

    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException {
        StandaloneClusterClient client;
        JobID jobId = this.getTopologyJobId(name);
        if (jobId == null) {
            throw new NotAliveException("Storm topology with name " + name + " not found.");
        }
        if (options != null) {
            try {
                Thread.sleep(1000 * options.get_wait_secs());
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        Configuration configuration = GlobalConfiguration.getConfiguration();
        configuration.setString("jobmanager.rpc.address", this.jobManagerHost);
        configuration.setInteger("jobmanager.rpc.port", this.jobManagerPort);
        try {
            client = new StandaloneClusterClient(configuration);
        }
        catch (IOException e) {
            throw new RuntimeException("Could not establish a connection to the job manager", e);
        }
        try {
            client.stop(jobId);
        }
        catch (Exception e) {
            throw new RuntimeException("Cannot stop job.", e);
        }
    }

    JobID getTopologyJobId(String id) {
        block7: {
            Configuration configuration = GlobalConfiguration.getConfiguration();
            if (this.timeout != null) {
                configuration.setString("akka.ask.timeout", this.timeout);
            }
            try {
                Object result2;
                ActorRef jobManager = this.getJobManager();
                FiniteDuration askTimeout = this.getTimeout();
                Future response = Patterns.ask((ActorRef)jobManager, (Object)JobManagerMessages.getRequestRunningJobsStatus(), (Timeout)new Timeout(askTimeout));
                try {
                    result2 = Await.result((Awaitable)response, (Duration)askTimeout);
                }
                catch (Exception e) {
                    throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
                }
                if (result2 instanceof JobManagerMessages.RunningJobsStatus) {
                    List jobs = ((JobManagerMessages.RunningJobsStatus)result2).getStatusMessages();
                    for (JobStatusMessage status : jobs) {
                        if (!status.getJobName().equals(id)) continue;
                        return status.getJobId();
                    }
                    break block7;
                }
                throw new RuntimeException("ReqeustRunningJobs requires a response of type RunningJobs. Instead the response is of type " + result2.getClass() + ".");
            }
            catch (IOException e) {
                throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost + ":" + this.jobManagerPort, e);
            }
        }
        return null;
    }

    private FiniteDuration getTimeout() {
        Configuration configuration = GlobalConfiguration.getConfiguration();
        if (this.timeout != null) {
            configuration.setString("akka.ask.timeout", this.timeout);
        }
        return AkkaUtils.getClientTimeout((Configuration)configuration);
    }

    private ActorRef getJobManager() throws IOException {
        ActorSystem actorSystem;
        Configuration configuration = GlobalConfiguration.getConfiguration();
        try {
            Tuple2 systemEndpoint = new Tuple2((Object)"", (Object)0);
            actorSystem = AkkaUtils.createActorSystem((Configuration)configuration, (Option)new Some((Object)systemEndpoint));
        }
        catch (Exception e) {
            throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
        }
        return JobManager.getJobManagerActorRef((InetSocketAddress)new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), (ActorSystem)actorSystem, (FiniteDuration)AkkaUtils.getLookupTimeout((Configuration)configuration));
    }

    static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException {
        if (conf != null) {
            ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
            flinkConfig.setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)new StormConfig(conf));
            List registeredClasses = (List)conf.get("topology.kryo.register");
            if (registeredClasses != null) {
                for (Object klass : registeredClasses) {
                    if (klass instanceof String) {
                        flinkConfig.registerKryoType(Class.forName((String)klass));
                        continue;
                    }
                    for (Map.Entry register : ((Map)klass).entrySet()) {
                        flinkConfig.registerTypeWithKryoSerializer(Class.forName((String)register.getKey()), Class.forName((String)register.getValue()));
                    }
                }
            }
        }
    }
}

