package org.apache.atlas.kafka.bridge;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/atlas/kafka/bridge/KafkaBridge.class */
public class KafkaBridge {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_FAILED = 1;
    private static final String ATLAS_ENDPOINT = "atlas.rest.address";
    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
    private static final String KAFKA_CLUSTER_NAME = "atlas.cluster.name";
    private static final String DEFAULT_CLUSTER_NAME = "primary";
    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
    private static final String DESCRIPTION_ATTR = "description";
    private static final String PARTITION_COUNT = "partitionCount";
    private static final String NAME = "name";
    private static final String URI = "uri";
    private static final String CLUSTERNAME = "clusterName";
    private static final String TOPIC = "topic";
    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
    private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect";
    private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = "atlas.kafka.zookeeper.connection.timeout.ms";
    private static final String ZOOKEEPER_SESSION_TIMEOUT_MS = "atlas.kafka.zookeeper.session.timeout.ms";
    private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
    private static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
    private final List<String> availableTopics;
    private final String clusterName;
    private final AtlasClientV2 atlasClientV2;
    private final ZkUtils zkUtils;

    public static void main(String[] strArr) {
        AtlasClientV2 atlasClientV2;
        int i = EXIT_CODE_FAILED;
        AtlasClientV2 atlasClientV22 = null;
        try {
            try {
                try {
                    Options options = new Options();
                    options.addOption("t", TOPIC, true, TOPIC);
                    options.addOption("f", "filename", true, "filename");
                    CommandLine parse = new BasicParser().parse(options, strArr);
                    String optionValue = parse.getOptionValue("t");
                    String optionValue2 = parse.getOptionValue("f");
                    Configuration configuration = ApplicationProperties.get();
                    String[] stringArray = configuration.getStringArray(ATLAS_ENDPOINT);
                    if (stringArray == null || stringArray.length == 0) {
                        stringArray = new String[]{DEFAULT_ATLAS_URL};
                    }
                    if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
                        atlasClientV2 = new AtlasClientV2(currentUser, currentUser.getShortUserName(), stringArray);
                    } else {
                        atlasClientV2 = new AtlasClientV2(stringArray, AuthenticationUtil.getBasicAuthenticationInput());
                    }
                    KafkaBridge kafkaBridge = new KafkaBridge(configuration, atlasClientV2);
                    if (StringUtils.isNotEmpty(optionValue2)) {
                        File file = new File(optionValue2);
                        if (file.exists() && file.canRead()) {
                            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
                            while (true) {
                                String readLine = bufferedReader.readLine();
                                if (readLine == null) {
                                    break;
                                } else {
                                    kafkaBridge.importTopic(readLine.trim());
                                }
                            }
                            i = EXIT_CODE_SUCCESS;
                        } else {
                            LOG.error("Failed to read the file");
                        }
                    } else {
                        kafkaBridge.importTopic(optionValue);
                        i = EXIT_CODE_SUCCESS;
                    }
                    if (atlasClientV2 != null) {
                        atlasClientV2.close();
                    }
                } catch (Exception e) {
                    System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
                    e.printStackTrace();
                    LOG.error("ImportKafkaEntities failed", e);
                    if (EXIT_CODE_SUCCESS != 0) {
                        atlasClientV22.close();
                    }
                }
            } catch (ParseException e2) {
                LOG.error("Failed to parse arguments. Error: ", e2.getMessage());
                printUsage();
                if (EXIT_CODE_SUCCESS != 0) {
                    atlasClientV22.close();
                }
            }
            System.exit(i);
        } catch (Throwable th) {
            if (EXIT_CODE_SUCCESS != 0) {
                atlasClientV22.close();
            }
            throw th;
        }
    }

    public KafkaBridge(Configuration configuration, AtlasClientV2 atlasClientV2) throws Exception {
        String zKConnection = getZKConnection(configuration);
        ZkClient zkClient = new ZkClient(zKConnection, configuration.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, 10000), configuration.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, 10000), ZKStringSerializer$.MODULE$);
        this.atlasClientV2 = atlasClientV2;
        this.clusterName = configuration.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
        this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zKConnection), JaasUtils.isZkSecurityEnabled());
        this.availableTopics = JavaConversions.seqAsJavaList(this.zkUtils.getAllTopics());
    }

    public void importTopic(String str) throws Exception {
        List<String> list = this.availableTopics;
        if (StringUtils.isNotEmpty(str)) {
            ArrayList arrayList = new ArrayList();
            for (String str2 : list) {
                if (Pattern.compile(str).matcher(str2).matches()) {
                    arrayList.add(str2);
                }
            }
            list = arrayList;
        }
        if (CollectionUtils.isNotEmpty(list)) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                createOrUpdateTopic(it.next());
            }
        }
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createOrUpdateTopic(String str) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas;
        String topicQualifiedName = getTopicQualifiedName(this.clusterName, str);
        AtlasEntity.AtlasEntityWithExtInfo findTopicEntityInAtlas = findTopicEntityInAtlas(topicQualifiedName);
        if (findTopicEntityInAtlas == null) {
            System.out.println("Adding Kafka topic " + str);
            LOG.info("Importing Kafka topic: {}", topicQualifiedName);
            updateEntityInAtlas = createEntityInAtlas(new AtlasEntity.AtlasEntityWithExtInfo(getTopicEntity(str, null)));
        } else {
            System.out.println("Updating Kafka topic " + str);
            LOG.info("Kafka topic {} already exists in Atlas. Updating it..", topicQualifiedName);
            findTopicEntityInAtlas.setEntity(getTopicEntity(str, findTopicEntityInAtlas.getEntity()));
            updateEntityInAtlas = updateEntityInAtlas(findTopicEntityInAtlas);
        }
        return updateEntityInAtlas;
    }

    @VisibleForTesting
    AtlasEntity getTopicEntity(String str, AtlasEntity atlasEntity) {
        AtlasEntity atlasEntity2 = atlasEntity == null ? new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()) : atlasEntity;
        atlasEntity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getTopicQualifiedName(this.clusterName, str));
        atlasEntity2.setAttribute(CLUSTERNAME, this.clusterName);
        atlasEntity2.setAttribute(TOPIC, str);
        atlasEntity2.setAttribute(NAME, str);
        atlasEntity2.setAttribute(DESCRIPTION_ATTR, str);
        atlasEntity2.setAttribute(URI, str);
        atlasEntity2.setAttribute(PARTITION_COUNT, (Integer) this.zkUtils.getTopicPartitionCount(str).get());
        return atlasEntity2;
    }

    @VisibleForTesting
    static String getTopicQualifiedName(String str, String str2) {
        return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, str2.toLowerCase(), str);
    }

    private AtlasEntity.AtlasEntityWithExtInfo findTopicEntityInAtlas(String str) {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo;
        try {
            atlasEntityWithExtInfo = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), str);
            clearRelationshipAttributes(atlasEntityWithExtInfo);
        } catch (Exception e) {
            atlasEntityWithExtInfo = EXIT_CODE_SUCCESS;
        }
        return atlasEntityWithExtInfo;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo findEntityInAtlas(String str, String str2) throws Exception {
        return this.atlasClientV2.getEntityByAttribute(str, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, str2));
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo2 = EXIT_CODE_SUCCESS;
        List createdEntities = this.atlasClientV2.createEntity(atlasEntityWithExtInfo).getCreatedEntities();
        if (CollectionUtils.isNotEmpty(createdEntities)) {
            atlasEntityWithExtInfo2 = this.atlasClientV2.getEntityByGuid(((AtlasEntityHeader) createdEntities.get(EXIT_CODE_SUCCESS)).getGuid());
            LOG.info("Created {} entity: name={}, guid={}", new Object[]{atlasEntityWithExtInfo2.getEntity().getTypeName(), atlasEntityWithExtInfo2.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), atlasEntityWithExtInfo2.getEntity().getGuid()});
        }
        return atlasEntityWithExtInfo2;
    }

    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws Exception {
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo2;
        EntityMutationResponse updateEntity = this.atlasClientV2.updateEntity(atlasEntityWithExtInfo);
        if (updateEntity != null) {
            List updatedEntities = updateEntity.getUpdatedEntities();
            if (CollectionUtils.isNotEmpty(updatedEntities)) {
                atlasEntityWithExtInfo2 = this.atlasClientV2.getEntityByGuid(((AtlasEntityHeader) updatedEntities.get(EXIT_CODE_SUCCESS)).getGuid());
                LOG.info("Updated {} entity: name={}, guid={} ", new Object[]{atlasEntityWithExtInfo2.getEntity().getTypeName(), atlasEntityWithExtInfo2.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), atlasEntityWithExtInfo2.getEntity().getGuid()});
            } else {
                LOG.info("Entity: name={} ", atlasEntityWithExtInfo.toString() + " not updated as it is unchanged from what is in Atlas");
                atlasEntityWithExtInfo2 = atlasEntityWithExtInfo;
            }
        } else {
            LOG.info("Entity: name={} ", atlasEntityWithExtInfo.toString() + " not updated as it is unchanged from what is in Atlas");
            atlasEntityWithExtInfo2 = atlasEntityWithExtInfo;
        }
        return atlasEntityWithExtInfo2;
    }

    private static void printUsage() {
        System.out.println("Usage 1: import-kafka.sh");
        System.out.println("Usage 2: import-kafka.sh [-t <topic regex> OR --topic <topic regex>]");
        System.out.println("Usage 3: import-kafka.sh [-f <filename>]");
        System.out.println("   Format:");
        System.out.println("        topic1 OR topic1 regex");
        System.out.println("        topic2 OR topic2 regex");
        System.out.println("        topic3 OR topic3 regex");
    }

    private void clearRelationshipAttributes(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        if (atlasEntityWithExtInfo != null) {
            clearRelationshipAttributes(atlasEntityWithExtInfo.getEntity());
            if (atlasEntityWithExtInfo.getReferredEntities() != null) {
                clearRelationshipAttributes(atlasEntityWithExtInfo.getReferredEntities().values());
            }
        }
    }

    private void clearRelationshipAttributes(Collection<AtlasEntity> collection) {
        if (collection != null) {
            Iterator<AtlasEntity> it = collection.iterator();
            while (it.hasNext()) {
                clearRelationshipAttributes(it.next());
            }
        }
    }

    private void clearRelationshipAttributes(AtlasEntity atlasEntity) {
        if (atlasEntity == null || atlasEntity.getRelationshipAttributes() == null) {
            return;
        }
        atlasEntity.getRelationshipAttributes().clear();
    }

    private String getStringValue(String[] strArr) {
        String str = EXIT_CODE_SUCCESS;
        int length = strArr.length;
        for (int i = EXIT_CODE_SUCCESS; i < length; i += EXIT_CODE_FAILED) {
            String str2 = strArr[i];
            str = str == null ? str2 : str + "," + str2;
        }
        return str;
    }

    private String getZKConnection(Configuration configuration) {
        String stringValue = getStringValue(configuration.getStringArray(ZOOKEEPER_CONNECT));
        if (StringUtils.isEmpty(stringValue)) {
            stringValue = DEFAULT_ZOOKEEPER_CONNECT;
        }
        return stringValue;
    }
}
