/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.server.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.pubsub.Replication;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.AbstractHandler;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles replication-related wire messages for a single connection.
 *
 * <p>{@link #process} stores the per-call collaborators on this instance and then
 * dispatches on the event name read from the incoming wire:
 * <ul>
 *   <li>{@code replicationSubscribe} - registers an {@link EventHandler} on the event loop
 *       that forwards entries from a modification iterator to the publisher;</li>
 *   <li>{@code replicationEvent} - applies the received entry to the local
 *       {@link Replication};</li>
 *   <li>{@code identifier} / {@code bootstrap} - written back on the out wire as
 *       {@code identifierReply} / {@code bootstrapReply}.</li>
 * </ul>
 *
 * <p>The collaborators are kept in mutable fields that are overwritten on every
 * {@link #process} call.
 *
 * @param <E> not referenced by any code in this class
 */
public class ReplicationHandler<E> extends AbstractHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class);

    /** Reusable buffer holding the event name of the message currently being handled. */
    private final StringBuilder eventName = new StringBuilder();
    private Replication replication;
    private WireOutPublisher publisher;
    private HostIdentifier hostId;
    private long tid;
    private EventLoop eventLoop;

    /**
     * Reads one message from the in-wire and handles it. The second argument is the
     * transaction id the message arrived with.
     */
    @NotNull
    private final BiConsumer<WireIn, Long> dataConsumer = new BiConsumer<WireIn, Long>() {

        @Override
        public void accept(@NotNull WireIn inWire, final Long inputTid) {
            eventName.setLength(0);
            final ValueIn valueIn = inWire.readEventName(eventName);

            if (EventId.replicationSubscribe.contentEquals(eventName)) {
                // The payload is the identifier of the subscribing remote node.
                final byte id = valueIn.int8();
                final EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(id);

                // Wake the event loop whenever the iterator reports a modification.
                mi.setModificationNotifier(() -> eventLoop.unpause());

                eventLoop.addHandler(new EventHandler() {

                    @Override
                    public boolean action() throws InvalidEventHandlerException {
                        // Throwing InvalidEventHandlerException once the connection is closed;
                        // presumably this makes the event loop drop the handler - confirm
                        // against the EventLoop contract.
                        if (connectionClosed) {
                            throw new InvalidEventHandlerException();
                        }
                        // NOTE(review): hadNext is only set inside the publisher callback. If the
                        // publisher runs that callback later rather than inline, this method
                        // returns false even though entries were queued - confirm.
                        final AtomicBoolean hadNext = new AtomicBoolean();
                        mi.forEach(e -> {
                            if (Jvm.isDebug()) {
                                System.out.println(e.toString());
                            }
                            publisher.add(publish1 -> {
                                // Only entries whose identifier equals this host's id are published.
                                if (e.identifier() != hostId.hostId()) {
                                    return;
                                }
                                hadNext.set(true);
                                if (Jvm.isDebug()) {
                                    System.out.println("publish from server response from itterator localIdentifier=" + hostId + " ,remoteIdentifier=" + id + " event=" + e);
                                }
                                // First the tid of the subscribing request, then the entry itself.
                                publish1.writeDocument(true, wire -> wire.writeEventName((WireKey) CoreFields.tid).int64(inputTid.longValue()));
                                publish1.writeNotReadyDocument(false, wire -> wire.write((WireKey) EventId.replicationReply).typedMarshallable((WriteMarshallable) e));
                            });
                        });
                        return hadNext.get();
                    }

                    @NotNull
                    @Override
                    public HandlerPriority priority() {
                        return HandlerPriority.MEDIUM;
                    }
                });

                try {
                    mi.dirtyEntries(0L);
                } catch (InterruptedException ie) {
                    // Report through the logger and restore the interrupt status so that
                    // code further up the stack can still observe the interruption.
                    LOG.error("Interrupted while marking entries dirty for remoteIdentifier=" + id, ie);
                    Thread.currentThread().interrupt();
                }
                return;
            }

            if (EventId.replicationEvent.contentEquals(eventName)) {
                final EngineReplication.ReplicationEntry replicatedEntry = (EngineReplication.ReplicationEntry) valueIn.typedMarshallable();
                replication.applyReplication(replicatedEntry);
                return;
            }

            // Request/response style events: write the tid document, then the reply.
            outWire.writeDocument(true, wire -> outWire.writeEventName((WireKey) CoreFields.tid).int64(tid));
            writeData(inWire.bytes(), out -> {
                if (EventId.identifier.contentEquals(eventName)) {
                    outWire.write((WireKey) EventId.identifierReply).int8(hostId.hostId());
                    return;
                }
                if (MapWireHandler.EventId.bootstrap.contentEquals(eventName)) {
                    final Bootstrap inBootstrap = (Bootstrap) valueIn.typedMarshallable();
                    if (inBootstrap == null) {
                        return;
                    }
                    final byte id = inBootstrap.identifier();
                    final EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(id);
                    try {
                        mi.dirtyEntries(inBootstrap.lastUpdatedTime());
                    } catch (InterruptedException ie) {
                        // Same policy as the subscribe path: log with context and keep the
                        // interrupt status. The reply is still written, as before.
                        LOG.error("Interrupted while marking entries dirty for remoteIdentifier=" + id, ie);
                        Thread.currentThread().interrupt();
                    }
                    final Bootstrap outBootstrap = new Bootstrap();
                    outBootstrap.identifier(hostId.hostId());
                    outBootstrap.lastUpdatedTime(replication.lastModificationTime(id));
                    outWire.write((WireKey) EventId.bootstrapReply).typedMarshallable((WriteMarshallable) outBootstrap);
                }
            });
        }
    };

    /**
     * Stores the supplied collaborators on this handler and processes one message
     * read from {@code inWire}.
     *
     * @param inWire      wire to read the event name and payload from
     * @param publisher   receives the entries forwarded by a replication subscription
     * @param tid         transaction id of the message; written back with the reply
     * @param outWire     wire the reply is written to
     * @param hostId      supplies the identifier of the local host
     * @param replication replication instance that entries are read from and applied to
     * @param eventLoop   event loop on which subscription handlers are registered
     */
    void process(@NotNull WireIn inWire, WireOutPublisher publisher, long tid, Wire outWire, HostIdentifier hostId, Replication replication, EventLoop eventLoop) {
        this.eventLoop = eventLoop;
        this.setOutWire((WireOut) outWire);
        this.hostId = hostId;
        this.publisher = publisher;
        this.replication = replication;
        this.tid = tid;
        this.dataConsumer.accept(inWire, tid);
    }

    /** Event names understood or emitted by this handler; none of them declares parameters. */
    public static enum EventId implements ParameterizeWireKey
    {
        publish(new WireKey[0]),
        onEndOfSubscription(new WireKey[0]),
        apply(new WireKey[0]),
        replicationEvent(new WireKey[0]),
        replicationSubscribe(new WireKey[0]),
        replicationReply(new WireKey[0]),
        bootstrapReply(new WireKey[0]),
        identifierReply(new WireKey[0]),
        identifier(new WireKey[0]);

        private final WireKey[] params;

        private <P extends WireKey> EventId(P ... params) {
            this.params = params;
        }

        /**
         * Returns the parameter keys this event was declared with.
         *
         * @return the array passed to the constructor (not copied)
         */
        @NotNull
        @SuppressWarnings("unchecked") // the stored array is a WireKey[]; P is chosen by the caller
        public <P extends WireKey> P[] params() {
            return (P[]) this.params;
        }
    }
}

