/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.server.internal;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.AssetTree;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.KVSSubscription;
import net.openhft.chronicle.engine.server.internal.AbstractHandler;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionHandlerProcessor
extends AbstractHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHandlerProcessor.class);
    final StringBuilder eventName = new StringBuilder();
    private final Map<Long, Object> tidToListener = new ConcurrentHashMap<Long, Object>();
    private RequestContext requestContext;
    private Queue<Consumer<Wire>> publisher;
    private AssetTree assetTree;
    private Wire outWire;
    private KVSSubscription subscription;
    private final BiConsumer<WireIn, Long> dataConsumer = new BiConsumer<WireIn, Long>(){

        @Override
        public void accept(WireIn inWire, final Long inputTid) {
            SubscriptionHandlerProcessor.this.eventName.setLength(0);
            ValueIn valueIn = inWire.readEventName(SubscriptionHandlerProcessor.this.eventName);
            if (EventId.registerTopicSubscriber.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                TopicSubscriber listener = new TopicSubscriber(){

                    public void onMessage(Object topic, Object message) throws InvalidSubscriberException {
                        SubscriptionHandlerProcessor.this.publisher.add(publish -> {
                            publish.writeDocument(true, wire -> wire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
                            publish.writeNotReadyDocument(false, wire -> wire.write((WireKey)CoreFields.reply).marshallable(m -> {
                                m.write(() -> "topic").object(topic);
                                m.write(() -> "message").object(message);
                            }));
                        });
                    }

                    @Override
                    public void onEndOfSubscription() {
                        SubscriptionHandlerProcessor.this.publisher.add(publish -> {
                            publish.writeDocument(true, wire -> wire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
                            publish.writeNotReadyDocument(false, wire -> wire.writeEventName((WireKey)EventId.onEndOfSubscription).text((CharSequence)""));
                        });
                    }
                };
                valueIn.marshallable(m -> {
                    Class kClass = m.read(() -> "keyType").typeLiteral();
                    Class vClass = m.read(() -> "valueType").typeLiteral();
                    SubscriptionHandlerProcessor.this.tidToListener.put(inputTid, listener);
                    SubscriptionHandlerProcessor.this.assetTree.registerTopicSubscriber(SubscriptionHandlerProcessor.this.requestContext.name(), kClass, vClass, listener);
                });
                return;
            }
            if (EventId.unregisterTopicSubscriber.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                TopicSubscriber listener = (TopicSubscriber)SubscriptionHandlerProcessor.this.tidToListener.remove(inputTid);
                if (listener == null) {
                    LOG.warn("No subscriber to present to unsubscribe (" + inputTid + ")");
                    return;
                }
                SubscriptionHandlerProcessor.this.assetTree.unregisterTopicSubscriber(SubscriptionHandlerProcessor.this.requestContext.name(), listener);
                SubscriptionHandlerProcessor.this.publisher.add(publish -> {
                    publish.writeDocument(true, wire -> wire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
                    publish.writeDocument(false, wire -> wire.write((WireKey)CoreFields.reply).typedMarshallable(null));
                });
                return;
            }
            if (MapWireHandler.EventId.subscribe.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                Class eventClass = valueIn.typeLiteral();
                Subscriber<Object> listener = e -> SubscriptionHandlerProcessor.this.publisher.add(publish -> {
                    publish.writeDocument(true, wire -> wire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
                    publish.writeNotReadyDocument(false, wire -> wire.write((WireKey)CoreFields.reply).object(e));
                });
                SubscriptionHandlerProcessor.this.tidToListener.put(inputTid, listener);
                SubscriptionHandlerProcessor.this.assetTree.registerSubscriber(SubscriptionHandlerProcessor.this.requestContext.name(), eventClass, listener);
                return;
            }
            if (MapWireHandler.EventId.unSubscribe.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                Subscriber listener = (Subscriber)SubscriptionHandlerProcessor.this.tidToListener.remove(inputTid);
                if (listener == null) {
                    LOG.warn("No subscriber to present to unsubscribe (" + inputTid + ")");
                    return;
                }
                SubscriptionHandlerProcessor.this.assetTree.unregisterSubscriber(SubscriptionHandlerProcessor.this.requestContext.name(), listener);
                SubscriptionHandlerProcessor.this.publisher.add(publish -> {
                    publish.writeDocument(true, wire -> wire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
                    publish.writeDocument(false, wire -> wire.write((WireKey)CoreFields.reply).typedMarshallable(null));
                });
                return;
            }
            SubscriptionHandlerProcessor.this.outWire.writeDocument(true, wire -> SubscriptionHandlerProcessor.this.outWire.writeEventName((WireKey)CoreFields.tid).int64(inputTid.longValue()));
            SubscriptionHandlerProcessor.this.writeData(inWire.bytes(), out -> {
                if (EventId.topicSubscriberCount.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                    SubscriptionHandlerProcessor.this.outWire.writeEventName((WireKey)CoreFields.reply).int8((long)SubscriptionHandlerProcessor.this.subscription.topicSubscriberCount());
                    return;
                }
                if (EventId.keySubscriberCount.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                    SubscriptionHandlerProcessor.this.outWire.writeEventName((WireKey)CoreFields.reply).int8((long)SubscriptionHandlerProcessor.this.subscription.keySubscriberCount());
                    return;
                }
                if (EventId.entrySubscriberCount.contentEquals(SubscriptionHandlerProcessor.this.eventName)) {
                    SubscriptionHandlerProcessor.this.outWire.writeEventName((WireKey)CoreFields.reply).int8((long)SubscriptionHandlerProcessor.this.subscription.entrySubscriberCount());
                }
            });
        }
    };

    void process(Wire inWire, RequestContext requestContext, Queue<Consumer<Wire>> publisher, AssetTree assetTree, long tid, Wire outWire, KVSSubscription subscription) {
        this.setOutWire(outWire);
        this.outWire = outWire;
        this.subscription = subscription;
        this.requestContext = requestContext;
        this.publisher = publisher;
        this.assetTree = assetTree;
        this.dataConsumer.accept((WireIn)inWire, tid);
    }

    public static enum EventId implements ParameterizeWireKey
    {
        unSubscribe(new WireKey[0]),
        subscribe(new WireKey[0]),
        registerTopicSubscriber(new WireKey[0]),
        unregisterTopicSubscriber(new WireKey[0]),
        entrySubscriberCount(new WireKey[0]),
        topicSubscriberCount(new WireKey[0]),
        keySubscriberCount(new WireKey[0]),
        onEndOfSubscription(new WireKey[0]);

        private final WireKey[] params;

        private <P extends WireKey> EventId(P ... params) {
            this.params = params;
        }

        @NotNull
        public <P extends WireKey> P[] params() {
            return this.params;
        }
    }
}

