/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.collection.CollectionWireHandler;
import net.openhft.chronicle.engine.map.ChronicleMapKeyValueStore;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side view that drives replication with a remote node over a
 * {@link TcpChannelHub}.
 *
 * <p>{@link #bootstrap(EngineReplication, int)} exchanges identifiers and
 * bootstrap information with the remote side, subscribes to the remote's
 * replication events and registers an {@link EventHandler} on the supplied
 * {@link EventLoop} that publishes local modifications to the remote.
 */
public class ReplicationHub
extends AbstractStatelessClient
implements View {
    // Log under this class's own category (was ChronicleMapKeyValueStore, a copy-paste slip).
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHub.class);
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;

    /**
     * @param context   request context used to build the URI passed to the superclass
     * @param hub       channel hub used for all communication with the remote node
     * @param eventLoop event loop on which the publishing handler is registered
     * @param isClosed  flag read by the publishing handler; once it is {@code true}
     *                  the handler throws {@link InvalidEventHandlerException}
     */
    public ReplicationHub(RequestContext context, @NotNull TcpChannelHub hub, EventLoop eventLoop, AtomicBoolean isClosed) {
        super(hub, 0L, ReplicationHub.toUri(context));
        this.eventLoop = eventLoop;
        this.isClosed = isClosed;
    }

    /**
     * Builds the replication view URI: {@code <fullName>?view=Replication},
     * followed by {@code keyType} / {@code valueType} query parameters when
     * those types are not {@link String}.
     */
    private static String toUri(RequestContext context) {
        StringBuilder uri = new StringBuilder()
                .append(context.fullName())
                .append("?view=Replication");
        if (context.keyType() != String.class) {
            uri.append("&keyType=").append(context.keyType().getName());
        }
        if (context.valueType() != String.class) {
            uri.append("&valueType=").append(context.valueType().getName());
        }
        return uri.toString();
    }

    /**
     * Starts replication with the remote node.
     *
     * <ol>
     *   <li>requests the remote identifier;</li>
     *   <li>acquires the modification iterator and last modification time
     *       recorded locally for that identifier;</li>
     *   <li>sends a {@link Bootstrap} carrying the local identifier and that
     *       time, and reads the remote's {@link Bootstrap} reply;</li>
     *   <li>subscribes to remote replication events and starts publishing
     *       local modifications.</li>
     * </ol>
     *
     * <p>Failures while subscribing or publishing are logged and not rethrown.
     * If the failure is an {@link InterruptedException} the calling thread's
     * interrupt status is restored before returning.
     *
     * @param replication     the local replication instance that receives remote
     *                        entries and supplies local modifications
     * @param localIdentifer  identifier of this node; narrowed to a {@code byte}
     *                        when written into the bootstrap message
     * @throws InterruptedException declared by the signature; whether the initial
     *                              request/reply calls can raise it is not visible here
     */
    public void bootstrap(EngineReplication replication, int localIdentifer) throws InterruptedException {
        byte remoteIdentifier = this.proxyReturnByte((WireKey)ReplicationHandler.EventId.identifierReply, (WireKey)CollectionWireHandler.SetEventId.identifier);
        EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(remoteIdentifier);
        long lastModificationTime = replication.lastModificationTime(remoteIdentifier);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(lastModificationTime);
        bootstrap.identifier((byte)localIdentifer);

        Function<ValueIn, Bootstrap> typedMarshallable = ValueIn::typedMarshallable;
        Consumer<ValueOut> valueOutConsumer = o -> o.typedMarshallable((WriteMarshallable)bootstrap);
        Bootstrap b = this.proxyReturnWireConsumerInOut((WireKey)MapWireHandler.EventId.bootstap, (WireKey)ReplicationHandler.EventId.bootstrapReply, valueOutConsumer, typedMarshallable);

        try {
            this.subscribe(replication, localIdentifer);
            this.publish(mi, b);
        } catch (InterruptedException e) {
            // Keep the best-effort contract (log, do not rethrow) but do not lose
            // the interruption: restore the flag so the caller can observe it.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while starting replication with remote identifier " + remoteIdentifier, e);
        } catch (Throwable t) {
            LOG.error("Failed to start replication with remote identifier " + remoteIdentifier, t);
        }
    }

    /**
     * Registers a handler on the event loop that sends every pending entry of
     * {@code mi} to the remote as a {@code replicationEvent}, then marks the
     * entries modified since the remote's last-updated time as dirty.
     *
     * @param mi     iterator over local modifications destined for the remote
     * @param remote bootstrap reply from the remote; its last-updated time is
     *               passed to {@code mi.dirtyEntries}
     */
    private void publish(final EngineReplication.ModificationIterator mi, Bootstrap remote) throws InterruptedException {
        final TcpChannelHub hub = this.hub;
        // Un-pause the event loop whenever the iterator reports a modification.
        mi.setModificationNotifier(() -> this.eventLoop.unpause());
        this.eventLoop.addHandler(new EventHandler(){

            @Override
            public boolean action() throws InvalidEventHandlerException {
                // Check the closed flag first: otherwise a closed hub with no
                // pending entries would return false forever and never signal
                // that this handler is no longer valid.
                if (ReplicationHub.this.isClosed.get()) {
                    throw new InvalidEventHandlerException();
                }
                if (!mi.hasNext()) {
                    return false;
                }
                // Send all pending entries while holding the hub lock; the
                // per-entry send itself is the "without lock" variant.
                hub.lock(() -> mi.forEach(e -> ReplicationHub.this.sendEventAsyncWithoutLock((WireKey)ReplicationHandler.EventId.replicationEvent, v -> v.typedMarshallable((WriteMarshallable)e))));
                return true;
            }

            @Override
            public HandlerPriority priority() {
                return HandlerPriority.MEDIUM;
            }
        });
        mi.dirtyEntries(remote.lastUpdatedTime());
    }

    /**
     * Sends a {@code replicationSubscribe} request carrying the local
     * identifier and registers an asynchronous reader that applies every
     * replication entry received for that request to {@code replication}.
     *
     * <p>The request is written while holding the hub's out-bytes lock, which
     * is always released in the {@code finally} block.
     *
     * @param replication    receiver of the remote replication entries
     * @param localIdentifer identifier of this node, written as an 8-bit value
     */
    private void subscribe(EngineReplication replication, int localIdentifer) {
        this.hub.outBytesLock().lock();
        try {
            long tid = this.writeMetaDataStartTime(System.currentTimeMillis());
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)ReplicationHandler.EventId.replicationSubscribe).int8((long)localIdentifer));
            // Register the reply reader before the request is flushed to the socket.
            this.hub.asyncReadSocket(tid, d -> d.readDocument(null, w -> replication.applyReplication((EngineReplication.ReplicationEntry)w.read((WireKey)ReplicationHandler.EventId.replicactionReply).typedMarshallable())));
            this.hub.writeSocket(this.hub.outWire());
        } finally {
            this.hub.outBytesLock().unlock();
        }
    }
}

