/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.Subscription;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.SubscriptionHandler;
import net.openhft.chronicle.engine.tree.TopologicalEvent;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for client-side subscriptions that are registered through a
 * {@link TcpChannelHub} and receive their events over the wire.
 *
 * <p>Every registered subscriber is associated with the transaction id (tid) of
 * the asynchronous subscription created for it; that tid is what
 * {@link #unregisterSubscriber0(Subscriber)} uses to address the unregister
 * request.
 *
 * @param <E> the element type delivered to subscribers
 */
abstract class AbstractRemoteSubscription<E>
        extends AbstractStatelessClient
        implements Subscription<E> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractRemoteSubscription.class);

    /**
     * Maps each currently registered subscriber to the tid of its subscription.
     * An entry is added when the subscription object is created and removed once
     * the subscriber has been unregistered or its subscription has ended.
     */
    protected final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<Object, Long>();

    /**
     * Creates the subscription client; all arguments are passed straight to
     * {@link AbstractStatelessClient}.
     *
     * @param hub the channel hub used for all communication
     * @param cid the cid handed to the super class
     * @param csp the csp handed to the super class
     */
    public AbstractRemoteSubscription(@NotNull TcpChannelHub hub, long cid, @NotNull String csp) {
        super(hub, cid, csp);
    }

    /**
     * Registers {@code subscriber}; delegates to
     * {@link #registerSubscriber0(RequestContext, Subscriber)}.
     *
     * @param rc         request context supplying the element type and the optional bootstrap flag
     * @param subscriber the subscriber that will receive the events
     */
    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber<E> subscriber) {
        registerSubscriber0(rc, subscriber);
    }

    /**
     * Unregisters {@code subscriber}; delegates to
     * {@link #unregisterSubscriber0(Subscriber)}.
     *
     * @param subscriber the subscriber to remove
     */
    @Override
    public void unregisterSubscriber(Subscriber<E> subscriber) {
        unregisterSubscriber0(subscriber);
    }

    /**
     * Creates an asynchronous subscription on the hub for {@code subscriber} and
     * records its tid in {@link #subscribersToTid}.
     *
     * @param rc         request context supplying the element type and the optional bootstrap flag
     * @param subscriber the subscriber that will receive the events
     * @throws IllegalStateException if the calling thread currently holds the hub's out-bytes lock
     */
    void registerSubscriber0(@NotNull final RequestContext rc, @NotNull final Subscriber subscriber) {
        if (hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }

        // The bootstrap flag is appended to the csp only when the request context defines it.
        final Boolean bootstrap = rc.bootstrap();
        final String subscriptionCsp = bootstrap == null
                ? this.csp
                : this.csp + "&bootstrap=" + bootstrap;

        hub.subscribe(new AbstractAsyncSubscription(hub, subscriptionCsp) {
            {
                // The super constructor has already run (it is invoked through the
                // constructor arguments above), so tid() can be recorded here.
                subscribersToTid.put(subscriber, tid());
            }

            /** Writes the registerSubscriber event together with the alias of the element type. */
            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName((WireKey) SubscriptionHandler.SubscriptionEventID.registerSubscriber)
                        .typeLiteral((CharSequence) ClassAliasPool.CLASS_ALIASES.nameFor(rc.elementType()));
            }

            /** Reads one document and dispatches it according to its event name. */
            @Override
            public void onConsumer(@NotNull WireIn inWire) {
                inWire.readDocument(null, d -> {
                    final StringBuilder eventName = Wires.acquireStringBuilder();
                    final ValueIn valueIn = d.readEventName(eventName);

                    if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(eventName)) {
                        final long tid = tid();
                        subscriber.onEndOfSubscription();
                        hub.unsubscribe(tid);
                        // Drop the bookkeeping entry, but only if it still refers to this
                        // subscription; a newer registration of the same subscriber is kept.
                        subscribersToTid.remove(subscriber, tid);
                    } else if (CoreFields.reply.contentEquals(eventName)) {
                        final Class elementType = rc.elementType();
                        // Map and topological events are read as typed marshallables; every
                        // other element type is read as a plain object of that type.
                        final Object message;
                        if (MapEvent.class.isAssignableFrom(elementType)
                                || TopologicalEvent.class.isAssignableFrom(elementType)) {
                            message = valueIn.typedMarshallable();
                        } else {
                            message = valueIn.object(elementType);
                        }
                        onEvent(message, subscriber);
                    }
                });
            }
        });
    }

    /**
     * Passes a non-null {@code message} to {@code subscriber}. A subscriber that
     * throws {@link InvalidSubscriberException} is unregistered.
     *
     * @param message    the decoded event, ignored when {@code null}
     * @param subscriber the subscriber to notify
     */
    private void onEvent(@Nullable Object message, @NotNull Subscriber subscriber) {
        try {
            if (message != null) {
                subscriber.onMessage(message);
            }
        } catch (InvalidSubscriberException noLongerValid) {
            unregisterSubscriber(subscriber);
        }
    }

    /**
     * Writes an unRegisterSubscriber request for the tid recorded for
     * {@code subscriber}. When no tid is recorded a warning is logged and nothing
     * is written. After the request has been written the recorded tid is removed,
     * so the map does not grow without bound and a repeated call does not send a
     * second request.
     *
     * @param subscriber the subscriber to remove
     */
    void unregisterSubscriber0(Subscriber subscriber) {
        final Long tid = subscribersToTid.get(subscriber);
        if (tid == null) {
            LOG.warn("There is no subscription to unsubscribe");
            return;
        }

        hub.outBytesLock().lock();
        try {
            writeMetaDataForKnownTID(tid);
            hub.outWire().writeDocument(false, wireOut ->
                    wireOut.writeEventName((WireKey) SubscriptionHandler.SubscriptionEventID.unRegisterSubscriber)
                            .text((CharSequence) ""));
            hub.writeSocket((WireOut) hub.outWire());
        } finally {
            hub.outBytesLock().unlock();
        }

        // Reached only when the write above did not throw, so a failed attempt leaves
        // the entry in place. The conditional remove keeps any newer registration.
        subscribersToTid.remove(subscriber, tid);
    }

    /**
     * @return the int produced by {@code proxyReturnInt} for the
     *         {@code topicSubscriberCount} event
     */
    @Override
    public int topicSubscriberCount() {
        return proxyReturnInt((WireKey) SubscriptionHandler.SubscriptionEventID.topicSubscriberCount);
    }

    /**
     * @return the int produced by {@code proxyReturnInt} for the
     *         {@code keySubscriberCount} event
     */
    @Override
    public int keySubscriberCount() {
        return proxyReturnInt((WireKey) SubscriptionHandler.SubscriptionEventID.keySubscriberCount);
    }

    /**
     * @return the int produced by {@code proxyReturnInt} for the
     *         {@code entrySubscriberCount} event
     */
    @Override
    public int entrySubscriberCount() {
        return proxyReturnInt((WireKey) SubscriptionHandler.SubscriptionEventID.entrySubscriberCount);
    }
}

