package org.apache.storm.flux;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.flux.model.BoltDef;
import org.apache.storm.flux.model.ExecutionContext;
import org.apache.storm.flux.model.SpoutDef;
import org.apache.storm.flux.model.StreamDef;
import org.apache.storm.flux.model.TopologyDef;
import org.apache.storm.flux.parser.FluxParser;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;

/* loaded from: input_file:org/apache/storm/flux/Flux.class */
/**
 * Command-line entry point that parses a Flux topology definition (from a file or a
 * classpath resource), builds the corresponding Storm topology, optionally prints a
 * summary of it, and then either submits it to a remote cluster or runs it for a
 * limited time in a {@link LocalCluster}.
 */
public class Flux {
    private static final Logger LOG = LoggerFactory.getLogger(Flux.class);

    /** How long (ms) a local-mode topology runs when {@code --sleep} is not given. */
    private static final long DEFAULT_LOCAL_SLEEP_TIME = 60000L;
    /** Port used for {@code --zookeeper} when the value carries no {@code :port} part. */
    private static final long DEFAULT_ZK_PORT = 2181L;

    private static final String OPTION_LOCAL = "local";
    private static final String OPTION_REMOTE = "remote";
    private static final String OPTION_RESOURCE = "resource";
    private static final String OPTION_SLEEP = "sleep";
    private static final String OPTION_DRY_RUN = "dry-run";
    private static final String OPTION_NO_DETAIL = "no-detail";
    private static final String OPTION_NO_SPLASH = "no-splash";
    private static final String OPTION_INACTIVE = "inactive";
    private static final String OPTION_ZOOKEEPER = "zookeeper";
    private static final String OPTION_FILTER = "filter";
    private static final String OPTION_ENV_FILTER = "env-filter";

