package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/map/ReplicationHub.class */
class ReplicationHub extends AbstractStatelessClient {
    private static final Logger LOG;
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;
    private final Function<Bytes, Wire> wireType;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ReplicationHub(@NotNull RequestContext requestContext, @NotNull TcpChannelHub tcpChannelHub, @NotNull EventLoop eventLoop, @NotNull AtomicBoolean atomicBoolean, @NotNull Function<Bytes, Wire> function) {
        super(tcpChannelHub, 0L, toUri(requestContext));
        this.eventLoop = eventLoop;
        this.isClosed = atomicBoolean;
        this.wireType = function;
    }

    private static String toUri(@NotNull RequestContext requestContext) {
        StringBuilder sb = new StringBuilder(requestContext.fullName() + "?view=Replication");
        if (requestContext.keyType() != String.class) {
            sb.append("&keyType=").append(requestContext.keyType().getName());
        }
        if (requestContext.valueType() != String.class) {
            sb.append("&valueType=").append(requestContext.valueType().getName());
        }
        return sb.toString();
    }

    public void bootstrap(@NotNull final EngineReplication engineReplication, final byte b, final byte b2) {
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, b, "ReplicationHub bootstrap") { // from class: net.openhft.chronicle.engine.map.ReplicationHub.1
            public void onSubscribe(@NotNull WireOut wireOut) {
                if (ReplicationHub.LOG.isDebugEnabled()) {
                    ReplicationHub.LOG.debug("onSubscribe - localIdentifier=" + ((int) b) + ",remoteIdentifier=" + ((int) b2));
                }
                wireOut.writeEventName(ReplicationHandler.EventId.identifier).marshallable(WriteMarshallable.EMPTY).writeComment(toString() + ", tcpChannelHub={" + ReplicationHub.this.hub.toString() + "}");
            }

            public void onConsumer(@NotNull WireIn wireIn) {
                if (Jvm.isDebug()) {
                    System.out.println("client : bootstrap");
                }
                byte b3 = b;
                EngineReplication engineReplication2 = engineReplication;
                wireIn.readDocument((ReadMarshallable) null, wireIn2 -> {
                    ReplicationHub.this.onConnected(b3, wireIn2.read(ReplicationHandler.EventId.identifierReply).int8(), engineReplication2);
                });
            }

            @NotNull
            public String toString() {
                return "bootstrap {localIdentifier=" + ((int) b) + " ,remoteIdentifier=" + ((int) b2) + "}";
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onConnected(byte b, final byte b2, @NotNull final EngineReplication engineReplication) {
        final EngineReplication.ModificationIterator acquireModificationIterator = engineReplication.acquireModificationIterator(b2);
        if (!$assertionsDisabled && acquireModificationIterator == null) {
            throw new AssertionError();
        }
        long lastModificationTime = engineReplication.lastModificationTime(b2);
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(lastModificationTime);
        bootstrap.identifier(b);
        this.hub.subscribe(new AbstractAsyncTemporarySubscription(this.hub, this.csp, b, "replication onConnected") { // from class: net.openhft.chronicle.engine.map.ReplicationHub.2
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(MapWireHandler.EventId.bootstrap).typedMarshallable(bootstrap);
            }

            public void onConsumer(@NotNull WireIn wireIn) {
                if (Jvm.isDebug()) {
                    System.out.println("client : onConnected - publishing updates");
                }
                EngineReplication.ModificationIterator modificationIterator = acquireModificationIterator;
                byte b3 = b2;
                EngineReplication engineReplication2 = engineReplication;
                wireIn.readDocument((ReadMarshallable) null, wireIn2 -> {
                    StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                    ValueIn readEventName = wireIn2.readEventName(acquireStringBuilder);
                    if (ReplicationHandler.EventId.bootstrap.contentEquals(acquireStringBuilder)) {
                        try {
                            ReplicationHub.this.publish(modificationIterator, readEventName.typedMarshallable(), b3);
                            return;
                        } catch (Exception e) {
                            ReplicationHub.LOG.error("", e);
                            return;
                        }
                    }
                    if (ReplicationHandler.EventId.replicationEvent.contentEquals(acquireStringBuilder)) {
                        engineReplication2.applyReplication((EngineReplication.ReplicationEntry) readEventName.typedMarshallable());
                    } else if (CoreFields.lastUpdateTime.contentEquals(acquireStringBuilder)) {
                        if (Jvm.isDebug()) {
                            System.out.println("server : received lastUpdateTime");
                        }
                        engineReplication2.setLastModificationTime(wireIn2.read(() -> {
                            return "id";
                        }).int8(), readEventName.int64());
                    }
                });
            }
        });
    }

    void publish(@NotNull final EngineReplication.ModificationIterator modificationIterator, @NotNull Bootstrap bootstrap, final byte b) {
        final TcpChannelHub tcpChannelHub = this.hub;
        EventLoop eventLoop = this.eventLoop;
        eventLoop.getClass();
        modificationIterator.setModificationNotifier(eventLoop::unpause);
        this.eventLoop.addHandler(true, new EventHandler() { // from class: net.openhft.chronicle.engine.map.ReplicationHub.3
            final Wire wire;
            boolean hasLogged;
            final Bytes bytes = Bytes.elasticByteBuffer();
            boolean hasSentLastUpdateTime = false;
            long lastUpdateTime = 0;

            {
                this.wire = (Wire) ReplicationHub.this.wireType.apply(this.bytes);
            }

            public boolean action() throws InvalidEventHandlerException {
                if (tcpChannelHub.isOutBytesLocked() || !tcpChannelHub.isOutBytesEmpty()) {
                    return false;
                }
                if (ReplicationHub.this.isClosed.get()) {
                    throw new InvalidEventHandlerException();
                }
                this.bytes.clear();
                if (modificationIterator.hasNext() || this.hasSentLastUpdateTime || this.lastUpdateTime <= 0) {
                    modificationIterator.nextEntry(replicationEntry -> {
                        long max = Math.max(this.lastUpdateTime, replicationEntry.timestamp());
                        if (max > this.lastUpdateTime) {
                            this.hasSentLastUpdateTime = false;
                            this.lastUpdateTime = max;
                        }
                        this.wire.writeNotReadyDocument(false, wireOut -> {
                            wireOut.writeEventName(ReplicationHandler.EventId.replicationEvent).typedMarshallable(replicationEntry);
                        });
                    });
                    if (this.bytes.readRemaining() <= 0) {
                        return false;
                    }
                    ReplicationHub.this.sendBytes(this.bytes, false);
                    return true;
                }
                Wire wire = this.wire;
                byte b2 = b;
                wire.writeNotReadyDocument(false, wireOut -> {
                    wireOut.writeEventName(CoreFields.lastUpdateTime).int64(this.lastUpdateTime);
                    wireOut.write(() -> {
                        return "id";
                    }).int8(b2);
                });
                this.hasSentLastUpdateTime = true;
                if (!this.hasLogged) {
                    this.hasLogged = true;
                }
                if (this.bytes.readRemaining() <= 0) {
                    return false;
                }
                ReplicationHub.this.sendBytes(this.bytes, false);
                return true;
            }

            public HandlerPriority priority() {
                return HandlerPriority.REPLICATION;
            }
        });
        modificationIterator.dirtyEntries(bootstrap.lastUpdatedTime());
    }

    static {
        $assertionsDisabled = !ReplicationHub.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ChronicleMapKeyValueStore.class);
    }
}
