/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.agent;

import jakarta.persistence.EntityManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.agent.AgentResourceImpl;
import org.openremote.manager.asset.AssetProcessingException;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.web.ManagerWebService;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.asset.AssetTreeNode;
import org.openremote.model.asset.agent.Agent;
import org.openremote.model.asset.agent.AgentLink;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.asset.agent.Protocol;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeMap;
import org.openremote.model.attribute.AttributeRef;
import org.openremote.model.attribute.AttributeWriteFailure;
import org.openremote.model.protocol.ProtocolAssetDiscovery;
import org.openremote.model.protocol.ProtocolAssetImport;
import org.openremote.model.protocol.ProtocolAssetService;
import org.openremote.model.protocol.ProtocolInstanceDiscovery;
import org.openremote.model.query.AssetQuery;
import org.openremote.model.query.filter.AttributePredicate;
import org.openremote.model.query.filter.NameValuePredicate;
import org.openremote.model.query.filter.RealmPredicate;
import org.openremote.model.query.filter.StringPredicate;
import org.openremote.model.query.filter.ValuePredicate;
import org.openremote.model.util.Pair;
import org.openremote.model.util.TextUtil;
import org.openremote.model.value.MetaItemType;
import org.openremote.model.value.NameHolder;

public class AgentService
extends RouteBuilder
implements ContainerService {
    private static final Logger LOG = Logger.getLogger(AgentService.class.getName());
    public static final int PRIORITY = 2147482747;
    protected AssetProcessingService assetProcessingService;
    protected AssetStorageService assetStorageService;
    protected ClientEventService clientEventService;
    protected GatewayService gatewayService;
    protected ExecutorService executorService;
    protected Map<String, Agent<?, ?, ?>> agentMap;
    protected final Map<String, Future<Void>> agentDiscoveryImportFutureMap = new ConcurrentHashMap<String, Future<Void>>();
    protected final Map<String, Protocol<?>> protocolInstanceMap = new ConcurrentHashMap();
    protected final Map<String, Set<Consumer<PersistenceEvent<Asset<?>>>>> childAssetSubscriptions = new ConcurrentHashMap();
    protected boolean initDone;
    protected Container container;
    protected final Object agentLock = new Object();

    /**
     * Reports the priority value of this service.
     *
     * @return the value of {@link #PRIORITY}
     */
    public int getPriority() {
        return PRIORITY;
    }

    /**
     * Looks up the collaborating services from the container and, on the first invocation only,
     * registers the agent API resource, the attribute event interceptor and the subscription for
     * attribute events on {@link Agent} assets.
     *
     * <p>The service references are refreshed on every call; the registrations are guarded by
     * {@code initDone} so they happen once.
     *
     * @param container the container that supplies the services and the executor
     * @throws Exception declared by the method signature
     */
    public void init(Container container) throws Exception {
        this.container = container;
        assetProcessingService = (AssetProcessingService) container.getService(AssetProcessingService.class);
        assetStorageService = (AssetStorageService) container.getService(AssetStorageService.class);
        clientEventService = (ClientEventService) container.getService(ClientEventService.class);
        gatewayService = (GatewayService) container.getService(GatewayService.class);
        executorService = container.getExecutor();

        if (!initDone) {
            ManagerWebService webService = (ManagerWebService) container.getService(ManagerWebService.class);
            TimerService timerService = (TimerService) container.getService(TimerService.class);
            ManagerIdentityService identityService = (ManagerIdentityService) container.getService(ManagerIdentityService.class);
            Object agentResource = new AgentResourceImpl(timerService, identityService, assetStorageService, this, container.getExecutor());
            webService.addApiSingleton(agentResource);

            assetProcessingService.addEventInterceptor(this::onAttributeEventIntercepted);

            AssetFilter agentFilter = new AssetFilter().setAssetClasses(Collections.singletonList(Agent.class));
            clientEventService.addSubscription(AttributeEvent.class, agentFilter, this::onAgentAttributeEvent);

            initDone = true;
        }
    }

    /**
     * Adds this route builder to the message broker context and then initialises every agent
     * returned by {@link #getAgents()}.
     *
     * @param container the container used to look up the {@link MessageBrokerService}
     * @throws Exception if adding the routes fails
     */
    public void start(Container container) throws Exception {
        MessageBrokerService brokerService = (MessageBrokerService) container.getService(MessageBrokerService.class);
        brokerService.getContext().addRoutes((RoutesBuilder) this);

        LOG.fine("Loading agents...");
        Collection<Agent<?, ?, ?>> loadedAgents = getAgents().values();
        LOG.fine("Found agent count = " + loadedAgents.size());

        for (Agent<?, ?, ?> loadedAgent : loadedAgents) {
            doAgentInit(loadedAgent);
        }
    }

    /**
     * Undeploys every known agent, then empties the agent and protocol instance maps.
     *
     * @param container unused by this implementation
     * @throws Exception declared by the method signature
     */
    public void stop(Container container) throws Exception {
        if (agentMap != null) {
            // Iterate over a copy: undeployAgent removes entries from the agent map as it goes
            List<Agent<?, ?, ?>> snapshot = new ArrayList<>(agentMap.values());
            for (Agent<?, ?, ?> deployedAgent : snapshot) {
                undeployAgent(deployedAgent.getId());
            }
            agentMap.clear();
        }
        protocolInstanceMap.clear();
    }

    /**
     * Defines the {@code Persistence-Agent} route: asset persistence events that pass the gateway
     * filter are dispatched to {@link #processAgentChange} when the entity is an {@link Agent},
     * and to {@link #processAssetChange} otherwise.
     *
     * @throws Exception declared by the method signature
     */
    public void configure() throws Exception {
        this.from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000")
            .routeId("Persistence-Agent")
            .filter(PersistenceService.isPersistenceEventForEntityType(Asset.class))
            .filter(GatewayService.isNotForGateway(this.gatewayService))
            .process(exchange -> {
                PersistenceEvent event = (PersistenceEvent) exchange.getIn().getBody(PersistenceEvent.class);
                boolean agentEntity = PersistenceService.isPersistenceEventForEntityType(Agent.class).matches(exchange);
                if (agentEntity) {
                    this.processAgentChange(event);
                } else {
                    this.processAssetChange(event);
                }
            });
    }

    /**
     * Reacts to a persistence event for an agent: created or updated agents are (re)deployed,
     * deleted agents are undeployed. Any other cause is ignored.
     *
     * @param persistenceEvent the agent persistence event
     */
    protected void processAgentChange(PersistenceEvent<Agent<?, ?, ?>> persistenceEvent) {
        LOG.finest("Processing agent persistence event: " + persistenceEvent.getCause());
        Agent<?, ?, ?> changedAgent = persistenceEvent.getEntity();
        switch (persistenceEvent.getCause()) {
            case CREATE, UPDATE -> deployAgent(changedAgent);
            case DELETE -> undeployAgent(changedAgent.getId());
            default -> {
                // no action for other causes
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces any existing deployment of the agent: undeploys it, registers it again through
     * {@link #addAgent} and, if registration succeeded, initialises it. Runs while holding
     * {@code agentLock}.
     *
     * @param agent the agent to deploy
     */
    protected void deployAgent(Agent<?, ?, ?> agent) {
        synchronized (agentLock) {
            undeployAgent(agent.getId());
            Agent<?, ?, ?> registered = addAgent(agent);
            if (registered != null) {
                doAgentInit(registered);
            }
        }
    }

    /**
     * Handles a persistence event for a non-agent asset by linking and/or unlinking its
     * agent-linked attributes, then notifies any ancestor agent through
     * {@link #notifyAgentAncestor}.
     *
     * <p>An UPDATE event whose {@code attributes} property has not changed returns early and
     * does not notify the ancestor agent.
     *
     * @param persistenceEvent the asset persistence event
     */
    protected void processAssetChange(PersistenceEvent<Asset<?>> persistenceEvent) {
        LOG.finest("Processing asset persistence event: " + String.valueOf(persistenceEvent.getCause()));
        Asset asset = (Asset)persistenceEvent.getEntity();
        switch (persistenceEvent.getCause()) {
            case CREATE: {
                // Link every agent-linked attribute of the new asset, grouped by the agent it refers to
                this.getGroupedAgentLinkAttributes(asset.getAttributes().stream(), attribute -> true).forEach((agent, attributes) -> this.linkAttributes((Agent<?, ?, ?>)agent, asset.getId(), (Collection<Attribute<?>>)attributes));
                break;
            }
            case UPDATE: {
                if (!persistenceEvent.hasPropertyChanged("attributes")) {
                    return;
                }
                // Only attributes carrying an AGENT_LINK meta item are of interest, before and after the update
                List oldLinkedAttributes = ((AttributeMap)persistenceEvent.getPreviousState("attributes")).stream().filter(attr -> attr.hasMeta(MetaItemType.AGENT_LINK)).collect(Collectors.toList());
                List newLinkedAttributes = ((AttributeMap)persistenceEvent.getCurrentState("attributes")).stream().filter(attr -> attr.hasMeta(MetaItemType.AGENT_LINK)).collect(Collectors.toList());
                // NOTE(review): computed by the helper with (new, old) argument order, yet named "obsoleteOrModified" —
                // confirm the helper's semantics match the name
                List obsoleteOrModified = Attribute.getAddedOrModifiedAttributes(newLinkedAttributes, oldLinkedAttributes).toList();
                // Unlink first ...
                this.getGroupedAgentLinkAttributes(obsoleteOrModified.stream(), attribute -> true).forEach((agent, attributes) -> this.unlinkAttributes(agent.getId(), asset.getId(), (List<Attribute<?>>)attributes));
                // ... then link the attributes that are absent from the old list or were reported by the helper above
                this.getGroupedAgentLinkAttributes(newLinkedAttributes.stream().filter(attr -> !oldLinkedAttributes.contains(attr) || obsoleteOrModified.contains(attr)), attribute -> true).forEach((agent, attributes) -> this.linkAttributes((Agent<?, ?, ?>)agent, asset.getId(), (Collection<Attribute<?>>)attributes));
                break;
            }
            case DELETE: {
                // Unlink every agent-linked attribute of the removed asset
                this.getGroupedAgentLinkAttributes(asset.getAttributes().stream(), attribute -> true).forEach((agent, attributes) -> this.unlinkAttributes(agent.getId(), asset.getId(), (List<Attribute<?>>)attributes));
            }
        }
        this.notifyAgentAncestor(asset, persistenceEvent);
    }

    /**
     * Forwards an asset persistence event to the child-asset subscribers of the agent that is an
     * ancestor of the asset, if there is one.
     *
     * <p>Nothing happens when the asset is itself an {@link Agent} or has no parent. The ancestor
     * is the parent when the parent is a known agent; otherwise, when the asset's own path is not
     * loaded, the parent is fetched from storage and the first known agent in the parent's path is
     * used.
     *
     * <p>The agent map is always obtained through {@link #getAgents()} so that it is lazily
     * initialised; reading the {@code agentMap} field directly could fail with a
     * {@link NullPointerException} if an event arrives before the map has been loaded.
     *
     * @param asset the asset the event relates to
     * @param persistenceEvent the event to forward
     */
    protected void notifyAgentAncestor(Asset<?> asset, PersistenceEvent<Asset<?>> persistenceEvent) {
        String parentId = asset.getParentId();
        if (asset instanceof Agent || parentId == null) {
            return;
        }

        Map<String, Agent<?, ?, ?>> agents = getAgents();
        String ancestorAgentId = null;

        if (agents.containsKey(parentId)) {
            ancestorAgentId = parentId;
        } else if (asset.getPath() == null) {
            Asset<?> parentAsset = assetStorageService.find(parentId);
            if (parentAsset != null && parentAsset.getPath() != null) {
                ancestorAgentId = Arrays.stream(parentAsset.getPath())
                    .filter(agents::containsKey)
                    .findFirst()
                    .orElse(null);
            }
        }

        if (ancestorAgentId != null) {
            notifyChildAssetChange(ancestorAgentId, persistenceEvent);
        }
    }

    /**
     * Sends an attribute event through the asset processing service, using this class's simple
     * name as the event source.
     *
     * @param event the event to send
     */
    protected void sendAttributeEvent(AttributeEvent event) {
        String source = getClass().getSimpleName();
        assetProcessingService.sendAttributeEvent(event, source);
    }

    /**
     * Starts the agent on the executor, unless the agent is disabled, in which case its status
     * attribute is set to {@link ConnectionStatus#DISABLED} instead.
     *
     * @param agent the agent to initialise
     */
    protected void doAgentInit(Agent<?, ?, ?> agent) {
        boolean enabled = !agent.isDisabled().orElse(false);
        if (enabled) {
            executorService.execute(() -> startAgent(agent));
            return;
        }
        LOG.fine("Agent is disabled so not starting: " + agent);
        sendAttributeEvent(new AttributeEvent(agent.getId(), Agent.STATUS.getName(), (Object) ConnectionStatus.DISABLED));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and starts the protocol instance for an agent, registers it in
     * {@code protocolInstanceMap} and links every attribute whose agent link refers to this agent.
     * Runs while holding {@code agentLock}.
     *
     * <p>On any failure the protocol instance (if one was created) is stopped on a best-effort
     * basis, its map entry is removed, the failure is logged together with its stack trace and the
     * agent status is set to {@link ConnectionStatus#ERROR}. A failure of the best-effort stop is
     * logged at {@code FINE} rather than silently discarded.
     *
     * @param agent the agent whose protocol should be started
     */
    protected void startAgent(Agent<?, ?, ?> agent) {
        synchronized (agentLock) {
            Protocol<?> protocol = null;
            try {
                protocol = agent.getProtocolInstance();
                protocol.setAssetService(new AgentProtocolAssetService(agent));

                LOG.fine("Starting protocol instance: " + protocol);
                protocol.start(container);
                // Registered before linking because linkAttributes resolves the protocol through this map
                protocolInstanceMap.put(agent.getId(), protocol);
                LOG.fine("Started protocol instance: " + protocol);

                LOG.finest("Linking attributes to protocol instance: " + protocol);

                // Query for assets having an AGENT_LINK meta item whose "id" path matches this agent
                NameValuePredicate agentLinkIdPredicate = new NameValuePredicate((NameHolder) MetaItemType.AGENT_LINK, (ValuePredicate) new StringPredicate(agent.getId()), false, new NameValuePredicate.Path(new String[]{"id"}));
                AssetQuery linkedAssetsQuery = new AssetQuery().attributes(new AttributePredicate[]{new AttributePredicate().meta(new NameValuePredicate[]{agentLinkIdPredicate})});
                List<Asset<?>> assets = assetStorageService.findAll(linkedAssetsQuery);

                LOG.finest("Found '" + assets.size() + "' asset(s) with attributes linked to this protocol instance: " + protocol);

                assets.forEach(asset ->
                    getGroupedAgentLinkAttributes(
                        asset.getAttributes().stream(),
                        // An asset may also carry attributes linked to other agents; keep only this agent's
                        assetAttribute -> assetAttribute.getMetaValue(MetaItemType.AGENT_LINK)
                            .map(agentLink -> agentLink.getId().equals(agent.getId()))
                            .orElse(false)
                    ).forEach((agnt, attributes) -> linkAttributes(agnt, asset.getId(), attributes)));
            } catch (Exception e) {
                if (protocol != null) {
                    try {
                        protocol.stop(container);
                    } catch (Exception stopFailure) {
                        // Best-effort cleanup: the start failure is the error that matters, but keep a trace of this one
                        LOG.log(Level.FINE, "Failed to stop protocol instance after a failed start: " + protocol, stopFailure);
                    }
                }
                protocolInstanceMap.remove(agent.getId());
                LOG.log(Level.SEVERE, "Failed to start protocol '" + protocol + "': " + agent + " msg=" + e.getMessage(), e);
                sendAttributeEvent(new AttributeEvent(agent.getId(), Agent.STATUS.getName(), (Object) ConnectionStatus.ERROR));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes an agent from the agent map and, if it has a running protocol instance, unlinks all
     * of that protocol's linked attributes, stops the protocol and drops its child-asset
     * subscriptions and protocol map entry. Runs while holding {@code agentLock}.
     *
     * <p>An exception thrown by {@code protocol.stop} is logged and does not prevent the cleanup.
     *
     * @param agentId ID of the agent to undeploy
     */
    protected void undeployAgent(String agentId) {
        synchronized (agentLock) {
            removeAgent(agentId);

            Protocol<?> protocol = protocolInstanceMap.get(agentId);
            if (protocol == null) {
                return;
            }

            // Group the protocol's linked attributes by the ID of the asset they belong to
            Map<String, List<Attribute<?>>> attributesByAssetId = protocol.getLinkedAttributes()
                .entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                    entry -> entry.getKey().getId(),
                    Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

            for (Map.Entry<String, List<Attribute<?>>> group : attributesByAssetId.entrySet()) {
                unlinkAttributes(agentId, group.getKey(), group.getValue());
            }

            try {
                protocol.stop(container);
            } catch (Exception e) {
                LOG.log(Level.SEVERE, "Protocol instance threw an exception whilst being stopped", e);
            }

            childAssetSubscriptions.remove(agentId);
            protocolInstanceMap.remove(agentId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Links the given attributes of an asset to the agent's protocol instance, skipping attributes
     * the protocol already reports as linked. Does nothing when the agent has no protocol instance.
     *
     * <p>Linking is performed while holding the protocol instance's monitor. A failure to link one
     * attribute is logged, including the exception and its stack trace (matching
     * {@link #unlinkAttributes}), and does not stop the remaining attributes from being linked.
     *
     * @param agent the agent whose protocol the attributes are linked to
     * @param assetId ID of the asset owning the attributes
     * @param attributes the attributes to link
     */
    protected void linkAttributes(Agent<?, ?, ?> agent, String assetId, Collection<Attribute<?>> attributes) {
        Protocol<?> protocol = getProtocolInstance(agent.getId());
        if (protocol == null) {
            return;
        }

        synchronized (protocol) {
            LOG.fine("Linking asset '" + assetId + "' attributes linked to protocol: assetId=" + assetId + ", attributes=" + attributes.size() + ", protocol=" + protocol);

            for (Attribute<?> attribute : attributes) {
                AttributeRef attributeRef = new AttributeRef(assetId, attribute.getName());
                try {
                    if (!protocol.getLinkedAttributes().containsKey(attributeRef)) {
                        LOG.finest("Linking attribute '" + attributeRef + "' to protocol: " + protocol);
                        protocol.linkAttribute(assetId, attribute);
                    }
                } catch (Exception ex) {
                    // Pass the exception itself so the cause and stack trace are not lost
                    LOG.log(Level.SEVERE, "Failed to link attribute '" + attributeRef + "' to protocol: " + protocol + " msg=" + ex.getMessage(), ex);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unlinks the given attributes of an asset from the agent's protocol instance, skipping
     * attributes the protocol does not report as linked. Does nothing when the agent has no
     * protocol instance.
     *
     * <p>Unlinking is performed while holding the protocol instance's monitor; a failure for one
     * attribute is logged and the remaining attributes are still processed.
     *
     * @param agentId ID of the agent whose protocol the attributes are unlinked from
     * @param assetId ID of the asset owning the attributes
     * @param attributes the attributes to unlink
     */
    protected void unlinkAttributes(String agentId, String assetId, List<Attribute<?>> attributes) {
        Protocol<?> protocol = getProtocolInstance(agentId);
        if (protocol == null) {
            return;
        }

        synchronized (protocol) {
            LOG.fine("Unlinking asset '" + assetId + "' attributes linked to protocol: assetId=" + assetId + ", attributes=" + attributes.size() + ", protocol=" + protocol);

            for (Attribute<?> attribute : attributes) {
                try {
                    AttributeRef ref = new AttributeRef(assetId, attribute.getName());
                    if (!protocol.getLinkedAttributes().containsKey(ref)) {
                        continue;
                    }
                    LOG.finest("Unlinking attribute '" + ref + "' to protocol: " + protocol);
                    protocol.unlinkAttribute(assetId, attribute);
                } catch (Exception ex) {
                    LOG.log(Level.SEVERE, "Ignoring error on unlinking attribute '" + attribute + "' from protocol: " + protocol, ex);
                }
            }
        }
    }

    /**
     * Intercepts attribute events for agent-linked attributes and hands them to the linked
     * agent's protocol instance as attribute writes.
     *
     * <p>Events whose source is this service, and events without an {@code AGENT_LINK} meta
     * value, are not intercepted. Outdated events are intercepted without being passed to the
     * protocol.
     *
     * @param em the entity manager (unused by this implementation)
     * @param event the attribute event being processed
     * @return {@code true} if the event was intercepted, {@code false} otherwise
     * @throws AssetProcessingException if no protocol instance exists for the linked agent, or the
     *         protocol throws while processing the write (the reason is carried over when the
     *         protocol itself threw an {@code AssetProcessingException})
     */
    protected boolean onAttributeEventIntercepted(EntityManager em, AttributeEvent event) throws AssetProcessingException {
        String ownSource = getClass().getSimpleName();
        if (ownSource.equals(event.getSource())) {
            // Events sent by this service must flow through untouched
            return false;
        }

        return event.getMetaValue(MetaItemType.AGENT_LINK).map(agentLink -> {
            LOG.finest("Attribute event for agent linked attribute: agent=" + agentLink.getId() + ", ref=" + event.getRef());

            if (event.isOutdated()) {
                return true;
            }

            Protocol<?> protocolInstance = getProtocolInstance(agentLink.getId());
            if (protocolInstance == null) {
                throw new AssetProcessingException(AttributeWriteFailure.CANNOT_PROCESS, "Agent protocol instance not found, agent may be disabled or has been deleted: attributeRef=" + event.getRef() + ", agentLink=" + agentLink);
            }

            try {
                protocolInstance.processLinkedAttributeWrite(event);
                return true;
            } catch (Exception e) {
                String msg = e.getMessage();
                AttributeWriteFailure failure = e instanceof AssetProcessingException processingException
                    ? processingException.getReason()
                    : AttributeWriteFailure.UNKNOWN;
                throw new AssetProcessingException(failure, "An exception occurred whilst the protocol was trying to process the attribute write request: agentLink=" + agentLink + ", msg=" + msg);
            }
        }).orElse(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an attribute event on an agent asset. Runs while holding {@code agentLock}.
     *
     * <p>The event is ignored when the agent is unknown, when the event is not newer than the
     * stored attribute timestamp (or the attribute or its timestamp is absent), or when the event
     * source is {@code AssetStorageService}. Otherwise the cached agent attribute is updated and:
     * <ul>
     *   <li>if no protocol instance exists, the agent is deployed only when the event is for the
     *       {@code DISABLED} attribute;</li>
     *   <li>if a protocol instance exists, it is notified and the agent is redeployed when the
     *       protocol's {@code onAgentAttributeChanged} returns {@code true}.</li>
     * </ul>
     *
     * @param event the agent attribute event
     */
    protected void onAgentAttributeEvent(AttributeEvent event) {
        synchronized (agentLock) {
            Agent<?, ?, ?> agent = getAgent(event.getId());
            if (agent == null) {
                return;
            }

            String attributeName = event.getName();

            // Treated as outdated when the attribute or its timestamp is missing
            boolean eventOutdated = agent.getAttribute(attributeName)
                .flatMap(Attribute::getTimestamp)
                .map(timestamp -> event.getTimestamp() <= timestamp)
                .orElse(true);

            if (eventOutdated || AssetStorageService.class.getSimpleName().equals(event.getSource())) {
                return;
            }

            // Keep the cached agent in step with the event
            agent.getAttribute(attributeName).ifPresent(attr -> attr.setValue(event.getValue().orElse(null), event.getTimestamp()));

            Protocol<?> protocolInstance = getProtocolInstance(agent.getId());
            if (protocolInstance != null) {
                LOG.finer("Notifying protocol instance of an event for one of its agent attributes: " + event.getRef());
                if (protocolInstance.onAgentAttributeChanged(event)) {
                    LOG.info("Protocol has requested recreation following agent attribute event: " + event.getRef());
                    deployAgent(agent);
                }
                return;
            }

            if (Agent.DISABLED.getName().equals(attributeName)) {
                deployAgent(agent);
            }
        }
    }

    /**
     * Groups agent-linked attributes by the agent their {@code AGENT_LINK} meta value refers to.
     *
     * <p>An attribute is kept only when it has an {@code AGENT_LINK} meta value whose agent ID is
     * present in {@link #getAgents()} and it passes the supplied filter. The agent is looked up a
     * second time when building the pair, and pairs with a {@code null} agent are dropped.
     *
     * @param attributes the attributes to examine
     * @param filter additional predicate applied after the agent-existence check
     * @return a map from agent to the list of attributes linked to it
     */
    protected Map<Agent<?, ?, ?>, List<Attribute<?>>> getGroupedAgentLinkAttributes(Stream<Attribute<?>> attributes, Predicate<Attribute<?>> filter) {
        return attributes.filter(attribute -> attribute.getMetaValue(MetaItemType.AGENT_LINK).map(agentLink -> {
            if (!this.getAgents().containsKey(agentLink.getId())) {
                LOG.finest("Agent linked attribute, agent not found or this is a gateway asset: " + String.valueOf(attribute));
                return false;
            }
            return true;
        // Pair each remaining attribute with its agent, drop null agents, then group by agent keeping only the attributes
        }).orElse(false)).filter(filter).map(attribute -> new Pair((Object)attribute.getMetaValue(MetaItemType.AGENT_LINK).map(AgentLink::getId).map(agentId -> this.getAgents().get(agentId)).orElse(null), attribute)).filter(agentAttribute -> agentAttribute.key != null).collect(Collectors.groupingBy(agentAttribute -> (Agent)agentAttribute.key, Collectors.collectingAndThen(Collectors.toList(), agentAttribute -> agentAttribute.stream().map(item -> (Attribute)item.value).collect(Collectors.toList()))));
    }

    /**
     * Returns the simple class name followed by {@code {}}.
     *
     * @return the string representation of this service
     */
    public String toString() {
        String simpleName = getClass().getSimpleName();
        return simpleName + "{}";
    }

    /**
     * Registers an agent in the agent map, first reloading it from storage when it does not look
     * fully loaded (no path, or a path longer than one element without a parent ID).
     *
     * @param agent the agent to register
     * @return the registered agent (possibly the reloaded instance), or {@code null} when the
     *         agent had to be reloaded but was not found in storage
     */
    protected Agent<?, ?, ?> addAgent(Agent<?, ?, ?> agent) {
        String[] path = agent.getPath();
        boolean needsReload = path == null || (path.length > 1 && agent.getParentId() == null);

        Agent<?, ?, ?> resolved = agent;
        if (needsReload) {
            LOG.fine("Agent is not fully loaded so retrieving the agent from the DB: " + agent.getId());
            resolved = assetStorageService.find(agent.getId(), true, Agent.class);
            if (resolved == null) {
                LOG.fine("Agent not found in the DB, maybe it has been removed: " + agent.getId());
                return null;
            }
        }

        getAgents().put(resolved.getId(), resolved);
        return resolved;
    }

    /**
     * Removes an agent from the agent map.
     *
     * @param agentId ID of the agent to remove
     * @return {@code true} if an agent was registered under that ID
     */
    protected boolean removeAgent(String agentId) {
        Agent<?, ?, ?> removed = getAgents().remove(agentId);
        return removed != null;
    }

    /**
     * Looks up an agent by ID in the agent map.
     *
     * @param agentId ID of the agent
     * @return the agent, or {@code null} if none is registered under that ID
     */
    public Agent<?, ?, ?> getAgent(String agentId) {
        Map<String, Agent<?, ?, ?>> agents = getAgents();
        return agents.get(agentId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the agent map, loading it from storage on first use. Runs while holding
     * {@code agentLock}.
     *
     * <p>The initial load queries all {@link Agent} assets and keeps those for which the gateway
     * service reports no locally registered gateway ID.
     *
     * @return the map of agent ID to agent
     */
    protected Map<String, Agent<?, ?, ?>> getAgents() {
        synchronized (agentLock) {
            if (agentMap != null) {
                return agentMap;
            }
            agentMap = assetStorageService.findAll(new AssetQuery().types(Agent.class))
                .stream()
                .filter(asset -> gatewayService.getLocallyRegisteredGatewayId(asset.getId(), null) == null)
                .collect(Collectors.toConcurrentMap(Asset::getId, asset -> (Agent<?, ?, ?>) asset));
            return agentMap;
        }
    }

    /**
     * Returns the protocol instance registered for the given agent.
     *
     * @param agent the agent whose ID is used for the lookup
     * @return the protocol instance, or {@code null} if none is registered
     */
    public Protocol<?> getProtocolInstance(Agent<?, ?, ?> agent) {
        String agentId = agent.getId();
        return getProtocolInstance(agentId);
    }

    /**
     * Returns the protocol instance registered under the given agent ID.
     *
     * @param agentId ID of the agent
     * @return the protocol instance, or {@code null} if none is registered
     */
    public Protocol<?> getProtocolInstance(String agentId) {
        Protocol<?> instance = protocolInstanceMap.get(agentId);
        return instance;
    }

    /**
     * Delivers a child asset persistence event to every consumer subscribed for the given agent.
     * Does nothing when the agent has no subscriptions.
     *
     * <p>Consumers are invoked on a snapshot of the subscription set, outside of any map
     * computation, so a consumer may subscribe or unsubscribe without re-entering the map while it
     * is being updated. Each consumer is isolated: an exception thrown by one consumer is logged
     * and the remaining consumers are still notified.
     *
     * @param agentId ID of the agent whose subscribers should be notified
     * @param assetPersistenceEvent the event to deliver
     */
    protected void notifyChildAssetChange(String agentId, PersistenceEvent<Asset<?>> assetPersistenceEvent) {
        Set<Consumer<PersistenceEvent<Asset<?>>>> consumers = childAssetSubscriptions.get(agentId);
        if (consumers == null) {
            return;
        }

        List<Consumer<PersistenceEvent<Asset<?>>>> snapshot;
        // The subscription sets are created with Collections.synchronizedSet, which requires holding the set's monitor to traverse it
        synchronized (consumers) {
            snapshot = new ArrayList<>(consumers);
        }

        String assetId = assetPersistenceEvent.getEntity().getId();
        LOG.finest("Notifying child asset change consumers of change to agent child asset: Agent ID=" + agentId + ", Asset<?> ID=" + assetId);

        for (Consumer<PersistenceEvent<Asset<?>>> consumer : snapshot) {
            try {
                consumer.accept(assetPersistenceEvent);
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Child asset change consumer threw an exception: Agent ID=" + agentId + ", Asset<?> ID=" + assetId, e);
            }
        }
    }

    /**
     * Tells whether an asset discovery or import future is currently registered for the agent.
     *
     * @param agentId ID of the agent
     * @return {@code true} if a discovery/import future is registered for that agent
     */
    public boolean isProtocolAssetDiscoveryOrImportRunning(String agentId) {
        boolean running = agentDiscoveryImportFutureMap.containsKey(agentId);
        return running;
    }

    /**
     * Runs protocol instance discovery on the executor using a new instance of the given provider
     * class and waits, inside the task, for the provider's discovery future to complete.
     *
     * <p>The task does nothing when {@code parentId} is non-null and the gateway service reports a
     * locally registered gateway ID for it. Failures are logged, not propagated. When the task is
     * interrupted while waiting, the thread's interrupt status is restored after logging so the
     * executor can still observe the cancellation.
     *
     * @param parentId ID of the parent asset, may be {@code null}
     * @param instanceDiscoveryProviderClass provider class; instantiated through its no-argument constructor
     * @param onDiscovered callback handed to the provider for discovered agents
     * @return the future of the submitted task
     */
    public Future<Void> doProtocolInstanceDiscovery(String parentId, Class<? extends ProtocolInstanceDiscovery> instanceDiscoveryProviderClass, Consumer<Agent<?, ?, ?>[]> onDiscovered) {
        LOG.fine("Initiating protocol instance discovery: Provider = " + instanceDiscoveryProviderClass);

        Runnable task = () -> {
            if (parentId != null && gatewayService.getLocallyRegisteredGatewayId(parentId, null) != null) {
                return;
            }
            try {
                ProtocolInstanceDiscovery instanceDiscovery = instanceDiscoveryProviderClass.getDeclaredConstructor().newInstance();
                Future<?> discoveryFuture = instanceDiscovery.startInstanceDiscovery(onDiscovered);
                discoveryFuture.get();
            } catch (InterruptedException e) {
                LOG.fine("Protocol instance discovery was cancelled");
                // Restore the flag that catching InterruptedException cleared
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Failed to do protocol instance discovery: Provider = " + instanceDiscoveryProviderClass, e);
            } finally {
                LOG.fine("Finished protocol instance discovery: Provider = " + instanceDiscoveryProviderClass);
            }
        };

        return executorService.submit(task, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs asset discovery for an agent's protocol on the executor and tracks the task in
     * {@code agentDiscoveryImportFutureMap} until it finishes.
     *
     * <p>The task does nothing when the gateway service reports a locally registered gateway ID
     * for the agent. Failures inside the task are logged, not propagated; an interrupt received
     * while waiting is logged and the thread's interrupt status is restored.
     *
     * <p>Bookkeeping is race-free: the task removes its map entry while holding the same monitor
     * the submitter holds around {@code submit} and {@code put}, so the removal can never happen
     * before the entry is stored (which would otherwise leave a stale entry that makes the agent
     * look permanently busy).
     *
     * @param agent the agent whose protocol performs the discovery
     * @param onDiscovered callback handed to the protocol for discovered assets
     * @return the future of the submitted task
     * @throws UnsupportedOperationException if the agent has no protocol instance or the protocol
     *         does not implement {@link ProtocolAssetDiscovery}
     * @throws IllegalStateException if a discovery or import is already registered for the agent
     */
    public Future<Void> doProtocolAssetDiscovery(Agent<?, ?, ?> agent, Consumer<AssetTreeNode[]> onDiscovered) throws RuntimeException {
        String agentId = agent.getId();
        Protocol<?> protocol = getProtocolInstance(agentId);

        if (protocol == null) {
            throw new UnsupportedOperationException("Agent is either invalid, disabled or mis-configured: " + agent);
        }
        if (!(protocol instanceof ProtocolAssetDiscovery)) {
            throw new UnsupportedOperationException("Agent protocol doesn't support asset discovery");
        }

        LOG.fine("Initiating protocol asset discovery: Agent = " + agent);

        synchronized (agentDiscoveryImportFutureMap) {
            okToContinueWithImportOrDiscovery(agentId);

            Runnable task = () -> {
                try {
                    if (gatewayService.getLocallyRegisteredGatewayId(agentId, null) != null) {
                        return;
                    }
                    ProtocolAssetDiscovery assetDiscovery = (ProtocolAssetDiscovery) protocol;
                    Future<?> discoveryFuture = assetDiscovery.startAssetDiscovery(onDiscovered);
                    discoveryFuture.get();
                } catch (InterruptedException e) {
                    LOG.fine("Protocol asset discovery was cancelled");
                    // Restore the flag that catching InterruptedException cleared
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Failed to do protocol asset discovery: Agent = " + agent, e);
                } finally {
                    LOG.fine("Finished protocol asset discovery: Agent = " + agent);
                    // Wait for the submitter to leave its synchronized block so this removal cannot overtake the put(...)
                    synchronized (agentDiscoveryImportFutureMap) {
                        agentDiscoveryImportFutureMap.remove(agentId);
                    }
                }
            };

            Future<Void> future = executorService.submit(task, null);
            agentDiscoveryImportFutureMap.put(agentId, future);
            if (future.isDone()) {
                // Only possible when the executor ran the task on this thread; its removal preceded the put above
                agentDiscoveryImportFutureMap.remove(agentId, future);
            }
            return future;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs an asset import for an agent's protocol on the executor and tracks the task in
     * {@code agentDiscoveryImportFutureMap} until it finishes.
     *
     * <p>The task does nothing when the gateway service reports a locally registered gateway ID
     * for the agent. Failures inside the task are logged, not propagated; an interrupt received
     * while waiting is logged and the thread's interrupt status is restored.
     *
     * <p>Bookkeeping is race-free: the task removes its map entry while holding the same monitor
     * the submitter holds around {@code submit} and {@code put}, so the removal can never happen
     * before the entry is stored (which would otherwise leave a stale entry that makes the agent
     * look permanently busy).
     *
     * @param agent the agent whose protocol performs the import
     * @param fileData the file content handed to the protocol
     * @param onDiscovered callback handed to the protocol for imported assets
     * @return the future of the submitted task
     * @throws UnsupportedOperationException if the agent has no protocol instance or the protocol
     *         does not implement {@link ProtocolAssetImport}
     * @throws IllegalStateException if a discovery or import is already registered for the agent
     */
    public Future<Void> doProtocolAssetImport(Agent<?, ?, ?> agent, byte[] fileData, Consumer<AssetTreeNode[]> onDiscovered) throws RuntimeException {
        String agentId = agent.getId();
        Protocol<?> protocol = getProtocolInstance(agentId);

        if (protocol == null) {
            throw new UnsupportedOperationException("Agent is either invalid, disabled or mis-configured: " + agent);
        }
        if (!(protocol instanceof ProtocolAssetImport)) {
            throw new UnsupportedOperationException("Agent protocol doesn't support asset import");
        }

        LOG.fine("Initiating protocol asset import: Agent = " + agent);

        synchronized (agentDiscoveryImportFutureMap) {
            okToContinueWithImportOrDiscovery(agentId);

            Runnable task = () -> {
                try {
                    if (gatewayService.getLocallyRegisteredGatewayId(agentId, null) != null) {
                        return;
                    }
                    ProtocolAssetImport assetImport = (ProtocolAssetImport) protocol;
                    Future<?> importFuture = assetImport.startAssetImport(fileData, onDiscovered);
                    importFuture.get();
                } catch (InterruptedException e) {
                    LOG.fine("Protocol asset import was cancelled");
                    // Restore the flag that catching InterruptedException cleared
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Failed to do protocol asset import: Agent = " + agent, e);
                } finally {
                    LOG.fine("Finished protocol asset import: Agent = " + agent);
                    // Wait for the submitter to leave its synchronized block so this removal cannot overtake the put(...)
                    synchronized (agentDiscoveryImportFutureMap) {
                        agentDiscoveryImportFutureMap.remove(agentId);
                    }
                }
            };

            Future<Void> future = executorService.submit(task, null);
            agentDiscoveryImportFutureMap.put(agentId, future);
            if (future.isDone()) {
                // Only possible when the executor ran the task on this thread; its removal preceded the put above
                agentDiscoveryImportFutureMap.remove(agentId, future);
            }
            return future;
        }
    }

    /**
     * Guards against starting a second discovery or import for the same agent.
     *
     * @param agentId ID of the agent about to start discovery or import
     * @throws IllegalStateException if a discovery/import future is already registered for the agent
     */
    protected void okToContinueWithImportOrDiscovery(String agentId) {
        boolean alreadyRunning = agentDiscoveryImportFutureMap.containsKey(agentId);
        if (!alreadyRunning) {
            return;
        }
        String msg = "Protocol asset discovery or import already running for requested agent: " + agentId;
        LOG.fine(msg);
        throw new IllegalStateException(msg);
    }

    /**
     * {@link ProtocolAssetService} implementation handed to a protocol instance. Every operation
     * is checked against the realm of the owning agent and rejected with an
     * {@link IllegalArgumentException} when it would touch another realm.
     */
    protected class AgentProtocolAssetService
    implements ProtocolAssetService {
        /** The agent on whose behalf the protocol operates; its realm bounds every operation. */
        protected Agent<?, ?, ?> agent;

        /**
         * @param agent the agent that owns the protocol instance using this service
         */
        public AgentProtocolAssetService(Agent<?, ?, ?> agent) {
            this.agent = agent;
        }

        /**
         * Merges a protocol-provided asset into storage. An asset without a realm is assigned
         * the agent's realm first.
         *
         * @param asset the asset to merge; its ID must not be {@code null}
         * @return the result of the storage merge
         * @throws NullPointerException if the asset ID is {@code null}
         * @throws IllegalArgumentException if the asset belongs to a realm other than the agent's
         */
        public <T extends Asset<?>> T mergeAsset(T asset) {
            Objects.requireNonNull(asset.getId());

            String assetRealm = asset.getRealm();
            if (TextUtil.isNullOrEmpty(assetRealm)) {
                asset.setRealm(agent.getRealm());
            } else if (!Objects.equals(assetRealm, agent.getRealm())) {
                String msg = "Protocol attempting to merge asset into another realm: " + agent;
                Protocol.LOG.warning(msg);
                throw new IllegalArgumentException(msg);
            }

            LOG.fine("Merging asset with protocol-provided: " + asset);
            return assetStorageService.merge(asset, true);
        }

        /**
         * Deletes the assets with the given IDs after checking each one that exists against the
         * agent's realm.
         *
         * @param assetIds IDs of the assets to delete
         * @return the result of the storage delete
         * @throws IllegalArgumentException if any existing asset belongs to another realm
         */
        public boolean deleteAssets(String ... assetIds) {
            for (String assetId : assetIds) {
                Asset<?> existing = findAsset(assetId);
                boolean foreignRealm = existing != null && !Objects.equals(existing.getRealm(), agent.getRealm());
                if (foreignRealm) {
                    Protocol.LOG.warning("Protocol attempting to delete asset from another realm: " + agent);
                    throw new IllegalArgumentException("Protocol attempting to delete asset from another realm");
                }
            }

            LOG.fine("Deleting protocol-provided: " + Arrays.toString(assetIds));
            return assetStorageService.delete(Arrays.asList(assetIds), false);
        }

        /**
         * Finds an asset by ID.
         *
         * @param assetId ID of the asset
         * @return the asset, or {@code null} if storage has no asset with that ID
         * @throws IllegalArgumentException if the asset belongs to a realm other than the agent's
         */
        @SuppressWarnings("unchecked")
        public <T extends Asset<?>> T findAsset(String assetId) {
            LOG.fine("Getting protocol-provided: " + assetId);
            Asset<?> found = assetStorageService.find(assetId);

            if (found != null && !Objects.equals(found.getRealm(), agent.getRealm())) {
                Protocol.LOG.warning("Protocol attempting to find asset from another realm: " + agent);
                throw new IllegalArgumentException("Protocol attempting to find asset from another realm");
            }
            return (T) found;
        }

        /**
         * Runs a query after applying a realm predicate for the agent's realm through
         * {@code assetQuery.realm(...)}, then double-checks the realm of every result.
         *
         * @param assetQuery the query to run
         * @return the matching assets
         * @throws IllegalArgumentException if any result belongs to another realm
         */
        public List<Asset<?>> findAssets(AssetQuery assetQuery) {
            AssetQuery realmBoundQuery = assetQuery.realm(new RealmPredicate(agent.getRealm()));
            List<Asset<?>> results = assetStorageService.findAll(realmBoundQuery);

            for (Asset<?> result : results) {
                if (!Objects.equals(result.getRealm(), agent.getRealm())) {
                    Protocol.LOG.warning("Protocol attempting to find asset from another realm: " + agent);
                    throw new IllegalArgumentException("Protocol attempting to find asset from another realm");
                }
            }
            return results;
        }

        /**
         * Sends an attribute event through the enclosing service. An event without a realm is
         * assigned the agent's realm first.
         *
         * @param attributeEvent the event to send
         * @throws IllegalArgumentException if the event targets a realm other than the agent's
         */
        public void sendAttributeEvent(AttributeEvent attributeEvent) {
            String eventRealm = attributeEvent.getRealm();
            if (TextUtil.isNullOrEmpty(eventRealm)) {
                attributeEvent.setRealm(agent.getRealm());
            } else if (!Objects.equals(eventRealm, agent.getRealm())) {
                Protocol.LOG.warning("Protocol attempting to send attribute event to another realm: " + agent);
                throw new IllegalArgumentException("Protocol attempting to send attribute event to another realm");
            }
            AgentService.this.sendAttributeEvent(attributeEvent);
        }

        /**
         * Registers a consumer for persistence events of the agent's child assets. Ignored (with a
         * log entry) when the agent is not present in the agent map.
         *
         * @param assetChangeConsumer the consumer to register
         */
        public void subscribeChildAssetChange(Consumer<PersistenceEvent<Asset<?>>> assetChangeConsumer) {
            String agentId = agent.getId();
            if (!getAgents().containsKey(agentId)) {
                LOG.fine("Attempt to subscribe to child asset changes with an invalid agent ID: " + agent.getId());
                return;
            }

            Set<Consumer<PersistenceEvent<Asset<?>>>> consumers = childAssetSubscriptions.computeIfAbsent(
                agentId,
                id -> Collections.synchronizedSet(new HashSet<>()));
            consumers.add(assetChangeConsumer);
        }

        /**
         * Removes a previously registered child asset consumer; the agent's subscription entry is
         * dropped once its last consumer is removed.
         *
         * @param assetChangeConsumer the consumer to remove
         */
        public void unsubscribeChildAssetChange(Consumer<PersistenceEvent<Asset<?>>> assetChangeConsumer) {
            childAssetSubscriptions.computeIfPresent(agent.getId(), (id, consumers) -> {
                consumers.remove(assetChangeConsumer);
                // Returning null removes the mapping
                if (consumers.isEmpty()) {
                    return null;
                }
                return consumers;
            });
        }
    }
}

