package org.apache.storm.utils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.shade.org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/TopologySpoutLag.class */
public class TopologySpoutLag {
    private static final String SPOUT_ID = "spoutId";
    private static final String SPOUT_TYPE = "spoutType";
    private static final String SPOUT_LAG_RESULT = "spoutLagResult";
    private static final String ERROR_INFO = "errorInfo";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TopologySpoutLag.class);

    public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map map) {
        HashMap hashMap = new HashMap();
        String str = null;
        for (Map.Entry<String, SpoutSpec> entry : stormTopology.get_spouts().entrySet()) {
            try {
                SpoutSpec value = entry.getValue();
                str = getClassNameFromComponentObject(value.get_spout_object());
                logger.debug("spout classname: {}", str);
                if (str.endsWith("storm.kafka.spout.KafkaSpout")) {
                    hashMap.put(entry.getKey(), getLagResultForNewKafkaSpout(entry.getKey(), value, map));
                } else if (str.endsWith("storm.kafka.KafkaSpout")) {
                    hashMap.put(entry.getKey(), getLagResultForOldKafkaSpout(entry.getKey(), value, map));
                }
            } catch (Exception e) {
                logger.warn("Exception thrown while getting lag for spout id: " + entry.getKey() + " and spout class: " + str);
                logger.warn("Exception message:" + e.getMessage(), (Throwable) e);
            }
        }
        return hashMap;
    }

    private static String getClassNameFromComponentObject(ComponentObject componentObject) {
        try {
            return Utils.getSetComponentObject(componentObject).getClass().getCanonicalName();
        } catch (RuntimeException e) {
            if (e.getCause() instanceof ClassNotFoundException) {
                return e.getCause().getMessage().trim();
            }
            throw e;
        }
    }

    private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Object> map) {
        logger.debug("json configuration: {}", map);
        ArrayList arrayList = new ArrayList();
        arrayList.add("-t");
        arrayList.add((String) map.get("config.topics"));
        arrayList.add("-g");
        arrayList.add((String) map.get("config.groupid"));
        arrayList.add("-b");
        arrayList.add((String) map.get("config.bootstrap.servers"));
        String str = (String) map.get("config.security.protocol");
        if (str != null && !str.isEmpty()) {
            arrayList.add("-s");
            arrayList.add(str);
        }
        return arrayList;
    }

    private static List<String> getCommandLineOptionsForOldKafkaSpout(Map<String, Object> map, Map map2) {
        logger.debug("json configuration: {}", map);
        ArrayList arrayList = new ArrayList();
        arrayList.add("-o");
        arrayList.add("-t");
        arrayList.add((String) map.get("config.topics"));
        arrayList.add("-n");
        arrayList.add((String) map.get("config.zkRoot"));
        String str = (String) map.get("config.zkServers");
        if (str == null || str.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            Integer valueOf = Integer.valueOf(((Number) map2.get(Config.STORM_ZOOKEEPER_PORT)).intValue());
            Iterator it = ((List) map2.get(Config.STORM_ZOOKEEPER_SERVERS)).iterator();
            while (it.hasNext()) {
                sb.append(((String) it.next()) + ":" + valueOf + ",");
            }
            str = sb.toString();
        }
        arrayList.add("-z");
        arrayList.add(str);
        if (map.get("config.leaders") != null) {
            arrayList.add("-p");
            arrayList.add((String) map.get("config.partitions"));
            arrayList.add("-l");
            arrayList.add((String) map.get("config.leaders"));
        } else {
            arrayList.add("-r");
            arrayList.add((String) map.get("config.zkNodeBrokers"));
            Boolean bool = (Boolean) map2.get("kafka.topic.wildcard.match");
            if (bool != null && bool.booleanValue()) {
                arrayList.add("-w");
            }
        }
        return arrayList;
    }

    private static Map<String, Object> getLagResultForKafka(String str, SpoutSpec spoutSpec, Map map, boolean z) throws IOException {
        String str2 = spoutSpec.get_common().get_json_conf();
        Map map2 = null;
        String str3 = "Offset lags for kafka not supported for older versions. Please update kafka spout to latest version.";
        if (str2 != null && !str2.isEmpty()) {
            ArrayList arrayList = new ArrayList();
            String str4 = System.getenv("STORM_BASE_DIR");
            if (str4 != null && !str4.endsWith("/")) {
                str4 = str4 + File.separator;
            }
            arrayList.add(str4 != null ? str4 + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
            try {
                Map map3 = (Map) JSONValue.parseWithException(str2);
                arrayList.addAll(z ? getCommandLineOptionsForOldKafkaSpout(map3, map) : getCommandLineOptionsForNewKafkaSpout(map3));
                logger.debug("Command to run: {}", arrayList);
                if (!arrayList.contains(null)) {
                    String execCommand = ShellUtils.execCommand((String[]) arrayList.toArray(new String[0]));
                    try {
                        map2 = (Map) JSONValue.parseWithException(execCommand);
                    } catch (ParseException e) {
                        logger.debug("JSON parsing failed, assuming message as error message: {}", execCommand);
                        str3 = execCommand;
                    }
                }
            } catch (ParseException e2) {
                throw new IOException(e2);
            }
        }
        HashMap hashMap = new HashMap();
        hashMap.put(SPOUT_ID, str);
        hashMap.put(SPOUT_TYPE, "KAFKA");
        if (map2 != null) {
            hashMap.put(SPOUT_LAG_RESULT, map2);
        } else {
            hashMap.put(ERROR_INFO, str3);
        }
        return hashMap;
    }

    private static Map<String, Object> getLagResultForNewKafkaSpout(String str, SpoutSpec spoutSpec, Map map) throws IOException {
        return getLagResultForKafka(str, spoutSpec, map, false);
    }

    private static Map<String, Object> getLagResultForOldKafkaSpout(String str, SpoutSpec spoutSpec, Map map) throws IOException {
        return getLagResultForKafka(str, spoutSpec, map, true);
    }
}
