/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.api;

import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.utils.Utils;
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Map;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopology;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for submitting a {@link FlinkTopology} through a {@link FlinkClient}.
 *
 * <p>Several overloads accept arguments that are ignored (submit options, a configuration map for
 * the jar); they simply delegate to the simpler variants. NOTE(review): the method set looks like
 * it mirrors Storm's {@code StormSubmitter} API - confirm against the Storm version in use.
 */
public class FlinkSubmitter {
    public static final Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);

    /** Storm configuration key for the master host. */
    private static final String NIMBUS_HOST_KEY = "nimbus.host";
    /** Storm configuration key for the master port. */
    private static final String NIMBUS_THRIFT_PORT_KEY = "nimbus.thrift.port";
    /** Flink configuration key used to fill in a missing {@link #NIMBUS_HOST_KEY}. */
    private static final String JOB_MANAGER_ADDRESS_KEY = "jobmanager.rpc.address";
    /** Flink configuration key used to fill in a missing {@link #NIMBUS_THRIFT_PORT_KEY}. */
    private static final String JOB_MANAGER_PORT_KEY = "jobmanager.rpc.port";
    private static final String DEFAULT_JOB_MANAGER_ADDRESS = "localhost";
    private static final int DEFAULT_JOB_MANAGER_PORT = 6123;
    /** System property that, when set, names the jar to submit. */
    private static final String STORM_JAR_PROPERTY = "storm.jar";

    /**
     * Submits a topology. The {@code opts} argument is ignored; this call is equivalent to
     * {@link #submitTopology(String, Map, FlinkTopology)}.
     *
     * @param name      name of the topology
     * @param stormConf topology configuration; may be modified, see the three-argument overload
     * @param topology  the topology to submit
     * @param opts      ignored
     * @throws AlreadyAliveException    see the three-argument overload
     * @throws InvalidTopologyException see the three-argument overload
     */
    public static void submitTopology(String name, Map<?, ?> stormConf, FlinkTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        FlinkSubmitter.submitTopology(name, stormConf, topology);
    }

    /**
     * Submits a topology under the given name.
     *
     * <p>If {@code stormConf} lacks {@code nimbus.host} or {@code nimbus.thrift.port}, the entries
     * are added to the passed map (the caller's map is mutated) from the global Flink
     * configuration, falling back to {@code localhost} and {@code 6123}. The jar to submit is
     * taken from the {@code storm.jar} system property or, if that is unset, looked up on a
     * best-effort basis from the current {@link ContextEnvironment}.
     *
     * @param name      name of the topology
     * @param stormConf topology configuration; must pass {@code Utils.isValidConf}
     * @param topology  the topology to submit
     * @throws IllegalArgumentException if {@code stormConf} is rejected by {@code Utils.isValidConf}
     * @throws RuntimeException         if the client already reports a job id for {@code name}
     * @throws AlreadyAliveException    logged and rethrown if raised while submitting
     * @throws InvalidTopologyException logged and rethrown if raised while submitting
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Map is part of the public signature
    public static void submitTopology(String name, Map stormConf, FlinkTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        if (!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }

        Configuration flinkConfig = GlobalConfiguration.getConfiguration();
        if (!stormConf.containsKey(NIMBUS_HOST_KEY)) {
            stormConf.put(NIMBUS_HOST_KEY, flinkConfig.getString(JOB_MANAGER_ADDRESS_KEY, DEFAULT_JOB_MANAGER_ADDRESS));
        }
        if (!stormConf.containsKey(NIMBUS_THRIFT_PORT_KEY)) {
            // Integer.valueOf instead of the deprecated boxing constructor.
            stormConf.put(NIMBUS_THRIFT_PORT_KEY, Integer.valueOf(flinkConfig.getInteger(JOB_MANAGER_PORT_KEY, DEFAULT_JOB_MANAGER_PORT)));
        }

        // Serialized after the defaults were filled in so the log shows the effective configuration.
        String serConf = JSONValue.toJSONString(stormConf);
        FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
        try {
            if (client.getTopologyJobId(name) != null) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            String localJar = resolveLocalJar();
            logger.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
            client.submitTopologyWithOpts(name, localJar, topology);
        }
        catch (InvalidTopologyException e) {
            logger.warn("Topology submission exception: " + e.get_msg(), e);
            throw e;
        }
        catch (AlreadyAliveException e) {
            logger.warn("Topology already alive exception", e);
            throw e;
        }
        logger.info("Finished submitting topology: {}", name);
    }

    /**
     * Determines the jar to submit: the {@code storm.jar} system property if set, otherwise the
     * absolute path of a jar registered with the current {@link ContextEnvironment}.
     *
     * <p>The lookup is best-effort: failures are logged and do not abort the submission. If the
     * environment lists several jars, the last one that converts successfully wins.
     *
     * @return the jar path, or {@code null} if none could be determined
     */
    private static String resolveLocalJar() {
        String localJar = System.getProperty(STORM_JAR_PROPERTY);
        if (localJar != null) {
            return localJar;
        }

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            try {
                for (URL url : ((ContextEnvironment) env).getJars()) {
                    localJar = new File(url.toURI()).getAbsolutePath();
                }
            }
            catch (URISyntaxException e) {
                // Keep whatever was resolved before the malformed entry.
                logger.warn("Could not convert a jar URL of the execution environment to a file path", e);
            }
        } else {
            logger.debug("Execution environment is not a ContextEnvironment; cannot look up the topology jar from it");
        }

        if (localJar == null) {
            logger.warn("Could not determine the topology jar: system property '{}' is not set and no jar was obtained from the execution environment", STORM_JAR_PROPERTY);
        }
        return localJar;
    }

    /**
     * Submits a topology. No progress reporting is performed; this call is equivalent to
     * {@link #submitTopology(String, Map, FlinkTopology)}.
     *
     * @param name      name of the topology
     * @param stormConf topology configuration; may be modified, see {@code submitTopology}
     * @param topology  the topology to submit
     * @throws AlreadyAliveException    see {@code submitTopology}
     * @throws InvalidTopologyException see {@code submitTopology}
     */
    public static void submitTopologyWithProgressBar(String name, Map<?, ?> stormConf, FlinkTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        FlinkSubmitter.submitTopology(name, stormConf, topology);
    }

    /**
     * Validates the jar location. The {@code conf} argument is ignored; this call is equivalent to
     * {@link #submitJar(String)}.
     *
     * @param conf     ignored
     * @param localJar path of the jar
     * @return {@code localJar}, unchanged
     * @throws RuntimeException if {@code localJar} is {@code null}
     */
    @SuppressWarnings("rawtypes") // raw Map is part of the public signature
    public static String submitJar(Map conf, String localJar) {
        return FlinkSubmitter.submitJar(localJar);
    }

    /**
     * Validates the jar location. Nothing is uploaded by this method; the given path is returned
     * as-is.
     *
     * @param localJar path of the jar
     * @return {@code localJar}, unchanged
     * @throws RuntimeException if {@code localJar} is {@code null}
     */
    public static String submitJar(String localJar) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload");
        }
        return localJar;
    }

    /** Empty marker interface; it declares no methods. */
    public interface FlinkProgressListener {
    }
}

