/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.api;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkLocalCluster {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
    private FlinkMiniCluster flink;
    public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";
    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    public FlinkLocalCluster() {
    }

    public FlinkLocalCluster(FlinkMiniCluster flink) {
        this.flink = Objects.requireNonNull(flink);
    }

    public void submitTopology(String topologyName, Map conf, FlinkTopology topology) throws Exception {
        this.submitTopologyWithOpts(topologyName, conf, topology, null);
    }

    public void submitTopologyWithOpts(String topologyName, Map conf, FlinkTopology topology, SubmitOptions submitOpts) throws Exception {
        Object blockingFlag;
        LOG.info("Running Storm topology on FlinkLocalCluster");
        boolean submitBlocking = false;
        if (conf != null && (blockingFlag = conf.get(SUBMIT_BLOCKING)) != null && blockingFlag instanceof Boolean) {
            submitBlocking = (Boolean)blockingFlag;
        }
        FlinkClient.addStormConfigToTopology(topology, conf);
        StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
        streamGraph.setJobName(topologyName);
        JobGraph jobGraph = streamGraph.getJobGraph();
        if (this.flink == null) {
            Configuration configuration = new Configuration();
            configuration.addAll(jobGraph.getJobConfiguration());
            configuration.setLong("taskmanager.memory.size", -1L);
            configuration.setInteger("taskmanager.numberOfTaskSlots", jobGraph.getMaximumParallelism());
            this.flink = new LocalFlinkMiniCluster(configuration, true);
            this.flink.start();
        }
        if (submitBlocking) {
            this.flink.submitJobAndWait(jobGraph, false);
        } else {
            this.flink.submitJobDetached(jobGraph);
        }
    }

    public void killTopology(String topologyName) {
        this.killTopologyWithOpts(topologyName, null);
    }

    public void killTopologyWithOpts(String name, KillOptions options) {
    }

    public void activate(String topologyName) {
    }

    public void deactivate(String topologyName) {
    }

    public void rebalance(String name, RebalanceOptions options) {
    }

    public void shutdown() {
        if (this.flink != null) {
            this.flink.stop();
            this.flink = null;
        }
    }

    public String getTopologyConf(String id) {
        return null;
    }

    public StormTopology getTopology(String id) {
        return null;
    }

    public ClusterSummary getClusterInfo() {
        return null;
    }

    public TopologyInfo getTopologyInfo(String id) {
        return null;
    }

    public Map<?, ?> getState() {
        return null;
    }

    public static FlinkLocalCluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    public static void initialize(LocalClusterFactory clusterFactory) {
        currentFactory = Objects.requireNonNull(clusterFactory);
    }

    public static class DefaultLocalClusterFactory
    implements LocalClusterFactory {
        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster();
        }
    }

    public static interface LocalClusterFactory {
        public FlinkLocalCluster createLocalCluster();
    }
}