    /**
     * Parses the command line and runs the requested action.
     *
     * <p>Exactly one positional argument (the topology definition path) is required;
     * otherwise the usage text is printed and the JVM exits with status 1.
     *
     * @param strArr raw command-line arguments
     * @throws Exception if option parsing, topology parsing/building, or submission fails
     */
    public static void main(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
        options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
        options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) before killing the topology and shutting down the local cluster."));
        options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, and print information about the topology."));
        options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
        options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
        options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
        options.addOption(option(1, "f", OPTION_FILTER, ResourceUtils.URL_PROTOCOL_FILE, "Perform property substitution. Use the specified file as a source of properties, and replace keys identified with {$[property name]} with the value defined in the properties file."));
        options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Keys identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));

        CommandLine parsed = new BasicParser().parse(options, strArr);
        if (parsed.getArgs().length != 1) {
            usage(options);
            System.exit(1);
        }
        runCli(parsed);
    }

    /** Builds an option whose argument name (shown in help) is the same as its long name. */
    private static Option option(int argCount, String shortName, String longName, String description) {
        return option(argCount, shortName, longName, longName, description);
    }

    /**
     * Builds a commons-cli {@link Option}.
     *
     * @param argCount    number of arguments the option takes
     * @param shortName   short (single-dash) option name
     * @param longName    long (double-dash) option name
     * @param argName     argument placeholder shown in the help output
     * @param description help text
     */
    private static Option option(int argCount, String shortName, String longName, String argName, String description) {
        // OptionBuilder accumulates settings in static state until create() is called.
        OptionBuilder.hasArgs(argCount);
        OptionBuilder.withArgName(argName);
        OptionBuilder.withLongOpt(longName);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(shortName);
    }

    /** Prints the command-line usage/help text for the given options. */
    private static void usage(Options options) {
        new HelpFormatter().printHelp("storm jar <my_topology_uber_jar.jar> " + Flux.class.getName() + " [options] <topology-config.yaml>", options);
    }

    /**
     * Executes the action described by an already-parsed command line: print the splash
     * (unless suppressed), parse and build the topology, print its details (unless
     * suppressed), then stop (dry run), submit remotely, or run locally.
     */
    private static void runCli(CommandLine cmd) throws Exception {
        if (!cmd.hasOption(OPTION_NO_SPLASH)) {
            printSplash();
        }

        TopologyDef topologyDef = parseTopologyDef(cmd);
        String topologyName = topologyDef.getName();
        Config conf = FluxBuilder.buildConfig(topologyDef);
        ExecutionContext context = new ExecutionContext(topologyDef, conf);
        StormTopology topology = FluxBuilder.buildTopology(context);

        if (!cmd.hasOption(OPTION_NO_DETAIL)) {
            printTopologyInfo(context);
        }
        if (cmd.hasOption(OPTION_DRY_RUN)) {
            // Build/validate/print only; nothing is run or deployed.
            return;
        }
        if (cmd.hasOption(OPTION_REMOTE)) {
            submitRemote(cmd, topologyName, conf, topology);
        } else {
            runLocal(cmd, topologyName, conf, topology);
        }
    }

    /** Parses the topology definition named by the single positional argument. */
    private static TopologyDef parseTopologyDef(CommandLine cmd) throws Exception {
        // NOTE: main() registers no "dump-yaml" option, so this is false for any
        // command line produced there.
        boolean dumpYaml = cmd.hasOption("dump-yaml");
        String path = cmd.getArgs()[0];
        // getOptionValue returns null when --filter was not supplied.
        String filterFile = cmd.getOptionValue(OPTION_FILTER);
        boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);

        if (cmd.hasOption(OPTION_RESOURCE)) {
            printf("Parsing classpath resource: %s", path);
            return FluxParser.parseResource(path, dumpYaml, true, FluxParser.parseProperties(filterFile, true), envFilter);
        }
        printf("Parsing file: %s", new File(path).getAbsolutePath());
        return FluxParser.parseFile(path, dumpYaml, true, FluxParser.parseProperties(filterFile, false), envFilter);
    }

    /** Submits the topology to a remote cluster, INACTIVE if {@code --inactive} was given. */
    private static void submitRemote(CommandLine cmd, String name, Config conf, StormTopology topology) throws Exception {
        LOG.info("Running remotely...");
        TopologyInitialStatus initialStatus;
        if (cmd.hasOption(OPTION_INACTIVE)) {
            LOG.info("Deploying topology in an INACTIVE state...");
            initialStatus = TopologyInitialStatus.INACTIVE;
        } else {
            LOG.info("Deploying topology in an ACTIVE state...");
            initialStatus = TopologyInitialStatus.ACTIVE;
        }
        StormSubmitter.submitTopology(name, conf, topology, new SubmitOptions(initialStatus), (StormSubmitter.ProgressListener) null);
    }

    /**
     * Runs the topology in a local cluster for the configured sleep time, then kills it.
     * The cluster is shut down even if submission, sleeping, or killing fails.
     */
    private static void runLocal(CommandLine cmd, String name, Config conf, StormTopology topology) throws Exception {
        LOG.info("Running in local mode...");
        String sleepArg = cmd.getOptionValue(OPTION_SLEEP);
        long sleepMillis = DEFAULT_LOCAL_SLEEP_TIME;
        if (sleepArg != null) {
            sleepMillis = Long.parseLong(sleepArg);
        }
        LOG.debug("Sleep time: {}", sleepMillis);

        LocalCluster cluster = createLocalCluster(cmd);
        try {
            cluster.submitTopology(name, conf, topology);
            Utils.sleep(sleepMillis);
            cluster.killTopology(name);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Creates the local cluster, pointing it at an external ZooKeeper when
     * {@code --zookeeper host[:port]} was given.
     *
     * @throws IllegalArgumentException if the {@code --zookeeper} value contains no host
     */
    private static LocalCluster createLocalCluster(CommandLine cmd) throws Exception {
        if (!cmd.hasOption(OPTION_ZOOKEEPER)) {
            return new LocalCluster();
        }

        String zkSpec = cmd.getOptionValue(OPTION_ZOOKEEPER);
        LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkSpec);

        // split() drops trailing empty strings, so a value made only of ':' yields an
        // empty array; report that clearly instead of failing on parts[0].
        String[] parts = zkSpec.split(":");
        if (parts.length == 0) {
            throw new IllegalArgumentException("Invalid --" + OPTION_ZOOKEEPER + " value '" + zkSpec + "': expected <host>[:<port>]");
        }
        String host = parts[0];
        long port = DEFAULT_ZK_PORT;
        if (parts.length > 1) {
            port = Long.parseLong(parts[1]);
        }

        try {
            return new LocalCluster(host, Long.valueOf(port));
        } catch (NoSuchMethodError e) {
            LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
            System.exit(1);
            // Not reached after a successful exit; keeps the compiler's flow analysis satisfied.
            throw e;
        }
    }

    /**
     * Prints the name, spouts, bolts and streams of the topology to standard output.
     * Nothing is printed unless the definition reports itself as a DSL topology.
     *
     * @param executionContext context holding the topology definition to describe
     */
    static void printTopologyInfo(ExecutionContext executionContext) {
        TopologyDef topologyDef = executionContext.getTopologyDef();
        if (!topologyDef.isDslTopology()) {
            return;
        }
        print("---------- TOPOLOGY DETAILS ----------");
        printf("Topology Name: %s", topologyDef.getName());
        print("--------------- SPOUTS ---------------");
        for (SpoutDef spout : topologyDef.getSpouts()) {
            printf("%s [%d] (%s)", spout.getId(), spout.getParallelism(), spout.getClassName());
        }
        print("---------------- BOLTS ---------------");
        for (BoltDef bolt : topologyDef.getBolts()) {
            printf("%s [%d] (%s)", bolt.getId(), bolt.getParallelism(), bolt.getClassName());
        }
        print("--------------- STREAMS ---------------");
        for (StreamDef stream : topologyDef.getStreams()) {
            printf("%s --%s--> %s", stream.getFrom(), stream.getGrouping().getType(), stream.getTo());
        }
        print("--------------------------------------");
    }

    /** Formats the message with {@link String#format} and prints it as one line. */
    private static void printf(String format, Object... args) {
        print(String.format(format, args));
    }

    /** Prints one line to standard output. */
    private static void print(String line) {
        System.out.println(line);
    }

    /**
     * Prints the contents of the {@code /splash.txt} classpath resource, if present,
     * to standard output.
     *
     * @throws IOException if reading the resource fails
     */
    private static void printSplash() throws IOException {
        // try-with-resources tolerates a null resource, and guarantees the stream is
        // closed once the splash has been printed (or reading fails).
        try (InputStream splash = Flux.class.getResourceAsStream("/splash.txt")) {
            if (splash == null) {
                return;
            }
            BufferedReader reader = new BufferedReader(new InputStreamReader(splash, "UTF-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
