/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.milo.opcua.sdk.client.subscriptions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.api.UaSession;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UByte;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateSubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DataChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.EventFieldList;
import org.eclipse.milo.opcua.stack.core.types.structured.EventNotificationList;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifySubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.NotificationMessage;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader;
import org.eclipse.milo.opcua.stack.core.types.structured.StatusChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.SubscriptionAcknowledgement;
import org.eclipse.milo.opcua.stack.core.util.ConversionUtil;
import org.eclipse.milo.opcua.stack.core.util.ExecutionQueue;
import org.jooq.lambda.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpcUaSubscriptionManager
implements UaSubscriptionManager {
    /** Value passed as {@code maxNotificationsPerPublish} by {@link #createSubscription(double)}. */
    public static final UInteger DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH = Unsigned.uint(65535);
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Subscriptions tracked by this manager, keyed by subscription id. */
    private final Map<UInteger, OpcUaSubscription> subscriptions = Maps.newConcurrentMap();
    /** Manager-level listeners (keep-alive, status change, publish failure, data lost, transfer failed). */
    private final List<UaSubscriptionManager.SubscriptionListener> subscriptionListeners = Lists.newCopyOnWriteArrayList();
    /** Per-session counter of PublishRequests currently in flight, keyed by session id. */
    private final ConcurrentMap<NodeId, AtomicLong> pendingCountMap = Maps.newConcurrentMap();
    /**
     * Acknowledgements waiting to be attached to the next PublishRequest.
     * Plain ArrayList: every access in this class synchronizes on the list itself.
     */
    private final List<SubscriptionAcknowledgement> acknowledgements = Lists.newArrayList();
    /** Queue on which decoded notifications are handed to items and listeners. */
    private final ExecutionQueue deliveryQueue;
    /** Queue on which PublishResponses are processed; paused while a republish is running. */
    private final ExecutionQueue processingQueue;
    private final OpcUaClient client;

    /**
     * Creates a subscription manager for {@code client}.
     *
     * <p>Both execution queues run on the executor from the client's configuration. A session
     * activity listener is registered that, when a session becomes inactive, replaces that
     * session's pending-publish counter (if one exists) with a fresh counter at zero.
     *
     * @param client the client used to issue subscription and publish service calls.
     */
    public OpcUaSubscriptionManager(OpcUaClient client) {
        this.client = client;
        this.deliveryQueue = new ExecutionQueue(client.getConfig().getExecutor());
        this.processingQueue = new ExecutionQueue(client.getConfig().getExecutor());

        SessionActivityListener resetPendingCountOnInactive = new SessionActivityListener() {
            @Override
            public void onSessionInactive(UaSession session) {
                // replace() only acts when an entry for this session already exists.
                NodeId inactiveSessionId = session.getSessionId();
                pendingCountMap.replace(inactiveSessionId, new AtomicLong(0L));
            }
        };
        client.addSessionActivityListener(resetPendingCountOnInactive);
    }

    /**
     * Creates a subscription using defaults derived from the publishing interval.
     *
     * <p>The keep-alive count is the number of publishing intervals that fit in 10 seconds
     * (rounded up, at least 1); the lifetime count is six times the keep-alive count.
     * Publishing is enabled, priority is 0 and
     * {@link #DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH} is requested.
     *
     * @param requestedPublishingInterval the requested publishing interval.
     * @return a future completed with the created subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> createSubscription(double requestedPublishingInterval) {
        int intervalsPerTenSeconds = (int) Math.ceil(10000.0 / requestedPublishingInterval);
        UInteger keepAliveCount = Unsigned.uint(Math.max(1, intervalsPerTenSeconds));
        UInteger lifetimeCount = Unsigned.uint(keepAliveCount.intValue() * 6);

        return this.createSubscription(
            requestedPublishingInterval,
            lifetimeCount,
            keepAliveCount,
            DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH,
            true,
            Unsigned.ubyte(0));
    }

    /**
     * Creates a subscription on the server and registers it with this manager.
     *
     * <p>The local subscription object is built from the revised values in the response and
     * from the {@code maxNotificationsPerPublish}, {@code publishingEnabled} and
     * {@code priority} arguments as requested. After registration, publish requests are
     * topped up.
     *
     * @return a future completed with the created subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> createSubscription(double requestedPublishingInterval, UInteger requestedLifetimeCount, UInteger requestedMaxKeepAliveCount, UInteger maxNotificationsPerPublish, boolean publishingEnabled, UByte priority) {
        CompletableFuture<CreateSubscriptionResponse> pendingResponse = this.client.createSubscription(
            requestedPublishingInterval,
            requestedLifetimeCount,
            requestedMaxKeepAliveCount,
            maxNotificationsPerPublish,
            publishingEnabled,
            priority);

        return pendingResponse.thenApply(created -> {
            OpcUaSubscription registered = new OpcUaSubscription(
                this.client,
                created.getSubscriptionId(),
                created.getRevisedPublishingInterval(),
                created.getRevisedLifetimeCount(),
                created.getRevisedMaxKeepAliveCount(),
                maxNotificationsPerPublish,
                publishingEnabled,
                priority);

            this.subscriptions.put(registered.getSubscriptionId(), registered);
            this.maybeSendPublishRequests();
            return registered;
        });
    }

    /**
     * Modifies the publishing interval of a known subscription, deriving keep-alive and
     * lifetime counts the same way {@link #createSubscription(double)} does and keeping the
     * subscription's current max-notifications-per-publish and priority.
     *
     * @param subscriptionId the id of the subscription to modify.
     * @param requestedPublishingInterval the new requested publishing interval.
     * @return a future completed with the modified subscription, or completed exceptionally
     *     with a {@link UaException} (status 0x80280000) if the id is not known to this manager.
     */
    @Override
    public CompletableFuture<UaSubscription> modifySubscription(UInteger subscriptionId, double requestedPublishingInterval) {
        OpcUaSubscription existing = this.subscriptions.get(subscriptionId);
        if (existing != null) {
            int intervalsPerTenSeconds = (int) Math.ceil(10000.0 / requestedPublishingInterval);
            UInteger keepAliveCount = Unsigned.uint(Math.max(1, intervalsPerTenSeconds));
            UInteger lifetimeCount = Unsigned.uint(keepAliveCount.intValue() * 6);

            CompletableFuture<UaSubscription> modified = this.modifySubscription(
                subscriptionId,
                requestedPublishingInterval,
                lifetimeCount,
                keepAliveCount,
                existing.getMaxNotificationsPerPublish(),
                existing.getPriority());

            modified.thenRun(this::maybeSendPublishRequests);
            return modified;
        }

        CompletableFuture<UaSubscription> unknownId = new CompletableFuture<UaSubscription>();
        unknownId.completeExceptionally(new UaException(0x80280000L));
        return unknownId;
    }

    /**
     * Modifies a known subscription on the server and updates the local object.
     *
     * <p>Revised interval and counts are taken from the response; max-notifications-per-publish
     * and priority are stored as requested. Publish requests are topped up afterwards.
     *
     * @return a future completed with the modified subscription, or completed exceptionally
     *     with a {@link UaException} (status 0x80280000) if the id is not known to this manager.
     */
    @Override
    public CompletableFuture<UaSubscription> modifySubscription(UInteger subscriptionId, double requestedPublishingInterval, UInteger requestedLifetimeCount, UInteger requestedMaxKeepAliveCount, UInteger maxNotificationsPerPublish, UByte priority) {
        OpcUaSubscription existing = this.subscriptions.get(subscriptionId);
        if (existing != null) {
            CompletableFuture<ModifySubscriptionResponse> pendingResponse = this.client.modifySubscription(
                subscriptionId,
                requestedPublishingInterval,
                requestedLifetimeCount,
                requestedMaxKeepAliveCount,
                maxNotificationsPerPublish,
                priority);

            return pendingResponse.thenApply(revised -> {
                existing.setRevisedPublishingInterval(revised.getRevisedPublishingInterval());
                existing.setRevisedLifetimeCount(revised.getRevisedLifetimeCount());
                existing.setRevisedMaxKeepAliveCount(revised.getRevisedMaxKeepAliveCount());
                existing.setMaxNotificationsPerPublish(maxNotificationsPerPublish);
                existing.setPriority(priority);
                this.maybeSendPublishRequests();
                return existing;
            });
        }

        CompletableFuture<UaSubscription> unknownId = new CompletableFuture<UaSubscription>();
        unknownId.completeExceptionally(new UaException(0x80280000L));
        return unknownId;
    }

    /**
     * Deletes a subscription on the server and, once the service call succeeds, removes it
     * from this manager and re-evaluates how many publish requests should be outstanding.
     *
     * @param subscriptionId the id of the subscription to delete.
     * @return a future completed with the removed subscription; the value is {@code null}
     *     when this manager was not tracking {@code subscriptionId}.
     */
    @Override
    public CompletableFuture<UaSubscription> deleteSubscription(UInteger subscriptionId) {
        // Typed list instead of the decompiler's raw ArrayList built through an Object[] cast.
        List<UInteger> subscriptionIds = Lists.newArrayList(subscriptionId);
        return this.client.deleteSubscriptions(subscriptionIds).thenApply(r -> {
            OpcUaSubscription subscription = this.subscriptions.remove(subscriptionId);
            this.maybeSendPublishRequests();
            return subscription;
        });
    }

    /**
     * Drops a subscription whose transfer failed and, if it was tracked, tells every
     * subscription listener about the failure.
     *
     * @param subscriptionId the id of the subscription that could not be transferred.
     * @param statusCode the status reported for the failed transfer.
     */
    public void transferFailed(UInteger subscriptionId, StatusCode statusCode) {
        OpcUaSubscription removed = this.subscriptions.remove(subscriptionId);
        if (removed == null) {
            return;
        }
        for (UaSubscriptionManager.SubscriptionListener listener : this.subscriptionListeners) {
            listener.onSubscriptionTransferFailed(removed, statusCode);
        }
    }

    /**
     * @return an immutable snapshot of the subscriptions currently tracked by this manager.
     */
    @Override
    public ImmutableList<UaSubscription> getSubscriptions() {
        return ImmutableList.<UaSubscription>copyOf(this.subscriptions.values());
    }

    /**
     * Registers a listener for manager-level subscription events.
     *
     * @param listener the listener to add.
     */
    @Override
    public void addSubscriptionListener(UaSubscriptionManager.SubscriptionListener listener) {
        subscriptionListeners.add(listener);
    }

    /**
     * Unregisters a previously added subscription listener.
     *
     * @param listener the listener to remove.
     */
    @Override
    public void removeSubscriptionListener(UaSubscriptionManager.SubscriptionListener listener) {
        subscriptionListeners.remove(listener);
    }

    /**
     * @return the number of PublishRequests that should be outstanding: two per tracked
     *     subscription, capped by the configured maximum of pending publish requests.
     */
    private long getMaxPendingPublishes() {
        long configuredLimit = this.client.getConfig().getMaxPendingPublishRequests().longValue();
        long twoPerSubscription = (long) (this.subscriptions.size() * 2);
        return Math.min(twoPerSubscription, configuredLimit);
    }

    /**
     * Computes the timeout hint placed in the header of each PublishRequest.
     *
     * <p>The hint is {@code maxPendingPublishes * minKeepAlive * 1.25}, where
     * {@code minKeepAlive} is the smallest
     * {@code revisedPublishingInterval * revisedMaxKeepAliveCount} over all tracked
     * subscriptions, or the configured request timeout when there are none.
     *
     * <p>A result that is NaN, negative, or larger than the 32-bit unsigned maximum is
     * replaced by 0 rather than passed to {@code Unsigned.uint(long)}, which rejects
     * out-of-range values.
     *
     * @return the timeout hint, or 0 when the computed value is not representable.
     */
    private UInteger getTimeoutHint() {
        double minKeepAlive = this.subscriptions.values().stream()
            .map(s -> s.getRevisedPublishingInterval() * s.getRevisedMaxKeepAliveCount().doubleValue())
            .min(Comparator.naturalOrder())
            .orElse(this.client.getConfig().getRequestTimeout().doubleValue());

        double timeoutHint = (double) this.getMaxPendingPublishes() * minKeepAlive * 1.25;

        // 4294967295 == 0xFFFFFFFF, the largest value a UInteger can hold.
        // NOTE(review): 0 is understood to mean "no timeout" in the request header — confirm against the OPC UA spec.
        if (Double.isNaN(timeoutHint) || timeoutHint < 0.0 || timeoutHint > 4294967295.0) {
            return Unsigned.uint(0L);
        }
        return Unsigned.uint((long) timeoutHint);
    }

    /**
     * Tops up the number of in-flight PublishRequests for the current session until it
     * reaches {@link #getMaxPendingPublishes()}.
     *
     * <p>Does nothing when the limit is zero (no tracked subscriptions, or a configured
     * maximum of zero). Otherwise the work runs once the client's session future completes.
     */
    private void maybeSendPublishRequests() {
        long maxPendingPublishes = this.getMaxPendingPublishes();
        if (maxPendingPublishes == 0L) {
            return;
        }
        this.client.getSession().thenAccept(session -> {
            // One counter per session id; created lazily on first use.
            AtomicLong pendingCount = this.pendingCountMap.computeIfAbsent(session.getSessionId(), id -> new AtomicLong(0L));
            // Start from a snapshot of the current count and try to claim each missing slot.
            for (long i = pendingCount.get(); i < maxPendingPublishes; ++i) {
                // Claim a slot first; only send if the claim did not push the count over the limit.
                if (pendingCount.incrementAndGet() <= maxPendingPublishes) {
                    this.sendPublishRequest((UaSession)session, pendingCount);
                    continue;
                }
                // Over the limit (another caller claimed the slot): undo the claim, never going below zero.
                pendingCount.getAndUpdate(p -> p > 0L ? p - 1L : 0L);
            }
            // Drop counters that belong to any session other than the current one.
            if (this.pendingCountMap.size() > 1) {
                this.pendingCountMap.entrySet().removeIf(e -> !((NodeId)e.getKey()).equals(session.getSessionId()));
            }
        });
    }

    /**
     * Sends one PublishRequest carrying all currently queued acknowledgements and handles
     * its completion on the client's executor.
     *
     * <p>On success the response is queued for processing and publish requests are topped up.
     * On failure the acknowledgements that were attached to this request are put back in the
     * queue, listeners are told via {@code onPublishFailure}, and publish requests are topped
     * up unless the status value is 0x80780000 (presumably Bad_NoSubscription — confirm
     * against the stack's status code table).
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     * NOTE(review): {@code response} is used as a PublishResponse inside the callback; the
     * original source may have passed an explicit type argument to {@code sendRequest} —
     * confirm this compiles as written.
     *
     * @param session the session whose authentication token is placed in the request header.
     * @param pendingCount the in-flight counter for {@code session}; decremented (floored at
     *     zero) when the request completes.
     */
    private void sendPublishRequest(UaSession session, AtomicLong pendingCount) {
        SubscriptionAcknowledgement[] subscriptionAcknowledgements;
        List<SubscriptionAcknowledgement> list = this.acknowledgements;
        // Drain the queued acknowledgements atomically so each one is attached to exactly one request.
        synchronized (list) {
            subscriptionAcknowledgements = this.acknowledgements.toArray(new SubscriptionAcknowledgement[this.acknowledgements.size()]);
            this.acknowledgements.clear();
        }
        UInteger requestHandle = this.client.nextRequestHandle();
        RequestHeader requestHeader = new RequestHeader(session.getAuthenticationToken(), DateTime.now(), requestHandle, Unsigned.uint(0), null, this.getTimeoutHint(), null);
        PublishRequest request = new PublishRequest(requestHeader, subscriptionAcknowledgements);
        // Only build the acknowledgement strings when they will actually be logged.
        if (this.logger.isDebugEnabled()) {
            Object[] ackStrings = (String[])Arrays.stream(subscriptionAcknowledgements).map(ack -> String.format("id=%s/seq=%s", ack.getSubscriptionId(), ack.getSequenceNumber())).toArray(String[]::new);
            this.logger.debug("Sending PublishRequest, requestHandle={}, acknowledgements={}", (Object)requestHandle, (Object)Arrays.toString(ackStrings));
        }
        this.client.sendRequest(request).whenCompleteAsync((response, ex) -> {
            // The request is no longer in flight, whatever the outcome.
            pendingCount.getAndUpdate(p -> p > 0L ? p - 1L : 0L);
            if (response != null) {
                this.logger.debug("Received PublishResponse, sequenceNumber={}", (Object)response.getNotificationMessage().getSequenceNumber());
                // Responses are processed in order on the processing queue.
                this.processingQueue.submit(() -> this.onPublishComplete((PublishResponse)response));
                this.maybeSendPublishRequests();
            } else {
                StatusCode statusCode = UaException.extract(ex).map(UaException::getStatusCode).orElse(StatusCode.BAD);
                this.logger.debug("Publish service failure: {}", (Object)statusCode, ex);
                // Keep publishing after any failure except status 0x80780000.
                if (statusCode.getValue() != 0x80780000L) {
                    this.maybeSendPublishRequests();
                }
                List<SubscriptionAcknowledgement> list = this.acknowledgements;
                // Requeue the acknowledgements that went out with the failed request.
                synchronized (list) {
                    Collections.addAll(this.acknowledgements, subscriptionAcknowledgements);
                }
                UaException uax = UaException.extract(ex).orElse(new UaException((Throwable)ex));
                this.subscriptionListeners.forEach(l -> l.onPublishFailure(uax));
            }
        }, (Executor)this.client.getConfig().getExecutor());
    }

    /**
     * Processes one PublishResponse: checks the sequence number, recovers missed messages
     * through Republish when a gap is detected, queues acknowledgements and hands the
     * notification message on for delivery.
     *
     * <p>Responses for subscriptions this manager does not track are ignored.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param response the PublishResponse to process.
     */
    private void onPublishComplete(PublishResponse response) {
        long expectedSequenceNumber;
        this.logger.debug("onPublishComplete() response for subscriptionId={}", (Object)response.getSubscriptionId());
        UInteger subscriptionId = response.getSubscriptionId();
        OpcUaSubscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription == null) {
            return;
        }
        NotificationMessage notificationMessage = response.getNotificationMessage();
        long sequenceNumber = notificationMessage.getSequenceNumber().longValue();
        // Gap check: only a sequence number greater than (last + 1) triggers recovery;
        // an equal or lower number falls through to normal processing.
        if (sequenceNumber > (expectedSequenceNumber = subscription.getLastSequenceNumber() + 1L)) {
            this.logger.warn("[id={}] expected sequence={}, received sequence={}. Calling Republish service...", new Object[]{subscriptionId, expectedSequenceNumber, sequenceNumber});
            // Hold back further responses and put this one at the head of the queue so it is
            // processed again, first, once the republish has finished.
            this.processingQueue.pause();
            this.processingQueue.submitToHead(() -> this.onPublishComplete(response));
            // Request every missing message in [expectedSequenceNumber, sequenceNumber).
            this.republish(subscriptionId, expectedSequenceNumber, sequenceNumber).whenComplete((dataLost, ex) -> {
                if (ex != null) {
                    this.logger.debug("Republish failed: {}", (Object)ex.getMessage(), ex);
                    this.subscriptionListeners.forEach(l -> l.onNotificationDataLost(subscription));
                } else if (dataLost.booleanValue()) {
                    this.subscriptionListeners.forEach(l -> l.onNotificationDataLost(subscription));
                }
                // Whether or not recovery succeeded, make the re-queued response the next expected one.
                subscription.setLastSequenceNumber(sequenceNumber - 1L);
                this.processingQueue.resume();
            });
            return;
        }
        subscription.setLastSequenceNumber(sequenceNumber);
        UInteger[] availableSequenceNumbers = response.getAvailableSequenceNumbers();
        if (availableSequenceNumbers != null && availableSequenceNumbers.length > 0) {
            List<SubscriptionAcknowledgement> list = this.acknowledgements;
            // Queue an acknowledgement for every sequence number the response lists as available;
            // they are sent with the next PublishRequest.
            synchronized (list) {
                for (UInteger available : availableSequenceNumbers) {
                    this.acknowledgements.add(new SubscriptionAcknowledgement(subscriptionId, available));
                }
            }
            if (this.logger.isDebugEnabled()) {
                Object[] seqStrings = (String[])Arrays.stream(availableSequenceNumbers).map(sequence -> String.format("id=%s/seq=%s", subscriptionId, sequence)).toArray(String[]::new);
                this.logger.debug("[id={}] PublishResponse sequence={}, available sequences={}", new Object[]{subscriptionId, sequenceNumber, Arrays.toString(seqStrings)});
            }
        }
        this.onNotificationMessage(subscriptionId, notificationMessage);
    }

    /**
     * Requests republication of every message in {@code [fromSequence, toSequence)}.
     *
     * @param subscriptionId the subscription to republish for.
     * @param fromSequence the first missing sequence number (inclusive).
     * @param toSequence the sequence number at which to stop (exclusive).
     * @return a future completed with {@code true} if at least one message could not be
     *     recovered, {@code false} if all were recovered, or completed exceptionally on an
     *     unrecoverable service failure.
     */
    private CompletableFuture<Boolean> republish(UInteger subscriptionId, long fromSequence, long toSequence) {
        CompletableFuture<Boolean> dataLostResult = new CompletableFuture<Boolean>();
        this.republish(subscriptionId, fromSequence, toSequence, false, dataLostResult);
        return dataLostResult;
    }

    /**
     * Republishes one sequence number at a time, chaining to the next on completion, until
     * {@code fromSequence} reaches {@code toSequence}.
     *
     * <p>A message is counted as lost when the republished message carries an unexpected
     * sequence number or when the service fails with status 0x807B0000 (presumably
     * Bad_MessageNotAvailable — confirm against the stack's status code table). Any other
     * service failure completes {@code future} exceptionally and stops the chain.
     *
     * @param subscriptionId the subscription to republish for.
     * @param fromSequence the sequence number to request now.
     * @param toSequence the sequence number at which to stop (exclusive).
     * @param dataLost whether a message has already been lost earlier in the chain.
     * @param future completed with the final data-lost flag, or exceptionally on failure.
     */
    private void republish(UInteger subscriptionId, long fromSequence, long toSequence, boolean dataLost, CompletableFuture<Boolean> future) {
        if (fromSequence == toSequence) {
            future.complete(dataLost);
            return;
        }

        this.client.republish(subscriptionId, Unsigned.uint(fromSequence)).whenComplete((response, ex) -> {
            if (response == null) {
                StatusCode failure = UaException.extract(ex).map(UaException::getStatusCode).orElse(StatusCode.BAD);
                if (failure.getValue() != 0x807B0000L) {
                    future.completeExceptionally((Throwable) ex);
                    return;
                }
                // The message is gone on the server: record the loss and move on.
                this.republish(subscriptionId, fromSequence + 1L, toSequence, true, future);
                return;
            }

            try {
                this.onRepublishComplete(subscriptionId, (RepublishResponse) response, Unsigned.uint(fromSequence));
                this.republish(subscriptionId, fromSequence + 1L, toSequence, dataLost, future);
            } catch (UaException e) {
                // Unexpected sequence number in the republished message: record the loss and move on.
                this.republish(subscriptionId, fromSequence + 1L, toSequence, true, future);
            }
        });
    }

    /**
     * Validates a republished message and forwards it for delivery.
     *
     * @param subscriptionId the subscription the message belongs to.
     * @param response the RepublishResponse carrying the message.
     * @param expectedSequenceNumber the sequence number that was requested.
     * @throws UaException with status 0x80880000 if the message's sequence number differs
     *     from {@code expectedSequenceNumber}.
     */
    private void onRepublishComplete(UInteger subscriptionId, RepublishResponse response, UInteger expectedSequenceNumber) throws UaException {
        NotificationMessage republished = response.getNotificationMessage();
        UInteger receivedSequenceNumber = republished.getSequenceNumber();

        if (receivedSequenceNumber.equals(expectedSequenceNumber)) {
            this.onNotificationMessage(subscriptionId, republished);
            return;
        }

        throw new UaException(0x80880000L, "expected sequence=" + expectedSequenceNumber + ", received sequence=" + receivedSequenceNumber);
    }

    /**
     * Logs a notification message and, if its subscription is still tracked, schedules it
     * for delivery.
     *
     * @param subscriptionId the subscription the message belongs to.
     * @param notificationMessage the message to deliver.
     */
    private void onNotificationMessage(UInteger subscriptionId, NotificationMessage notificationMessage) {
        DateTime publishTime = notificationMessage.getPublishTime();
        this.logger.debug(
            "onNotificationMessage(), subscriptionId={}, sequenceNumber={}, publishTime={}",
            new Object[]{subscriptionId, notificationMessage.getSequenceNumber(), publishTime});

        OpcUaSubscription target = this.subscriptions.get(subscriptionId);
        if (target == null) {
            return;
        }
        this.deliverNotificationMessage(target, notificationMessage);
    }

    /**
     * Delivers the contents of a notification message to monitored items and listeners.
     *
     * <p>A permit is acquired from the subscription's notification semaphore first; delivery
     * then runs on the delivery queue and the permit is released when delivery finishes,
     * even if a callback throws. Each entry of the message's notification data is decoded
     * and dispatched by type; entries of any other type are skipped.
     *
     * @param subscription the subscription the message belongs to.
     * @param notificationMessage the message to deliver.
     */
    private void deliverNotificationMessage(OpcUaSubscription subscription, NotificationMessage notificationMessage) {
        subscription.getNotificationSemaphore().acquire().thenAccept(permit -> this.deliveryQueue.submit(() -> {
            try {
                Map<UInteger, OpcUaMonitoredItem> items = subscription.getItemsByClientHandle();
                List<ExtensionObject> notificationData = ConversionUtil.l(notificationMessage.getNotificationData());
                for (ExtensionObject xo : notificationData) {
                    Object o = xo.decode();
                    if (o instanceof DataChangeNotification) {
                        this.deliverDataChange(subscription, notificationMessage, items, (DataChangeNotification) o);
                    } else if (o instanceof EventNotificationList) {
                        this.deliverEvents(subscription, notificationMessage, items, (EventNotificationList) o);
                    } else if (o instanceof StatusChangeNotification) {
                        this.deliverStatusChange(subscription, (StatusChangeNotification) o);
                    }
                }
            } finally {
                permit.release();
            }
        }));
    }

    /** Pushes each changed value to its monitored item, then reports keep-alive or the batch to listeners. */
    private void deliverDataChange(OpcUaSubscription subscription, NotificationMessage notificationMessage, Map<UInteger, OpcUaMonitoredItem> items, DataChangeNotification dcn) {
        List<MonitoredItemNotification> monitoredItems = ConversionUtil.l(dcn.getMonitoredItems());
        int notificationCount = monitoredItems.size();
        this.logger.debug("Received {} MonitoredItemNotifications", (Object) notificationCount);

        for (MonitoredItemNotification n : monitoredItems) {
            this.logger.trace("MonitoredItemNotification: clientHandle={}, value={}", (Object) n.getClientHandle(), (Object) n.getValue());
            OpcUaMonitoredItem item = items.get(n.getClientHandle());
            if (item != null) {
                item.onValueArrived(n.getValue());
            } else {
                this.logger.warn("no item for clientHandle=" + n.getClientHandle());
            }
        }

        // A data change notification with no items is reported as a keep-alive.
        if (notificationCount == 0) {
            this.subscriptionListeners.forEach(listener -> listener.onKeepAlive(subscription, notificationMessage.getPublishTime()));
            subscription.getNotificationListeners().forEach(listener -> listener.onKeepAliveNotification(subscription, notificationMessage.getPublishTime()));
            return;
        }

        // Skip building the batch when nobody is listening for it.
        if (subscription.getNotificationListeners().isEmpty()) {
            return;
        }

        ImmutableList.Builder<Tuple2<UaMonitoredItem, DataValue>> builder = ImmutableList.builder();
        for (MonitoredItemNotification n : monitoredItems) {
            UaMonitoredItem item = subscription.getItemsByClientHandle().get(n.getClientHandle());
            if (item == null) {
                continue;
            }
            builder.add(new Tuple2<UaMonitoredItem, DataValue>(item, n.getValue()));
        }
        ImmutableList<Tuple2<UaMonitoredItem, DataValue>> itemValues = builder.build();
        subscription.getNotificationListeners().forEach(listener -> listener.onDataChangeNotification(subscription, itemValues, notificationMessage.getPublishTime()));
    }

    /** Pushes each event to its monitored item, then reports the batch to notification listeners. */
    private void deliverEvents(OpcUaSubscription subscription, NotificationMessage notificationMessage, Map<UInteger, OpcUaMonitoredItem> items, EventNotificationList enl) {
        List<EventFieldList> events = ConversionUtil.l(enl.getEvents());

        for (EventFieldList event : events) {
            this.logger.trace("EventFieldList: clientHandle={}, values={}", (Object) event.getClientHandle(), (Object) Arrays.toString(event.getEventFields()));
            OpcUaMonitoredItem item = items.get(event.getClientHandle());
            if (item == null) {
                continue;
            }
            item.onEventArrived(event.getEventFields());
        }

        // Skip building the batch when nobody is listening for it.
        if (subscription.getNotificationListeners().isEmpty()) {
            return;
        }

        ImmutableList.Builder<Tuple2<UaMonitoredItem, Variant[]>> builder = ImmutableList.builder();
        for (EventFieldList event : events) {
            UaMonitoredItem item = subscription.getItemsByClientHandle().get(event.getClientHandle());
            if (item == null) {
                continue;
            }
            builder.add(new Tuple2<UaMonitoredItem, Variant[]>(item, event.getEventFields()));
        }
        ImmutableList<Tuple2<UaMonitoredItem, Variant[]>> itemEvents = builder.build();
        subscription.getNotificationListeners().forEach(listener -> listener.onEventNotification(subscription, itemEvents, notificationMessage.getPublishTime()));
    }

    /** Reports a status change to listeners and stops tracking the subscription on status 0x800A0000. */
    private void deliverStatusChange(OpcUaSubscription subscription, StatusChangeNotification scn) {
        this.logger.debug("StatusChangeNotification: {}", (Object) scn.getStatus());
        this.subscriptionListeners.forEach(listener -> listener.onStatusChanged(subscription, scn.getStatus()));
        subscription.getNotificationListeners().forEach(listener -> listener.onStatusChangedNotification(subscription, scn.getStatus()));

        // NOTE(review): 0x800A0000 is presumably Bad_Timeout — confirm against the stack's status code table.
        if (scn.getStatus().getValue() == 0x800A0000L) {
            this.subscriptions.remove(subscription.getSubscriptionId());
            this.maybeSendPublishRequests();
        }
    }

    /**
     * Starts (or resumes) publishing by topping up the outstanding PublishRequests.
     */
    public void startPublishing() {
        maybeSendPublishRequests();
    }

    /**
     * Forgets every subscription tracked by this manager. Only local state is cleared; no
     * service call is made.
     */
    public void clearSubscriptions() {
        subscriptions.clear();
    }
}

