/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Publisher;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * {@link Publisher} implementation that talks to a remote endpoint through the
 * {@link TcpChannelHub} view of an {@link Asset}.
 *
 * <p>Events are sent with {@code PublisherHandler.EventId.publish}; subscriptions are
 * requested with {@code PublisherHandler.EventId.registerTopicSubscriber} and replies are
 * decoded as instances of the message class taken from the {@link RequestContext}.
 *
 * @param <E> type of the messages published and received
 */
public class RemotePublisher<E>
        extends AbstractStatelessClient<PublisherHandler.EventId>
        implements Publisher<E> {

    /** Class used to decode the {@code message} field of incoming replies. */
    private final Class<E> messageClass;

    /**
     * Creates a publisher bound to the {@link TcpChannelHub} view of {@code asset}.
     *
     * @param context    supplies the full name and message type used to build the URI
     * @param asset      asset whose {@link TcpChannelHub} view carries the traffic
     * @param underlying not used by this implementation
     * @throws AssetNotFoundException declared for callers; see {@code Asset#findView}
     */
    public RemotePublisher(@NotNull RequestContext context, Asset asset, Object underlying) throws AssetNotFoundException {
        super(asset.findView(TcpChannelHub.class), 0L, RemotePublisher.toUri(context));
        this.messageClass = context.messageType();
    }

    /**
     * Builds the URI {@code /<fullName>?view=publisher}, appending
     * {@code &messageType=<class name>} when the message type is not {@link String}.
     *
     * @param context request context providing the full name and message type
     * @return the URI string
     */
    private static String toUri(RequestContext context) {
        StringBuilder uri = new StringBuilder("/")
                .append(context.fullName())
                .append("?view=publisher");
        // The guard must look at the message type, since that is the value being advertised;
        // testing the value type could drop a non-String message type from the URI.
        Class<?> messageType = context.messageType();
        if (messageType != String.class) {
            uri.append("&messageType=").append(messageType.getName());
        }
        return uri.toString();
    }

    /**
     * Sends {@code event} asynchronously using {@code PublisherHandler.EventId.publish}.
     *
     * @param event the event to publish; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    @Override
    public void publish(E event) {
        this.checkEvent(event);
        this.sendEventAsync(PublisherHandler.EventId.publish, valueOut -> valueOut.object(event));
    }

    /**
     * Writes a {@code registerTopicSubscriber} request and installs an asynchronous reader
     * that forwards each decoded reply to {@code subscriber}.
     *
     * <p>An {@code onEndOfSubscription} event from the remote side is relayed via
     * {@link Subscriber#onEndOfSubscription()}; a {@code reply} event has its
     * {@code message} field decoded as {@code E} and passed to the subscriber.
     *
     * @param subscriber receiver of messages and of the end-of-subscription notification
     * @throws IllegalStateException if the current thread already holds the hub's
     *                               out-bytes lock
     * @throws AssetNotFoundException declared by the {@link Publisher} contract
     */
    @Override
    public void registerSubscriber(Subscriber subscriber) throws AssetNotFoundException {
        long startTime = System.currentTimeMillis();
        // Refuse re-entrant use of the out-bytes lock by the same thread.
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }

        // The signature keeps the raw type; narrow once here so the callbacks are type-checked.
        @SuppressWarnings("unchecked")
        final Subscriber<E> typedSubscriber = subscriber;

        this.hub.outBytesLock().lock();
        try {
            long tid = this.writeMetaDataStartTime(startTime);
            this.hub.outWire().writeDocument(false, wireOut ->
                    wireOut.writeEventName(PublisherHandler.EventId.registerTopicSubscriber).text(""));
            // The reader is registered under the tid written above, before the request is flushed.
            this.hub.asyncReadSocket(tid, w -> w.readDocument(null, d -> {
                StringBuilder eventName = Wires.acquireStringBuilder();
                ValueIn valueIn = d.readEventName(eventName);
                if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(eventName)) {
                    typedSubscriber.onEndOfSubscription();
                } else if (CoreFields.reply.contentEquals(eventName)) {
                    valueIn.marshallable(m -> {
                        E message = m.read(() -> "message").object(this.messageClass);
                        this.onEvent(message, typedSubscriber);
                    });
                }
            }));
            this.hub.writeSocket(this.hub.outWire());
        } finally {
            this.hub.outBytesLock().unlock();
        }
    }

    /**
     * Delivers a non-null {@code message} to {@code subscriber}; {@code null} messages are
     * dropped.
     *
     * @param message    decoded message, possibly {@code null}
     * @param subscriber target of the delivery
     */
    private void onEvent(E message, Subscriber<E> subscriber) {
        if (message == null) {
            return;
        }
        try {
            subscriber.onMessage(message);
        } catch (InvalidSubscriberException ignored) {
            // Deliberately best-effort: the subscriber has declared itself invalid and this
            // class has no handle with which to cancel the remote subscription, so the
            // message is dropped rather than failing the socket reader.
        }
    }

    /**
     * Rejects {@code null} events.
     *
     * @param key the event to validate
     * @throws NullPointerException if {@code key} is {@code null}
     */
    private void checkEvent(@Nullable Object key) {
        if (key == null) {
            throw new NullPointerException("event can not be null");
        }
    }
}

