/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.map.ChronicleMapKeyValueStore;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side view that drives replication for one map over a {@link TcpChannelHub}.
 *
 * <p>The exchange implemented here has three steps:
 * <ol>
 *   <li>{@link #bootstrap} sends an {@code identifier} event and waits for the
 *       {@code identifierReply} carrying the remote identifier;</li>
 *   <li>{@code onConnected} subscribes to incoming replication events and sends a
 *       {@link Bootstrap} message, waiting for the {@code bootstrapReply};</li>
 *   <li>{@code publish} registers an event-loop handler that forwards locally
 *       originated modifications to the remote side.</li>
 * </ol>
 */
public class ReplicationHub
extends AbstractStatelessClient
implements View {
    // Log under this class's own category (was previously ChronicleMapKeyValueStore's).
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHub.class);
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;

    /**
     * @param context   supplies the full name and key/value types used to build the URI
     * @param hub       channel hub used for all subscriptions and sends
     * @param eventLoop loop on which the publishing handler is installed
     * @param isClosed  flag checked by the publishing handler; once {@code true} the
     *                  handler removes itself by throwing {@link InvalidEventHandlerException}
     */
    public ReplicationHub(@NotNull RequestContext context, @NotNull TcpChannelHub hub, EventLoop eventLoop, AtomicBoolean isClosed) {
        super(hub, 0L, ReplicationHub.toUri(context));
        this.eventLoop = eventLoop;
        this.isClosed = isClosed;
    }

    /**
     * Builds the URI passed to the superclass constructor:
     * {@code <fullName>?view=Replication}, followed by {@code &keyType=} and/or
     * {@code &valueType=} when the respective type is not {@code String}.
     */
    private static String toUri(@NotNull RequestContext context) {
        StringBuilder uri = new StringBuilder(context.fullName() + "?view=" + "Replication");
        if (context.keyType() != String.class) {
            uri.append("&keyType=").append(context.keyType().getName());
        }
        if (context.valueType() != String.class) {
            uri.append("&valueType=").append(context.valueType().getName());
        }
        return uri.toString();
    }

    /**
     * Starts the replication handshake by subscribing with an {@code identifier} event.
     * When the {@code identifierReply} arrives, the identifier it carries (not the
     * {@code remoteIdentifier} argument) is passed on to {@code onConnected}.
     *
     * @param replication      replication engine that will supply and apply changes
     * @param localIdentifier  identifier of this node
     * @param remoteIdentifier expected identifier of the peer; only used for diagnostics here
     * @throws InterruptedException declared for callers; not thrown by the visible code paths
     */
    public void bootstrap(final @NotNull EngineReplication replication, final byte localIdentifier, final byte remoteIdentifier) throws InterruptedException {
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, localIdentifier, "ReplicationHub bootstrap"){

            @Override
            public void onSubscribe(WireOut wireOut) {
                // Diagnostic output goes through the logger rather than stdout.
                LOG.debug("onSubscribe - localIdentifier={},remoteIdentifier={}", localIdentifier, remoteIdentifier);
                wireOut.writeEventName((WireKey)ReplicationHandler.EventId.identifier).marshallable(WriteMarshallable.EMPTY).writeComment((CharSequence)(this.toString() + ", tcpChannelHub={" + ReplicationHub.this.hub.toString() + "}"));
            }

            @Override
            public void onConsumer(@NotNull WireIn inWire) {
                inWire.readDocument(null, d -> {
                    // Use the identifier reported in the reply, not the caller's expectation.
                    byte reportedRemoteIdentifier = d.read((WireKey)ReplicationHandler.EventId.identifierReply).int8();
                    ReplicationHub.this.onConnected(localIdentifier, reportedRemoteIdentifier, replication);
                });
            }

            @Override
            public String toString() {
                return "bootstrap {localIdentifier=" + localIdentifier + " ,remoteIdentifier=" + remoteIdentifier + "}";
            }
        });
    }

    /**
     * Second handshake step: subscribes for incoming replication events, then sends a
     * {@link Bootstrap} carrying this node's identifier and the last modification time
     * recorded for the remote node. On {@code bootstrapReply}, publishing is started.
     */
    private void onConnected(final byte localIdentifier, final byte remoteIdentifier, EngineReplication replication) {
        final EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(remoteIdentifier);
        long lastModificationTime = replication.lastModificationTime(remoteIdentifier);
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(lastModificationTime);
        bootstrap.identifier(localIdentifier);
        // Register for inbound events before sending our bootstrap message.
        this.subscribe(replication, localIdentifier, remoteIdentifier);
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, localIdentifier, "replication onConnected"){

            @Override
            public void onSubscribe(WireOut wireOut) {
                wireOut.writeEventName((WireKey)MapWireHandler.EventId.bootstrap).typedMarshallable((WriteMarshallable)bootstrap);
            }

            @Override
            public void onConsumer(@NotNull WireIn inWire) {
                inWire.readDocument(null, d -> {
                    Bootstrap b = (Bootstrap)d.read((WireKey)ReplicationHandler.EventId.bootstrapReply).typedMarshallable();
                    try {
                        ReplicationHub.this.publish(mi, b, localIdentifier, remoteIdentifier);
                    }
                    catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted while starting replication publishing, localIdentifier=" + localIdentifier + ", remoteIdentifier=" + remoteIdentifier, e);
                    }
                    catch (Exception e) {
                        LOG.error("Failed to start replication publishing, localIdentifier=" + localIdentifier + ", remoteIdentifier=" + remoteIdentifier, e);
                    }
                });
            }
        });
    }

    /**
     * Installs an event-loop handler that, under the hub lock, walks the modification
     * iterator and sends every entry whose identifier equals {@code localIdentifier}
     * as a {@code replicationEvent}. Afterwards marks entries dirty from the remote's
     * last-updated time.
     *
     * <p>The handler removes itself (via {@link InvalidEventHandlerException}) once
     * {@code isClosed} is set or an {@link IORuntimeException} occurs.
     */
    private void publish(final @NotNull EngineReplication.ModificationIterator mi, @NotNull Bootstrap remote, final byte localIdentifier, byte remoteIdentifier) throws InterruptedException {
        final TcpChannelHub hub = this.hub;
        // Wake the event loop whenever the iterator reports a new modification.
        mi.setModificationNotifier(() -> this.eventLoop.unpause());
        this.eventLoop.addHandler(new EventHandler(){

            public boolean action() throws InvalidEventHandlerException {
                try {
                    if (ReplicationHub.this.isClosed.get()) {
                        throw new InvalidEventHandlerException();
                    }
                    hub.lock(() -> mi.forEach(entry -> {
                        // Only forward changes that originated on this node.
                        if (entry.identifier() != localIdentifier) {
                            return;
                        }
                        ReplicationHub.this.sendEventAsyncWithoutLock((WireKey)ReplicationHandler.EventId.replicationEvent, v -> v.typedMarshallable((WriteMarshallable)entry));
                    }));
                    return true;
                }
                catch (IORuntimeException ioe) {
                    // Log with the full cause before dropping this handler from the loop.
                    LOG.error("Replication publishing failed, removing handler, localIdentifier=" + localIdentifier, ioe);
                    throw new InvalidEventHandlerException();
                }
            }

            @NotNull
            public HandlerPriority priority() {
                return HandlerPriority.MEDIUM;
            }
        });
        mi.dirtyEntries(remote.lastUpdatedTime());
    }

    /**
     * Subscribes with a {@code replicationSubscribe} event carrying the local identifier;
     * each {@code replicationReply} received is applied to {@code replication}.
     */
    private void subscribe(final @NotNull EngineReplication replication, final byte localIdentifier, final byte remoteIdentifier) {
        this.hub.subscribe(new AbstractAsyncTemporarySubscription(this.hub, this.csp, localIdentifier, "replication subscribe"){

            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName((WireKey)ReplicationHandler.EventId.replicationSubscribe).int8(localIdentifier).writeComment((CharSequence)("remoteIdentifier=" + remoteIdentifier));
            }

            @Override
            public void onConsumer(@NotNull WireIn d) {
                d.readDocument(null, w -> replication.applyReplication((EngineReplication.ReplicationEntry)w.read((WireKey)ReplicationHandler.EventId.replicationReply).typedMarshallable()));
            }
        });
    }
}

