/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.atlas.hook;

import com.sun.jersey.api.client.ClientResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consolidates a batch of Atlas hook notifications before handing them to a notifier.
 *
 * <p>{@link #send(List, BiConsumer)} splits the batch into three groups:
 * <ol>
 *   <li>entity-create requests, whose entities are de-duplicated and sent as a single
 *       create request;</li>
 *   <li>partial updates of {@code nifi_flow_path} entities keyed by {@code qualifiedName},
 *       which are merged per flow path together with the inputs/outputs currently stored
 *       in Atlas;</li>
 *   <li>every other message, which is forwarded untouched.</li>
 * </ol>
 *
 * <p>Two size-bounded, access-ordered caches are kept between calls to reduce the number
 * of Atlas look-ups. This class performs no synchronization of its own.
 */
class NotificationSender {
    private static final Logger logger = LoggerFactory.getLogger(NotificationSender.class);

    private static final String NIFI_USER = "nifi";
    private static final String TYPE_NIFI_FLOW_PATH = "nifi_flow_path";
    private static final String ATTR_QUALIFIED_NAME = "qualifiedName";
    private static final String ATTR_INPUTS = "inputs";
    private static final String ATTR_OUTPUTS = "outputs";
    private static final String ATTR_TYPE_NAME = "typeName";
    private static final String ATTR_GUID = "guid";

    /** Maximum number of entries kept in {@link #guidToTypedQualifiedName}. */
    private static final int QUALIFIED_NAME_CACHE_SIZE = 10000;
    /** Maximum number of entries kept in {@link #typedQualifiedNameToRef}. */
    private static final int DATA_SET_REF_CACHE_SIZE = 1000;

    /** Client used to look up entities in Atlas; must be set before {@link #send} needs it. */
    private NiFiAtlasClient atlasClient;
    /** Cache: Atlas guid -> typed qualified name (as built by {@code AtlasUtils.toTypedQualifiedName}). */
    private final Map<String, String> guidToTypedQualifiedName;
    /** Cache: typed qualified name -> the {@link Referenceable} to use for that data set. */
    private final Map<String, Referenceable> typedQualifiedNameToRef;

    /**
     * Creates an access-ordered map that evicts its eldest entry once it holds more than
     * {@code maxSize} entries.
     *
     * @param maxSize maximum number of entries to retain
     * @return a new, empty bounded map
     */
    private static <K, V> Map<K, V> createCache(final int maxSize) {
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {

            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    NotificationSender() {
        this.guidToTypedQualifiedName = createCache(QUALIFIED_NAME_CACHE_SIZE);
        this.typedQualifiedNameToRef = createCache(DATA_SET_REF_CACHE_SIZE);
    }

    void setAtlasClient(NiFiAtlasClient atlasClient) {
        this.atlasClient = atlasClient;
    }

    /**
     * Returns a stateful predicate that accepts a {@link Referenceable} only the first time
     * its typed qualified name is seen. A fresh predicate must be used for each stream.
     */
    private Predicate<Referenceable> distinctReferenceable() {
        final Set<String> seenKeys = new HashSet<>();
        return ref -> seenKeys.add(
                AtlasUtils.toTypedQualifiedName(ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME)));
    }

    /**
     * Returns the list stored under {@code key}, or an empty list when the key is absent.
     * The supplied map is not modified.
     */
    private <K, V> List<V> safeGet(Map<K, List<V>> map, K key) {
        return map.getOrDefault(key, Collections.emptyList());
    }

    /**
     * Merges the {@code inputs} and {@code outputs} of {@code r2} into {@code r1} in place.
     */
    @SuppressWarnings("unchecked")
    private void mergeRefs(Referenceable r1, Referenceable r2) {
        // The attribute values are stored as Object; they are treated as collections of Referenceable here.
        r1.set(ATTR_INPUTS, mergeRefs(
                (Collection<Referenceable>) r1.get(ATTR_INPUTS), (Collection<Referenceable>) r2.get(ATTR_INPUTS)));
        r1.set(ATTR_OUTPUTS, mergeRefs(
                (Collection<Referenceable>) r1.get(ATTR_OUTPUTS), (Collection<Referenceable>) r2.get(ATTR_OUTPUTS)));
    }

    /**
     * Returns the union of two reference collections, de-duplicated by typed qualified name.
     * When one side is null or empty the other side is returned as is (which may itself be null).
     */
    private Collection<Referenceable> mergeRefs(Collection<Referenceable> r1, Collection<Referenceable> r2) {
        if (r1 == null || r1.isEmpty()) {
            return r2;
        }
        if (r2 == null || r2.isEmpty()) {
            return r1;
        }
        return Stream.concat(r1.stream(), r2.stream())
                .filter(distinctReferenceable())
                .collect(Collectors.toList());
    }

    /**
     * Consolidates {@code messages} and passes the result to {@code notifier}.
     *
     * <p>The notifier is invoked, in this order: once with a single create request holding all
     * distinct new entities (only if there are any), once with the merged {@code nifi_flow_path}
     * partial updates, and once with all remaining messages. The last two invocations happen even
     * when their list is empty. The {@link UserGroupInformation} argument is always {@code null}.
     *
     * <p>A summary of the collected metrics is logged when the method finishes, whether it
     * completes normally or with an exception.
     *
     * @param messages the notifications to send
     * @param notifier the consumer that actually delivers notifications
     */
    void send(List<HookNotification> messages, BiConsumer<List<HookNotification>, UserGroupInformation> notifier) {
        logger.info("Sending {} messages to Atlas", messages.size());
        final Metrics metrics = new Metrics();
        try {
            metrics.totalMessages = messages.size();

            final Map<Boolean, List<HookNotification>> createAndOthers = messages.stream()
                    .collect(Collectors.groupingBy(
                            msg -> HookNotification.HookNotificationType.ENTITY_CREATE.equals(msg.getType())));

            sendCreates(safeGet(createAndOthers, true), notifier, metrics);

            final Map<Boolean, List<HookNotification>> partialNiFiFlowPathUpdateAndOthers =
                    safeGet(createAndOthers, false).stream()
                            .collect(Collectors.groupingBy(NotificationSender::isFlowPathQualifiedNameUpdate));
            final List<HookNotification> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
            final List<HookNotification> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
            metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
            metrics.otherMessages = otherMessages.size();

            // Group the partial updates by flow path qualified name so that each flow path is updated once.
            final Map<String, List<HookNotificationV1.EntityPartialUpdateRequest>> updatesByFlowPath =
                    partialNiFiFlowPathUpdates.stream()
                            .map(msg -> (HookNotificationV1.EntityPartialUpdateRequest) msg)
                            .collect(Collectors.groupingBy(HookNotificationV1.EntityPartialUpdateRequest::getAttributeValue));

            final List<HookNotification> deduplicatedMessages = new ArrayList<>();
            for (Map.Entry<String, List<HookNotificationV1.EntityPartialUpdateRequest>> entry : updatesByFlowPath.entrySet()) {
                final HookNotification merged = mergeFlowPathUpdates(entry.getKey(), entry.getValue(), metrics);
                if (merged != null) {
                    deduplicatedMessages.add(merged);
                }
            }
            metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size();

            notifier.accept(deduplicatedMessages, null);
            notifier.accept(otherMessages, null);
        } finally {
            logger.info(metrics.toLogString("Finished"));
        }
    }

    /**
     * De-duplicates the entities carried by the given create requests and sends them as one
     * create request. {@code nifi_flow_path} entities sharing a qualified name are merged into
     * the first occurrence; other entities are kept once per typed qualified name.
     */
    private void sendCreates(List<HookNotification> creates,
                             BiConsumer<List<HookNotification>, UserGroupInformation> notifier,
                             Metrics metrics) {
        metrics.createMessages = creates.size();

        final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream()
                .flatMap(msg -> ((HookNotificationV1.EntityCreateRequest) msg).getEntities().stream())
                .collect(Collectors.groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.getTypeName())));

        final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false).stream()
                .filter(distinctReferenceable())
                .collect(Collectors.toList());

        final Collection<Referenceable> newFlowPaths = safeGet(newFlowPathsAndOtherEntities, true).stream()
                .collect(Collectors.toMap(ref -> ref.get(ATTR_QUALIFIED_NAME), ref -> ref, (r1, r2) -> {
                    // Same flow path created more than once: fold inputs/outputs into the first one.
                    mergeRefs(r1, r2);
                    return r1;
                }))
                .values();

        metrics.uniqueFlowPathCreates = newFlowPaths.size();
        metrics.uniqueOtherCreates = newEntitiesExceptFlowPaths.size();

        // Non flow path entities go first, followed by the flow paths.
        final List<Referenceable> newEntities = new ArrayList<>(newEntitiesExceptFlowPaths);
        newEntities.addAll(newFlowPaths);
        if (!newEntities.isEmpty()) {
            notifier.accept(
                    Collections.singletonList(new HookNotificationV1.EntityCreateRequest(NIFI_USER, newEntities)),
                    null);
        }
    }

    /**
     * Tells whether {@code msg} is a partial update of a {@code nifi_flow_path} entity that is
     * identified by its {@code qualifiedName} attribute.
     */
    private static boolean isFlowPathQualifiedNameUpdate(HookNotification msg) {
        if (!HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE.equals(msg.getType())) {
            return false;
        }
        final HookNotificationV1.EntityPartialUpdateRequest update = (HookNotificationV1.EntityPartialUpdateRequest) msg;
        return TYPE_NIFI_FLOW_PATH.equals(update.getTypeName())
                && ATTR_QUALIFIED_NAME.equals(update.getAttribute());
    }

    /**
     * Builds one partial update for a flow path out of all updates addressed to it, starting
     * from the inputs/outputs the flow path currently has in Atlas.
     *
     * @param flowPathQualifiedName qualified name of the flow path being updated
     * @param updates               all partial updates in the batch targeting that flow path
     * @param metrics               counters to update
     * @return the merged update, or {@code null} when the flow path could not be retrieved from Atlas
     */
    private HookNotification mergeFlowPathUpdates(String flowPathQualifiedName,
                                                  List<HookNotificationV1.EntityPartialUpdateRequest> updates,
                                                  Metrics metrics) {
        final String flowPathGuid;
        final Map<String, Referenceable> distinctInputs;
        final Map<String, Referenceable> distinctOutputs;
        try {
            metrics.flowPathSearched++;
            final AtlasEntity.AtlasEntityWithExtInfo flowPathExt = atlasClient.searchEntityDef(
                    new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName));
            final AtlasEntity flowPathEntity = flowPathExt.getEntity();
            flowPathGuid = flowPathEntity.getGuid();
            distinctInputs = toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics);
            distinctOutputs = toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics);
        } catch (AtlasServiceException e) {
            if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
                logger.debug("nifi_flow_path was not found for qualifiedName {}", flowPathQualifiedName);
            } else {
                // The exception is passed twice: once for the placeholder, once for the stack trace.
                logger.warn("Failed to retrieve nifi_flow_path with qualifiedName {} due to {}",
                        flowPathQualifiedName, e, e);
            }
            return null;
        }

        // References already known for a typed qualified name win over ones from the updates.
        for (HookNotificationV1.EntityPartialUpdateRequest update : updates) {
            fromReferenceable(update.getEntity().get(ATTR_INPUTS), metrics).forEach(distinctInputs::putIfAbsent);
            fromReferenceable(update.getEntity().get(ATTR_OUTPUTS), metrics).forEach(distinctOutputs::putIfAbsent);
        }

        final Referenceable flowPathRef = new Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null);
        flowPathRef.set(ATTR_INPUTS, new ArrayList<Referenceable>(distinctInputs.values()));
        flowPathRef.set(ATTR_OUTPUTS, new ArrayList<Referenceable>(distinctOutputs.values()));
        return new HookNotificationV1.EntityPartialUpdateRequest(
                NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
    }

    /**
     * Converts the references stored on an Atlas entity attribute into {@link Referenceable}s
     * keyed by typed qualified name. References whose qualified name cannot be resolved are skipped.
     *
     * @param _refs   attribute value read from an Atlas entity; treated as a collection of maps
     *                exposing {@code typeName} and {@code guid}, or {@code null}
     * @param metrics counters to update
     * @return a new mutable map; callers add further entries to it
     */
    @SuppressWarnings("unchecked")
    private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) {
        // An explicit HashMap is used because callers mutate the result.
        final Map<String, Referenceable> result = new HashMap<>();
        if (_refs == null) {
            return result;
        }

        final Collection<Map<String, Object>> refs = (Collection<Map<String, Object>>) _refs;
        for (Map<String, Object> ref : refs) {
            final String typeName = (String) ref.get(ATTR_TYPE_NAME);
            final String guid = (String) ref.get(ATTR_GUID);

            if (guidToTypedQualifiedName.containsKey(guid)) {
                metrics.dataSetCacheHit++;
            }
            final String typedQualifiedName = guidToTypedQualifiedName.computeIfAbsent(
                    guid, k -> lookupTypedQualifiedName(guid, typeName, metrics));
            if (typedQualifiedName == null) {
                // The entity could not be retrieved from Atlas; already logged.
                continue;
            }

            Referenceable referenceable = typedQualifiedNameToRef.get(typedQualifiedName);
            if (referenceable == null) {
                // The reference cache is smaller than the guid cache, so the reference may have been
                // evicted while the guid is still known. Rebuild it rather than dropping the data set.
                referenceable = new Referenceable(guid, typeName, Collections.<String, Object>emptyMap());
                typedQualifiedNameToRef.put(typedQualifiedName, referenceable);
            }

            final Referenceable previous = result.put(typedQualifiedName, referenceable);
            if (previous != null) {
                logger.debug("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}",
                        previous, referenceable);
            }
        }
        return result;
    }

    /**
     * Looks up an entity in Atlas by guid and returns its typed qualified name, caching a
     * guid-based {@link Referenceable} for it as a side effect.
     *
     * @return the typed qualified name, or {@code null} when the look-up failed
     */
    private String lookupTypedQualifiedName(String guid, String typeName, Metrics metrics) {
        try {
            metrics.dataSetSearched++;
            final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
            final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
            final String typedQualifiedName = AtlasUtils.toTypedQualifiedName(typeName, qualifiedName);
            typedQualifiedNameToRef.put(typedQualifiedName,
                    new Referenceable(guid, typeName, Collections.<String, Object>emptyMap()));
            return typedQualifiedName;
        } catch (AtlasServiceException e) {
            if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
                logger.warn("{} entity was not found for guid {}", typeName, guid);
            } else {
                // The exception is passed twice: once for the placeholder, once for the stack trace.
                logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e, e);
            }
            return null;
        }
    }

    /**
     * Indexes the references carried by a notification by typed qualified name, substituting
     * the cached {@link Referenceable} when one exists and caching the given one otherwise.
     * If the same typed qualified name occurs more than once, the first occurrence is kept.
     *
     * @param _refs   attribute value read from a {@link Referenceable}; treated as a collection
     *                of {@link Referenceable}, or {@code null}
     * @param metrics counters (currently not updated by this method)
     * @return references keyed by typed qualified name; empty when {@code _refs} is {@code null}
     */
    @SuppressWarnings("unchecked")
    private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) {
        if (_refs == null) {
            return Collections.emptyMap();
        }

        final Collection<Referenceable> refs = (Collection<Referenceable>) _refs;
        final Map<String, Referenceable> result = new HashMap<>();
        for (Referenceable ref : refs) {
            final Id id = ref.getId();
            final String typedRefQualifiedName = AtlasUtils.toTypedQualifiedName(
                    ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME));

            final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
                if (isAssigned(id)) {
                    // Remember the guid so that a later look-up by guid can skip the Atlas search.
                    guidToTypedQualifiedName.put(id._getId(), typedRefQualifiedName);
                }
                return ref;
            });

            // putIfAbsent tolerates the same data set being listed twice in one notification.
            result.putIfAbsent(typedRefQualifiedName, refFromCacheIfAvailable);
        }
        return result;
    }

    /**
     * Tells whether {@code id} carries an id string that parses as a UUID.
     *
     * @return {@code false} when {@code id} or its id string is {@code null} or is not a valid UUID
     */
    private boolean isAssigned(Id id) {
        if (id == null || id.getId() == null) {
            return false;
        }
        try {
            UUID.fromString(id.getId());
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    /** Counters collected during a single {@link NotificationSender#send} call. */
    private class Metrics {
        /** Wall-clock time at which this instance was created, in milliseconds. */
        final long startedAt = System.currentTimeMillis();
        /** Number of messages passed to {@code send}. */
        int totalMessages;
        /** Number of entity-create messages. */
        int createMessages;
        /** Number of distinct flow paths among the created entities. */
        int uniqueFlowPathCreates;
        /** Number of distinct non flow path entities among the created entities. */
        int uniqueOtherCreates;
        /** Number of flow path partial-update messages before merging. */
        int partialNiFiFlowPathUpdates;
        /** Number of flow path partial-update messages after merging. */
        int uniquePartialNiFiFlowPathUpdates;
        /** Number of messages forwarded untouched. */
        int otherMessages;
        /** Number of flow path look-ups issued to Atlas. */
        int flowPathSearched;
        /** Number of data set look-ups issued to Atlas. */
        int dataSetSearched;
        /** Number of data set references resolved from the guid cache. */
        int dataSetCacheHit;

        private Metrics() {
        }

        /**
         * Formats all counters, the elapsed time and the current cache sizes on one line.
         *
         * @param message prefix placed at the start of the line
         */
        private String toLogString(String message) {
            return String.format("%s, %d ms passed, totalMessages=%d, createMessages=%d, uniqueFlowPathCreates=%d, uniqueOtherCreates=%d, partialNiFiFlowPathUpdates=%d, uniquePartialNiFiFlowPathUpdates=%d, otherMessage=%d, flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s, guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d",
                    message,
                    System.currentTimeMillis() - this.startedAt,
                    this.totalMessages,
                    this.createMessages,
                    this.uniqueFlowPathCreates,
                    this.uniqueOtherCreates,
                    this.partialNiFiFlowPathUpdates,
                    this.uniquePartialNiFiFlowPathUpdates,
                    this.otherMessages,
                    this.flowPathSearched,
                    this.dataSetSearched,
                    this.dataSetCacheHit,
                    NotificationSender.this.guidToTypedQualifiedName.size(),
                    NotificationSender.this.typedQualifiedNameToRef.size());
        }
    }
}

