/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.asset;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import jakarta.persistence.EntityManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.RouteConfigurationsBuilder;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.RouteConfigurationBuilder;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.agent.AgentService;
import org.openremote.manager.asset.AssetProcessingException;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.asset.AttributeLinkingService;
import org.openremote.manager.asset.OutdatedAttributeEvent;
import org.openremote.manager.datapoint.AssetDatapointService;
import org.openremote.manager.event.AttributeEventInterceptor;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.event.EventSubscriptionAuthorizer;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.rules.RulesService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.asset.Asset;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeWriteFailure;
import org.openremote.model.security.ClientRole;
import org.openremote.model.util.ValueUtil;
import org.openremote.model.value.MetaItemType;

public class AssetProcessingService
extends RouteBuilder
implements ContainerService {
    public static final String ATTRIBUTE_EVENT_ROUTE_CONFIG_ID = "attributeEvent";
    public static final int PRIORITY = 1000;
    public static final String ATTRIBUTE_EVENT_PROCESSOR = "direct://AttributeEventProcessor";
    private static final System.Logger LOG = System.getLogger(AssetProcessingService.class.getName());
    protected final List<AttributeEventInterceptor> eventInterceptors = new ArrayList<AttributeEventInterceptor>();
    protected TimerService timerService;
    protected ManagerIdentityService identityService;
    protected PersistenceService persistenceService;
    protected RulesService rulesService;
    protected AgentService agentService;
    protected GatewayService gatewayService;
    protected AssetStorageService assetStorageService;
    protected AssetDatapointService assetDatapointService;
    protected AttributeLinkingService assetAttributeLinkingService;
    protected MessageBrokerService messageBrokerService;
    protected ClientEventService clientEventService;
    protected long lastProcessedEventTimestamp = System.currentTimeMillis();
    protected ExecutorService executorService;
    protected MeterRegistry meterRegistry;
    protected Timer eventTimer;
    protected Map<String, Counter> eventCounters;

    public int getPriority() {
        return 1000;
    }

    public void init(Container container) throws Exception {
        this.timerService = (TimerService)container.getService(TimerService.class);
        this.identityService = (ManagerIdentityService)container.getService(ManagerIdentityService.class);
        this.persistenceService = (PersistenceService)container.getService(PersistenceService.class);
        this.rulesService = (RulesService)container.getService(RulesService.class);
        this.agentService = (AgentService)container.getService(AgentService.class);
        this.gatewayService = (GatewayService)container.getService(GatewayService.class);
        this.assetStorageService = (AssetStorageService)container.getService(AssetStorageService.class);
        this.assetDatapointService = (AssetDatapointService)container.getService(AssetDatapointService.class);
        this.assetAttributeLinkingService = (AttributeLinkingService)container.getService(AttributeLinkingService.class);
        this.messageBrokerService = (MessageBrokerService)container.getService(MessageBrokerService.class);
        this.clientEventService = (ClientEventService)container.getService(ClientEventService.class);
        this.executorService = container.getExecutor();
        EventSubscriptionAuthorizer assetEventAuthorizer = AssetStorageService.assetInfoAuthorizer(this.identityService, this.assetStorageService);
        this.clientEventService.addSubscriptionAuthorizer((requestedRealm, auth, subscription) -> {
            if (!subscription.isEventType(AttributeEvent.class)) {
                return false;
            }
            return assetEventAuthorizer.authorise(requestedRealm, auth, subscription);
        });
        this.clientEventService.addEventAuthorizer((requestedRealm, authContext, event) -> {
            Attribute attribute;
            if (!(event instanceof AttributeEvent)) {
                return false;
            }
            AttributeEvent attributeEvent = (AttributeEvent)event;
            if (authContext != null && authContext.isSuperUser()) {
                return true;
            }
            if (!this.identityService.getIdentityProvider().isRealmActiveAndAccessible(authContext, requestedRealm)) {
                LOG.log(System.Logger.Level.INFO, "Realm is inactive, inaccessible or nonexistent: " + requestedRealm);
                return false;
            }
            if (authContext != null && !authContext.hasResourceRoleOrIsSuperUser(ClientRole.WRITE_ATTRIBUTES.getValue(), "openremote")) {
                LOG.log(System.Logger.Level.DEBUG, "User doesn't have required role '" + String.valueOf(ClientRole.WRITE_ATTRIBUTES) + "': username=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName());
                return false;
            }
            Asset<?> asset = this.assetStorageService.find(attributeEvent.getId());
            Attribute attribute2 = attribute = asset != null ? (Attribute)asset.getAttribute(attributeEvent.getName()).orElse(null) : null;
            if (asset == null || !asset.hasAttribute(attributeEvent.getName())) {
                LOG.log(System.Logger.Level.INFO, () -> "Cannot authorize asset event as asset and/or attribute doesn't exist: " + String.valueOf(attributeEvent.getRef()));
                return false;
            }
            if (!Objects.equals(requestedRealm, asset.getRealm())) {
                LOG.log(System.Logger.Level.INFO, () -> "Asset is not in the requested realm: requestedRealm=" + requestedRealm + ", ref=" + String.valueOf(attributeEvent.getRef()));
                return false;
            }
            if (authContext != null) {
                if (this.identityService.getIdentityProvider().isRestrictedUser(authContext)) {
                    if (!this.assetStorageService.isUserAsset(authContext.getUserId(), attributeEvent.getId())) {
                        LOG.log(System.Logger.Level.DEBUG, () -> "Restricted user is not linked to asset '" + attributeEvent.getId() + "': username=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName());
                        return false;
                    }
                    if (attribute == null || !attribute.getMetaValue(MetaItemType.ACCESS_RESTRICTED_WRITE).orElse(false).booleanValue()) {
                        LOG.log(System.Logger.Level.DEBUG, () -> "Asset attribute doesn't support restricted write on '" + String.valueOf(attributeEvent.getRef()) + "': username=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName());
                        return false;
                    }
                }
            } else if (attribute == null || !attribute.hasMeta(MetaItemType.ACCESS_PUBLIC_WRITE)) {
                LOG.log(System.Logger.Level.DEBUG, () -> "Asset doesn't support public write on '" + String.valueOf(attributeEvent.getRef()) + "': username=null");
                return false;
            }
            return true;
        });
        this.messageBrokerService.getContext().addRoutesConfigurations((RouteConfigurationsBuilder)new RouteConfigurationBuilder(this){

            public void configuration() throws Exception {
                this.routeConfiguration(AssetProcessingService.ATTRIBUTE_EVENT_ROUTE_CONFIG_ID).onException(new Class[]{IllegalStateException.class, RejectedExecutionException.class, AssetProcessingException.class}).handled(true).logExhausted(false).logStackTrace(false).process(exchange -> {
                    IllegalStateException illegalStateException;
                    Exception exception = (Exception)exchange.getProperty("CamelExceptionCaught", Exception.class);
                    if (exception instanceof RejectedExecutionException || exception instanceof IllegalStateException && "Queue full".equals((illegalStateException = (IllegalStateException)exception).getMessage())) {
                        exception = new AssetProcessingException(AttributeWriteFailure.QUEUE_FULL, "Queue for this message is full");
                    }
                    exchange.getMessage().setBody((Object)exception);
                    if (!LOG.isLoggable(System.Logger.Level.WARNING)) {
                        return;
                    }
                    Object body = exchange.getIn().getBody();
                    StringBuilder error = new StringBuilder("Route '").append(exchange.getFromRouteId()).append("' error processing message: ").append(body);
                    if (exception instanceof AssetProcessingException) {
                        AssetProcessingException processingException = (AssetProcessingException)exception;
                        if (processingException.getReason() == AttributeWriteFailure.ASSET_NOT_FOUND) {
                            LOG.log(System.Logger.Level.DEBUG, error::toString);
                        } else {
                            LOG.log(System.Logger.Level.WARNING, error::toString);
                        }
                    } else {
                        LOG.log(System.Logger.Level.WARNING, error::toString, (Throwable)exception);
                    }
                });
            }
        });
        this.messageBrokerService.getContext().addRoutes((RoutesBuilder)this);
        if (container.getMeterRegistry() != null) {
            this.meterRegistry = container.getMeterRegistry();
            this.eventCounters = new ConcurrentHashMap<String, Counter>();
            this.eventTimer = this.meterRegistry.timer("or.attributes", (Iterable)Tags.empty());
        }
    }

    public void start(Container container) throws Exception {
    }

    public void stop(Container container) throws Exception {
    }

    public void configure() throws Exception {
        this.from(ATTRIBUTE_EVENT_PROCESSOR).routeId("AttributeEvent-Processor").routeConfigurationId(ATTRIBUTE_EVENT_ROUTE_CONFIG_ID).threads().executorService(this.executorService).process(exchange -> {
            AttributeEvent event = (AttributeEvent)exchange.getIn().getBody(AttributeEvent.class);
            if (event.getId() == null || event.getId().isEmpty()) {
                throw new AssetProcessingException(AttributeWriteFailure.ASSET_ID_MISSING);
            }
            if (event.getName() == null || event.getName().isEmpty()) {
                throw new AssetProcessingException(AttributeWriteFailure.ATTRIBUTE_NAME_MISSING);
            }
            if (event.getTimestamp() <= 0L) {
                event.setTimestamp(this.timerService.getCurrentTimeMillis());
            } else if (event.getTimestamp() > this.timerService.getCurrentTimeMillis()) {
                event.setTimestamp(this.timerService.getCurrentTimeMillis());
            }
            LOG.log(System.Logger.Level.TRACE, () -> ">>> Attribute event processing start: " + String.valueOf(event));
            Counter counter = this.getEventCounter(event.getSource());
            if (counter != null) {
                counter.increment();
            }
            boolean processed = this.eventTimer != null ? this.eventTimer.record(() -> this.processAttributeEvent(event)) : this.processAttributeEvent(event);
            exchange.getIn().setBody((Object)processed);
        });
    }

    protected Counter getEventCounter(String source) {
        if (this.eventCounters == null) {
            return null;
        }
        String sourceStr = source == null ? "none" : source;
        return this.eventCounters.computeIfAbsent(sourceStr, eventSource -> this.meterRegistry.counter("or.attributes", (Iterable)Tags.of((String)"source", (String)eventSource)));
    }

    public void addEventInterceptor(AttributeEventInterceptor eventInterceptor) {
        this.eventInterceptors.add(eventInterceptor);
        this.eventInterceptors.sort(Comparator.comparingInt(AttributeEventInterceptor::getPriority));
    }

    public void sendAttributeEvent(AttributeEvent attributeEvent) {
        this.sendAttributeEvent(attributeEvent, null);
    }

    public void sendAttributeEvent(AttributeEvent attributeEvent, String source) {
        attributeEvent.setSource(source);
        if (attributeEvent.getTimestamp() <= 0L) {
            attributeEvent.setTimestamp(this.timerService.getCurrentTimeMillis());
        }
        this.messageBrokerService.getFluentProducerTemplate().withBody((Object)attributeEvent).to(ATTRIBUTE_EVENT_PROCESSOR).asyncSend();
    }

    protected boolean processAttributeEvent(AttributeEvent event) throws AssetProcessingException {
        return this.assetStorageService.withAssetLock(event.getId(), () -> {
            long startMillis;
            this.lastProcessedEventTimestamp = startMillis = System.currentTimeMillis();
            return (Boolean)this.persistenceService.doReturningTransaction(em -> {
                Asset<?> asset = this.assetStorageService.find((EntityManager)em, event.getId(), true);
                if (asset == null) {
                    throw new AssetProcessingException(AttributeWriteFailure.ASSET_NOT_FOUND, event.getId());
                }
                Attribute attribute = (Attribute)asset.getAttribute(event.getName()).orElseThrow(() -> new AssetProcessingException(AttributeWriteFailure.ATTRIBUTE_NOT_FOUND, event.getRef().toString()));
                Object value = event.getValue().map(eventValue -> {
                    Class attributeValueType = attribute.getTypeClass();
                    return ValueUtil.getValueCoerced((Object)eventValue, (Class)attributeValueType).orElseThrow(() -> {
                        String msg = "Event processing failed unable to coerce value into the correct value type: realm=" + event.getRealm() + ", attribute=" + String.valueOf(event.getRef()) + ", event value type=" + String.valueOf(eventValue.getClass()) + ", attribute value type=" + String.valueOf(attributeValueType);
                        return new AssetProcessingException(AttributeWriteFailure.INVALID_VALUE, msg);
                    });
                }).orElse(null);
                event.setValue(value);
                AttributeEvent enrichedEvent = new AttributeEvent(asset, attribute, event.getSource(), event.getValue().orElse(null), Long.valueOf(event.getTimestamp()), attribute.getValue().orElse(null), attribute.getTimestamp().orElse(0L));
                Set validationFailures = ValueUtil.validate((Object)enrichedEvent, (Class[])new Class[0]);
                if (!validationFailures.isEmpty()) {
                    String msg = "Event processing failed value failed constraint validation: realm=" + enrichedEvent.getRealm() + ", attribute=" + String.valueOf(enrichedEvent.getRef()) + ", event value type=" + enrichedEvent.getValue().map(v -> v.getClass().getName()).orElse("null") + ", attribute value type=" + String.valueOf(enrichedEvent.getTypeClass());
                    throw new AssetProcessingException(AttributeWriteFailure.INVALID_VALUE, msg);
                }
                String interceptorName = null;
                boolean intercepted = false;
                for (AttributeEventInterceptor interceptor : this.eventInterceptors) {
                    try {
                        intercepted = interceptor.intercept((EntityManager)em, enrichedEvent);
                    }
                    catch (AssetProcessingException ex) {
                        throw new AssetProcessingException(ex.getReason(), "Interceptor '" + String.valueOf(interceptor) + "' error=" + ex.getMessage());
                    }
                    catch (Throwable t) {
                        throw new AssetProcessingException(AttributeWriteFailure.INTERCEPTOR_FAILURE, "Interceptor '" + String.valueOf(interceptor) + "' uncaught exception error=" + t.getMessage(), t);
                    }
                    if (!intercepted) continue;
                    interceptorName = interceptor.getName();
                    break;
                }
                if (intercepted) {
                    LOG.log(System.Logger.Level.TRACE, "Event intercepted: interceptor=" + interceptorName + ", ref=" + String.valueOf(enrichedEvent.getRef()) + ", source=" + enrichedEvent.getSource());
                } else if (enrichedEvent.isOutdated()) {
                    LOG.log(System.Logger.Level.INFO, () -> "Event is older than current attribute value so marking as outdated: ref=" + String.valueOf(enrichedEvent.getRef()) + ", event=" + String.valueOf(Instant.ofEpochMilli(enrichedEvent.getTimestamp())) + ", previous=" + String.valueOf(Instant.ofEpochMilli(enrichedEvent.getOldValueTimestamp())));
                    this.clientEventService.publishEvent(new OutdatedAttributeEvent(enrichedEvent));
                } else if (!this.assetStorageService.updateAttributeValue((EntityManager)em, enrichedEvent)) {
                    throw new AssetProcessingException(AttributeWriteFailure.STATE_STORAGE_FAILED, "database update failed, no rows updated");
                }
                if (LOG.isLoggable(System.Logger.Level.DEBUG)) {
                    long processingMillis = System.currentTimeMillis() - startMillis;
                    LOG.log(System.Logger.Level.DEBUG, "<<< Attribute event processed in " + processingMillis + "ms: processor=" + Thread.currentThread().getName() + ", event=" + String.valueOf(enrichedEvent));
                }
                return true;
            });
        });
    }

    public String toString() {
        return ((Object)((Object)this)).getClass().getSimpleName() + "{}";
    }
}

