/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.EventConsumer;
import net.openhft.chronicle.engine.map.ObjectKVSSubscription;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.SubscriptionHandlerProcessor;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteKVSSubscription<K, MV, V>
extends AbstractStatelessClient
implements ObjectKVSSubscription<K, MV, V>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MapWireHandler.class);
    private final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<Object, Long>();
    private final Class<K> kClass;
    private final Class<V> vClass;

    public RemoteKVSSubscription(RequestContext context, Asset asset) {
        super(asset.findView(TcpChannelHub.class), 0L, RemoteKVSSubscription.toUri(context));
        this.kClass = context.keyType();
        this.vClass = context.valueType();
    }

    private static String toUri(@NotNull RequestContext context) {
        return "/" + context.fullName() + "?view=subscription";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerTopicSubscriber(RequestContext rc, TopicSubscriber<K, V> subscriber) {
        long startTime = System.currentTimeMillis();
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        this.hub.outBytesLock().lock();
        try {
            long tid = this.writeMetaDataStartTime(startTime);
            this.subscribersToTid.put(subscriber, tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)SubscriptionHandlerProcessor.EventId.registerTopicSubscriber).marshallable(m -> {
                m.write(() -> "keyType").typeLiteral(this.kClass);
                m.write(() -> "valueType").typeLiteral(this.vClass);
            }));
            this.hub.asyncReadSocket(tid, w -> w.readDocument(null, d -> {
                ValueIn valueIn = d.read((WireKey)CoreFields.reply);
                valueIn.marshallable(m -> {
                    Object topic = m.read(() -> "topic").object(this.kClass);
                    Object message = m.read(() -> "message").object(this.vClass);
                    this.onEvent(topic, message, subscriber);
                });
            }));
            this.hub.writeSocket(this.hub.outWire());
        }
        finally {
            this.hub.outBytesLock().unlock();
        }
    }

    private void onEvent(K topic, V message, TopicSubscriber<K, V> subscriber) {
        try {
            if (message == null) {
                this.unregisterTopicSubscriber(subscriber);
            } else {
                subscriber.onMessage(topic, message);
            }
        }
        catch (InvalidSubscriberException noLongerValid) {
            this.unregisterTopicSubscriber(subscriber);
        }
    }

    @Override
    public int topicSubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandlerProcessor.EventId.topicSubscriberCount);
    }

    @Override
    public int keySubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandlerProcessor.EventId.keySubscriberCount);
    }

    @Override
    public int entrySubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandlerProcessor.EventId.entrySubscriberCount);
    }

    @Override
    public void unregisterTopicSubscriber(TopicSubscriber subscriber) {
        Long tid = this.subscribersToTid.get(subscriber);
        if (tid == null) {
            LOG.warn("There is no subscription to unsubscribe");
            return;
        }
        this.hub.outBytesLock().lock();
        try {
            this.writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)SubscriptionHandlerProcessor.EventId.unregisterTopicSubscriber).text((CharSequence)""));
            this.hub.writeSocket(this.hub.outWire());
        }
        finally {
            this.hub.outBytesLock().unlock();
        }
    }

    @Override
    public void registerSubscriber(RequestContext rc, Subscriber<MapEvent<K, V>> subscriber) {
        this.registerSubscriber0(rc, subscriber);
    }

    @Override
    public void registerKeySubscriber(RequestContext rc, Subscriber<K> subscriber) {
        this.registerSubscriber0(rc, subscriber);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void registerSubscriber0(RequestContext rc, Subscriber subscriber) {
        long startTime = System.currentTimeMillis();
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        this.hub.outBytesLock().lock();
        try {
            long tid = this.writeMetaDataStartTime(startTime);
            this.subscribersToTid.put(subscriber, tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)MapWireHandler.EventId.subscribe).typeLiteral((CharSequence)ClassAliasPool.CLASS_ALIASES.nameFor(rc.elementType())));
            this.hub.asyncReadSocket(tid, w -> w.readDocument(null, d -> {
                StringBuilder eventname = Wires.acquireStringBuilder();
                ValueIn valueIn = d.readEventName(eventname);
                if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(eventname)) {
                    subscriber.onEndOfSubscription();
                } else if (CoreFields.reply.contentEquals(eventname)) {
                    Class aClass = rc.elementType();
                    ReadMarshallable object = MapEvent.class.isAssignableFrom(aClass) ? valueIn.typedMarshallable() : valueIn.object(rc.elementType());
                    this.onEvent(object, subscriber);
                }
            }));
            this.hub.writeSocket(this.hub.outWire());
        }
        finally {
            this.hub.outBytesLock().unlock();
        }
        assert (!this.hub.outBytesLock().isHeldByCurrentThread());
    }

    private void onEvent(Object message, Subscriber subscriber) {
        try {
            if (message != null) {
                subscriber.onMessage(message);
            }
        }
        catch (InvalidSubscriberException noLongerValid) {
            this.unregisterSubscriber(subscriber);
        }
    }

    @Override
    public void unregisterKeySubscriber(Subscriber<K> subscriber) {
        this.unregisterSubscriber0(subscriber);
    }

    @Override
    public void unregisterSubscriber(Subscriber<MapEvent<K, V>> subscriber) {
        this.unregisterSubscriber0(subscriber);
    }

    void unregisterSubscriber0(Subscriber subscriber) {
        Long tid = this.subscribersToTid.get(subscriber);
        if (tid == null) {
            LOG.warn("There is subscription to unsubscribe");
            return;
        }
        this.hub.outBytesLock().lock();
        try {
            this.writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)MapWireHandler.EventId.unSubscribe).text((CharSequence)""));
            this.hub.writeSocket(this.hub.outWire());
        }
        finally {
            this.hub.outBytesLock().unlock();
        }
    }

    @Override
    public boolean needsPrevious() {
        return true;
    }

    @Override
    public void setKvStore(KeyValueStore<K, MV, V> store) {
    }

    @Override
    public void notifyEvent(MapEvent<K, V> mpe) {
        throw new UnsupportedOperationException("");
    }

    @Override
    public void registerDownstream(EventConsumer<K, V> subscription) {
        throw new UnsupportedOperationException("todo");
    }
}

