/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.storm.hook;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.storm.hook.StormTopologyUtil;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Storm submitter hook that collects metadata about a submitted topology
 * (the topology itself, its spouts and bolts, and the data sets they read
 * from or write to) and hands the resulting entities to
 * {@code notifyEntities}.
 */
public class StormAtlasHook
extends AtlasHook
implements ISubmitterHook {
    public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class);
    private static final String CONF_PREFIX = "atlas.hook.storm.";
    private static final String HOOK_NUM_RETRIES = "atlas.hook.storm.numRetries";
    public static final String ANONYMOUS_OWNER = "anonymous";
    public static final String HBASE_NAMESPACE_DEFAULT = "default";

    /**
     * @return the property key that holds the number of notification retries
     *         for this hook
     */
    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return HOOK_NUM_RETRIES;
    }

    /**
     * Builds the entities describing the given topology and notifies them.
     *
     * @param topologyInfo  summary of the topology (id, name, owner)
     * @param stormConf     the storm configuration
     * @param stormTopology the topology definition (spouts and bolts)
     * @throws IllegalAccessException declared for interface compatibility
     * @throws RuntimeException if anything fails while collecting or
     *         notifying; the original exception is kept as the cause
     */
    @Override
    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) throws IllegalAccessException {
        LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
        try {
            List<Referenceable> entities = new ArrayList<>();
            Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf);

            // Data sets (and their dependencies) go first, the topology itself last.
            entities.addAll(addTopologyDataSets(stormTopology, topologyReferenceable, topologyInfo.get_owner(), stormConf));

            List<Referenceable> graphNodes = createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
            topologyReferenceable.set("nodes", graphNodes);
            entities.add(topologyReferenceable);

            LOG.debug("notifying entities, size = {}", entities.size());
            String user = getUser(topologyInfo.get_owner(), null);
            notifyEntities(user, entities);
        }
        catch (Exception e) {
            throw new RuntimeException("Atlas hook is unable to process the topology.", e);
        }
    }

    /**
     * Creates the topology entity with id, name, qualified name, owner,
     * start time and cluster name. An empty owner is replaced with
     * {@link #ANONYMOUS_OWNER}; the start time is the current time.
     */
    private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) throws Exception {
        Referenceable topologyReferenceable = new Referenceable(StormDataTypes.STORM_TOPOLOGY.getName());
        topologyReferenceable.set("id", topologyInfo.get_id());
        topologyReferenceable.set("name", topologyInfo.get_name());
        topologyReferenceable.set("qualifiedName", topologyInfo.get_name());

        String owner = topologyInfo.get_owner();
        if (StringUtils.isEmpty(owner)) {
            owner = ANONYMOUS_OWNER;
        }
        topologyReferenceable.set("owner", owner);
        topologyReferenceable.set("startTime", new Date());
        topologyReferenceable.set("clusterName", getClusterName(stormConf));
        return topologyReferenceable;
    }

    /**
     * Sets the "inputs" and "outputs" attributes on the topology entity.
     *
     * @return every entity created while doing so (data sets plus entities
     *         they depend on), in creation order
     */
    private List<Referenceable> addTopologyDataSets(StormTopology stormTopology, Referenceable topologyReferenceable, String topologyOwner, Map stormConf) throws Exception {
        List<Referenceable> dependentEntities = new ArrayList<>();
        addTopologyInputs(topologyReferenceable, stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities);
        addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities);
        return dependentEntities;
    }

    /**
     * Derives one input data set per recognised spout and stores the list
     * under the topology's "inputs" attribute. Spouts whose class is not
     * handled by {@code createDataSet} are ignored.
     */
    private void addTopologyInputs(Referenceable topologyReferenceable, Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, List<Referenceable> dependentEntities) throws IllegalAccessException {
        List<Referenceable> inputDataSets = new ArrayList<>();
        for (SpoutSpec spoutSpec : spouts.values()) {
            Serializable instance = deserializeComponent(spoutSpec.get_spout_object().get_serialized_java());
            Referenceable datasetRef = createDataSet(instance.getClass().getSimpleName(), topologyOwner, instance, stormConf, dependentEntities);
            if (datasetRef != null) {
                inputDataSets.add(datasetRef);
            }
        }
        topologyReferenceable.set("inputs", inputDataSets);
    }

    /**
     * Derives one output data set per recognised terminal user bolt and
     * stores the list under the topology's "outputs" attribute. Bolts whose
     * class is not handled by {@code createDataSet} are ignored.
     */
    private void addTopologyOutputs(Referenceable topologyReferenceable, StormTopology stormTopology, String topologyOwner, Map stormConf, List<Referenceable> dependentEntities) throws Exception {
        List<Referenceable> outputDataSets = new ArrayList<>();
        Map<String, Bolt> bolts = stormTopology.get_bolts();
        Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
        for (String terminalBoltName : terminalBoltNames) {
            Serializable instance = deserializeComponent(bolts.get(terminalBoltName).get_bolt_object().get_serialized_java());
            Referenceable datasetRef = createDataSet(instance.getClass().getSimpleName(), topologyOwner, instance, stormConf, dependentEntities);
            if (datasetRef != null) {
                outputDataSets.add(datasetRef);
            }
        }
        topologyReferenceable.set("outputs", outputDataSets);
    }

    /**
     * Creates the data set entity for a spout/bolt, keyed on the simple class
     * name of the component.
     *
     * @param name              simple class name of the component; one of
     *                          "KafkaSpout", "HBaseBolt", "HdfsBolt", "HiveBolt"
     * @param topologyOwner     owner of the topology (used for Kafka topics)
     * @param instance          the deserialized component whose field values
     *                          supply the data set details
     * @param stormConf         the storm configuration
     * @param dependentEntities receives the created data set and, for Hive,
     *                          the database entity it refers to
     * @return the data set entity, or {@code null} for any other component type
     */
    private Referenceable createDataSet(String name, String topologyOwner, Serializable instance, Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
        Referenceable dataSetReferenceable;
        String clusterName;
        switch (name) {
            case "KafkaSpout": {
                dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
                String topicName = config.get("KafkaSpout._spoutConfig.topic");
                String owner = StringUtils.isEmpty(topologyOwner) ? ANONYMOUS_OWNER : topologyOwner;
                dataSetReferenceable.set("topic", topicName);
                dataSetReferenceable.set("uri", config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
                dataSetReferenceable.set("owner", owner);
                dataSetReferenceable.set("qualifiedName", getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
                dataSetReferenceable.set("name", topicName);
                break;
            }
            case "HBaseBolt": {
                dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
                String hbaseTableName = config.get("HBaseBolt.tableName");
                dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir"));
                dataSetReferenceable.set("name", hbaseTableName);
                dataSetReferenceable.set("owner", stormConf.get("storm.kerberos.principal"));
                clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
                // Only the default namespace is used for the qualified name.
                dataSetReferenceable.set("qualifiedName", getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
                break;
            }
            case "HdfsBolt": {
                dataSetReferenceable = new Referenceable("hdfs_path");
                // Prefer the rotation-actions value; fall back to the file name format path.
                String hdfsUri = config.get("HdfsBolt.rotationActions");
                if (hdfsUri == null) {
                    hdfsUri = config.get("HdfsBolt.fileNameFormat.path");
                }
                String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
                dataSetReferenceable.set("clusterName", getClusterName(stormConf));
                dataSetReferenceable.set("qualifiedName", hdfsPathStr);
                dataSetReferenceable.set("path", hdfsPathStr);
                dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
                Path hdfsPath = new Path(hdfsPathStr);
                // Locale.ROOT keeps the lower-cased name independent of the JVM default locale.
                dataSetReferenceable.set("name", Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase(Locale.ROOT));
                break;
            }
            case "HiveBolt": {
                Referenceable dbReferenceable = new Referenceable("hive_db");
                String databaseName = config.get("HiveBolt.options.databaseName");
                // NOTE(review): the database uses getClusterName(stormConf) while the table
                // below uses the cluster name from HiveConf when present - confirm this
                // asymmetry is intended.
                dbReferenceable.set("name", databaseName);
                dbReferenceable.set("qualifiedName", HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
                dbReferenceable.set("clusterName", getClusterName(stormConf));
                dependentEntities.add(dbReferenceable);

                clusterName = extractComponentClusterName(new HiveConf(), stormConf);
                String hiveTableName = config.get("HiveBolt.options.tableName");
                dataSetReferenceable = new Referenceable("hive_table");
                String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, databaseName, hiveTableName);
                dataSetReferenceable.set("name", hiveTableName);
                dataSetReferenceable.set("db", dbReferenceable);
                dataSetReferenceable.set("qualifiedName", tableQualifiedName);
                break;
            }
            default: {
                return null;
            }
        }
        dependentEntities.add(dataSetReferenceable);
        return dataSetReferenceable;
    }

    /**
     * Returns "atlas.cluster.name" from the given component configuration,
     * falling back to {@link #getClusterName(Map)} when it is not set there.
     */
    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
        String clusterName = configuration.get("atlas.cluster.name", null);
        if (clusterName == null) {
            clusterName = getClusterName(stormConf);
        }
        return clusterName;
    }

    /**
     * Creates one entity per spout and bolt, wires their "inputs"/"outputs"
     * attributes, and returns all node entities.
     */
    private List<Referenceable> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) throws Exception {
        Map<String, Referenceable> nodeEntities = new HashMap<>();
        addSpouts(spouts, nodeEntities);
        addBolts(bolts, nodeEntities);
        addGraphConnections(stormTopology, nodeEntities);
        return new ArrayList<>(nodeEntities.values());
    }

    /** Adds a spout entity to {@code nodeEntities} for every spout, keyed by spout name. */
    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
        for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
            nodeEntities.put(entry.getKey(), createSpoutInstance(entry.getKey(), entry.getValue()));
        }
    }

    /**
     * Creates a spout entity carrying the spout name, the class name of the
     * deserialized spout object, and its field values as "conf".
     */
    private Referenceable createSpoutInstance(String spoutName, SpoutSpec stormSpout) throws IllegalAccessException {
        Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName());
        spoutReferenceable.set("name", spoutName);
        Serializable instance = deserializeComponent(stormSpout.get_spout_object().get_serialized_java());
        spoutReferenceable.set("driverClass", instance.getClass().getName());
        spoutReferenceable.set("conf", StormTopologyUtil.getFieldValues(instance, true, null));
        return spoutReferenceable;
    }

    /** Adds a bolt entity to {@code nodeEntities} for every bolt, keyed by bolt name. */
    private void addBolts(Map<String, Bolt> bolts, Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
        for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
            nodeEntities.put(entry.getKey(), createBoltInstance(entry.getKey(), entry.getValue()));
        }
    }

    /**
     * Creates a bolt entity carrying the bolt name, the class name of the
     * deserialized bolt object, and its field values as "conf".
     */
    private Referenceable createBoltInstance(String boltName, Bolt stormBolt) throws IllegalAccessException {
        Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName());
        boltReferenceable.set("name", boltName);
        Serializable instance = deserializeComponent(stormBolt.get_bolt_object().get_serialized_java());
        boltReferenceable.set("driverClass", instance.getClass().getName());
        boltReferenceable.set("conf", StormTopologyUtil.getFieldValues(instance, true, null));
        return boltReferenceable;
    }

    /**
     * Populates the "outputs" attribute of every node that has outgoing edges
     * and appends the node's name to the "inputs" attribute of each node it
     * points to. Node names that have no entity in {@code nodeEntities} are
     * skipped with a warning instead of failing the whole notification.
     */
    private void addGraphConnections(StormTopology stormTopology, Map<String, Referenceable> nodeEntities) throws Exception {
        Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true);
        for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
            String nodeName = entry.getKey();
            Set<String> adjacencyList = entry.getValue();
            if (adjacencyList == null || adjacencyList.isEmpty()) {
                continue;
            }

            Referenceable node = nodeEntities.get(nodeName);
            if (node == null) {
                LOG.warn("No entity found for topology node {}, skipping its connections", nodeName);
                continue;
            }
            // Outgoing links.
            node.set("outputs", new ArrayList<>(adjacencyList));

            // Incoming links on each adjacent node.
            for (String adjacentNodeName : adjacencyList) {
                Referenceable adjacentNode = nodeEntities.get(adjacentNodeName);
                if (adjacentNode == null) {
                    LOG.warn("No entity found for topology node {}, skipping connection from {}", adjacentNodeName, nodeName);
                    continue;
                }
                // "inputs" is only ever written by this method, always as a list of names.
                @SuppressWarnings("unchecked")
                List<String> inputs = (List<String>) adjacentNode.get("inputs");
                if (inputs == null) {
                    inputs = new ArrayList<>();
                }
                inputs.add(nodeName);
                adjacentNode.set("inputs", inputs);
            }
        }
    }

    /**
     * @return the qualified name of a Kafka topic, formatted as
     *         {@code topicName@clusterName}
     */
    public static String getKafkaTopicQualifiedName(String clusterName, String topicName) {
        return String.format("%s@%s", topicName, clusterName);
    }

    /**
     * @return the qualified name of an HBase table, formatted as
     *         {@code nameSpace.tableName@clusterName}
     */
    public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
        return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
    }

    /**
     * Returns "atlas.cluster.name" from the Atlas properties, defaulting to
     * "primary". The {@code stormConf} argument is currently not consulted.
     */
    private String getClusterName(Map stormConf) {
        return atlasProperties.getString("atlas.cluster.name", "primary");
    }

    /** Deserializes the Java-serialized form of a spout or bolt object. */
    private static Serializable deserializeComponent(byte[] serializedJava) {
        return Utils.javaDeserialize(serializedJava, Serializable.class);
    }
}

