package org.apache.flink.storm.api;

import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/storm/api/FlinkLocalCluster.class */
/**
 * Runs Storm topologies (wrapped as {@link FlinkTopology}) on a Flink mini cluster.
 *
 * <p>The mini cluster is either supplied through the constructor or created and started
 * on the first topology submission. Instances are normally obtained through
 * {@link #getLocalCluster()}, which delegates to the factory installed with
 * {@link #initialize(LocalClusterFactory)}.
 *
 * <p>Most of the Storm management operations declared here (kill, activate, deactivate,
 * rebalance, and the various getters) are empty stubs in this class.
 */
public class FlinkLocalCluster {
    /** The mini cluster that executes submitted jobs; {@code null} until supplied or created. */
    private FlinkMiniCluster flink;

    /**
     * Configuration key read from the Storm config map on submission. When it maps to
     * {@link Boolean#TRUE}, the submission waits for the job instead of detaching.
     */
    public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

    private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

    /** Factory used by {@link #getLocalCluster()}; replaceable through {@link #initialize}. */
    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    /** Factory that creates a plain {@link FlinkLocalCluster} without a pre-built mini cluster. */
    public static class DefaultLocalClusterFactory implements LocalClusterFactory {
        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster();
        }
    }

    /** Creates the {@link FlinkLocalCluster} instances handed out by {@link #getLocalCluster()}. */
    public interface LocalClusterFactory {
        /**
         * Creates a local cluster.
         *
         * @return the local cluster to hand to the caller
         */
        FlinkLocalCluster createLocalCluster();
    }

    /** Creates a local cluster whose mini cluster is created on the first submission. */
    public FlinkLocalCluster() {
    }

    /**
     * Creates a local cluster that submits jobs to the given mini cluster.
     *
     * @param flinkMiniCluster the mini cluster to use; must not be {@code null}
     * @throws NullPointerException if {@code flinkMiniCluster} is {@code null}
     */
    public FlinkLocalCluster(FlinkMiniCluster flinkMiniCluster) {
        this.flink = Objects.requireNonNull(flinkMiniCluster, "flinkMiniCluster");
    }

    /**
     * Submits a topology without submit options.
     *
     * @param str the job name to assign to the topology
     * @param map the Storm configuration; may be {@code null}
     * @param flinkTopology the topology to run
     * @throws Exception if building or submitting the job fails
     */
    public void submitTopology(String str, Map map, FlinkTopology flinkTopology) throws Exception {
        submitTopologyWithOpts(str, map, flinkTopology, null);
    }

    /**
     * Translates the topology into a Flink job graph and submits it to the mini cluster.
     *
     * <p>If no mini cluster exists yet, one is created from the job's configuration and
     * started first. The submission blocks only when {@code map} maps
     * {@link #SUBMIT_BLOCKING} to {@link Boolean#TRUE}; otherwise the job is submitted
     * detached.
     *
     * @param str the job name to assign to the topology
     * @param map the Storm configuration; may be {@code null}
     * @param flinkTopology the topology to run
     * @param submitOptions not read by this method
     * @throws Exception if building the job graph, starting the cluster, or submitting fails
     */
    public void submitTopologyWithOpts(String str, Map map, FlinkTopology flinkTopology, SubmitOptions submitOptions) throws Exception {
        LOG.info("Running Storm topology on FlinkLocalCluster");
        final boolean blocking = isBlockingSubmit(map);

        FlinkClient.addStormConfigToTopology(flinkTopology, map);
        StreamGraph streamGraph = flinkTopology.getExecutionEnvironment().getStreamGraph();
        streamGraph.setJobName(str);
        JobGraph jobGraph = streamGraph.getJobGraph();

        if (this.flink == null) {
            // Publish the cluster only after it started successfully, so a failed start
            // cannot leave this instance holding a cluster that never came up.
            this.flink = startLocalCluster(jobGraph);
        }

        if (blocking) {
            this.flink.submitJobAndWait(jobGraph, false);
        } else {
            this.flink.submitJobDetached(jobGraph);
        }
    }

    /** Returns whether the config maps {@link #SUBMIT_BLOCKING} to {@link Boolean#TRUE}. */
    private static boolean isBlockingSubmit(Map<?, ?> stormConf) {
        return stormConf != null && Boolean.TRUE.equals(stormConf.get(SUBMIT_BLOCKING));
    }

    /** Creates and starts a mini cluster configured from the job; stops it again if the start fails. */
    private static FlinkMiniCluster startLocalCluster(JobGraph jobGraph) throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        FlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, true);
        try {
            cluster.start();
        } catch (Exception startFailure) {
            // Best-effort cleanup of whatever the failed start left behind; the original
            // failure is what the caller needs to see.
            try {
                cluster.stop();
            } catch (Exception stopFailure) {
                startFailure.addSuppressed(stopFailure);
            }
            throw startFailure;
        }
        return cluster;
    }

    /**
     * Delegates to {@link #killTopologyWithOpts(String, KillOptions)} with {@code null} options.
     *
     * @param str the topology name
     */
    public void killTopology(String str) {
        killTopologyWithOpts(str, null);
    }

    /**
     * Not implemented; does nothing.
     *
     * @param str the topology name
     * @param killOptions ignored
     */
    public void killTopologyWithOpts(String str, KillOptions killOptions) {
    }

    /**
     * Not implemented; does nothing.
     *
     * @param str the topology name
     */
    public void activate(String str) {
    }

    /**
     * Not implemented; does nothing.
     *
     * @param str the topology name
     */
    public void deactivate(String str) {
    }

    /**
     * Not implemented; does nothing.
     *
     * @param str the topology name
     * @param rebalanceOptions ignored
     */
    public void rebalance(String str, RebalanceOptions rebalanceOptions) {
    }

    /** Stops the mini cluster, if there is one, and drops the reference to it. */
    public void shutdown() {
        if (this.flink != null) {
            this.flink.stop();
            this.flink = null;
        }
    }

    /**
     * Not implemented.
     *
     * @param str the topology id
     * @return always {@code null}
     */
    public String getTopologyConf(String str) {
        return null;
    }

    /**
     * Not implemented.
     *
     * @param str the topology id
     * @return always {@code null}
     */
    public StormTopology getTopology(String str) {
        return null;
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    public ClusterSummary getClusterInfo() {
        return null;
    }

    /**
     * Not implemented.
     *
     * @param str the topology id
     * @return always {@code null}
     */
    public TopologyInfo getTopologyInfo(String str) {
        return null;
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    public Map<?, ?> getState() {
        return null;
    }

    /**
     * Creates a local cluster using the currently installed factory.
     *
     * @return the cluster produced by the factory
     */
    public static FlinkLocalCluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    /**
     * Installs the factory used by {@link #getLocalCluster()}.
     *
     * @param localClusterFactory the factory to install; must not be {@code null}
     * @throws NullPointerException if {@code localClusterFactory} is {@code null}
     */
    public static void initialize(LocalClusterFactory localClusterFactory) {
        currentFactory = Objects.requireNonNull(localClusterFactory, "localClusterFactory");
    }
}
