package org.infinispan.xsite.events;

import au.csiro.pathling.shaded.com.fasterxml.jackson.databind.deser.DeserializerCache;
import ch.qos.logback.core.spi.AbstractComponentTracker;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
import org.infinispan.notifications.cachemanagerlistener.annotation.SiteViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.SitesViewChangedEvent;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.XSiteCacheMapper;
import org.infinispan.xsite.XSiteNamedCache;
import org.infinispan.xsite.commands.XSiteLocalEventCommand;
import org.infinispan.xsite.commands.remote.XSiteRemoteEventCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;

@Scope(Scopes.GLOBAL)
@Listener
/* loaded from: input_file:org/infinispan/xsite/events/XSiteEventsManagerImpl.class */
public class XSiteEventsManagerImpl implements XSiteEventsManager {
    private static final int[] BACK_OFF_DELAYS = {200, 500, 1000, DeserializerCache.DEFAULT_MAX_CACHE_SIZE, 5000};
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    @Inject
    Transport transport;

    @Inject
    CacheManagerNotifier notifier;

    @Inject
    GlobalComponentRegistry globalRegistry;

    @Inject
    XSiteCacheMapper xSiteCacheMapper;
    private Executor backOffExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/events/XSiteEventsManagerImpl$BackOffSender.class */
    public class BackOffSender implements Runnable, Function<Throwable, Void> {
        private final XSiteRemoteEventCommand cmd;
        private final XSiteBackup backup;

        @GuardedBy("this")
        private int backoffStep;

        private BackOffSender(XSiteRemoteEventCommand xSiteRemoteEventCommand, XSiteBackup xSiteBackup) {
            this.cmd = xSiteRemoteEventCommand;
            this.backup = xSiteBackup;
        }

        @Override // java.lang.Runnable
        public void run() {
            XSiteEventsManagerImpl.log.debugf("Sending %s to %s", this.cmd, this.backup);
            XSiteEventsManagerImpl.this.transport.backupRemotely(this.backup, this.cmd).exceptionally(this);
        }

        @Override // java.util.function.Function
        public Void apply(Throwable th) {
            int nextBackOffStep = nextBackOffStep();
            if (nextBackOffStep >= XSiteEventsManagerImpl.BACK_OFF_DELAYS.length) {
                XSiteEventsManagerImpl.log.debugf(th, "Failed to send %s to %s", this.cmd, this.cmd);
                return null;
            }
            XSiteEventsManagerImpl.log.debugf(th, "Sending %s to %s with delay of %s milliseconds", this.cmd, this.backup, Integer.valueOf(XSiteEventsManagerImpl.BACK_OFF_DELAYS[nextBackOffStep]));
            XSiteEventsManagerImpl.this.delayExecutor(nextBackOffStep).execute(this);
            return null;
        }

        private synchronized int nextBackOffStep() {
            int i = this.backoffStep;
            this.backoffStep = i + 1;
            return i;
        }
    }

    @Start
    public void start() {
        this.notifier.addListener(this);
    }

    @Stop
    public void stop() {
        this.notifier.removeListener(this);
    }

    @Inject
    public void createExecutor(BlockingManager blockingManager) {
        this.backOffExecutor = blockingManager.asExecutor("x-site-evt-backoff");
    }

    @Override // org.infinispan.xsite.events.XSiteEventsManager
    public CompletionStage<Void> onLocalEvents(List<XSiteEvent> list) {
        log.debugf("Local events received: %s", list);
        try {
            XSiteEventSender xSiteEventSender = new XSiteEventSender(this::sendWithBackOff);
            try {
                for (XSiteEvent xSiteEvent : list) {
                    switch (xSiteEvent.getType()) {
                        case SITE_CONNECTED:
                            onRemoteSiteConnected(xSiteEvent.getSiteName(), xSiteEventSender);
                            break;
                        case STATE_REQUEST:
                        case INITIAL_STATE_REQUEST:
                            onRemoteSiteStateRequest(xSiteEvent.getSiteName(), xSiteEvent.getCacheName(), xSiteEvent.getType() == XSiteEventType.INITIAL_STATE_REQUEST);
                            break;
                        default:
                            log.debugf("Unknown event received: %s", xSiteEvent);
                            break;
                    }
                }
                xSiteEventSender.close();
                return CompletableFutures.completedNull();
            } finally {
            }
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override // org.infinispan.xsite.events.XSiteEventsManager
    public CompletionStage<Void> onRemoteEvents(List<XSiteEvent> list) {
        log.debugf("Remote events received: %s", list);
        if (this.transport.isCoordinator()) {
            return onLocalEvents(list);
        }
        try {
            log.debugf("Forwarding events to coordinator: %s", list);
            this.transport.sendTo(this.transport.getCoordinator(), new XSiteLocalEventCommand(list), DeliverOrder.PER_SENDER_NO_FC);
            return CompletableFutures.completedNull();
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @SiteViewChanged
    public void onSiteViewChanged(SitesViewChangedEvent sitesViewChangedEvent) {
        if (this.transport.isPrimaryRelayNode()) {
            log.debugf("On site view changed event: %s", sitesViewChangedEvent);
            sitesViewChangedEvent.getJoiners().stream().filter(str -> {
                return !Objects.equals(str, localSite().toString());
            }).forEach(this::sendNewConnectionEvent);
        }
    }

    @CacheStarted
    public void onCacheStarted(CacheStartedEvent cacheStartedEvent) {
        log.debugf("On cache started (is coordinator? %s): %s", Boolean.valueOf(this.transport.isCoordinator()), cacheStartedEvent.getCacheName());
        if (this.transport.isCoordinator()) {
            try {
                XSiteEventSender xSiteEventSender = new XSiteEventSender(this::sendWithBackOff);
                try {
                    this.xSiteCacheMapper.findRemoteCachesWithAsyncBackup(cacheStartedEvent.getCacheName()).forEach(remoteCacheInfo -> {
                        xSiteEventSender.addEventToSite(remoteCacheInfo.siteName(), XSiteEvent.createInitialStateRequest(localSite(), remoteCacheInfo.cacheName()));
                    });
                    xSiteEventSender.close();
                } finally {
                }
            } catch (Exception e) {
                log.debugf(e, "Unable to send state request for cache %s", cacheStartedEvent.getCacheName());
            }
        }
    }

    private void onRemoteSiteConnected(ByteString byteString, XSiteEventSender xSiteEventSender) {
        Iterator<ByteString> it = this.xSiteCacheMapper.remoteCachesFromSite(byteString).iterator();
        while (it.hasNext()) {
            xSiteEventSender.addEventToSite(byteString, XSiteEvent.createRequestState(localSite(), it.next()));
        }
    }

    private void sendNewConnectionEvent(String str) {
        XSiteRemoteEventCommand xSiteRemoteEventCommand = new XSiteRemoteEventCommand(List.of(XSiteEvent.createConnectEvent(localSite())));
        XSiteBackup xSiteBackup = new XSiteBackup(str, false, AbstractComponentTracker.LINGERING_TIMEOUT);
        log.debugf("Sending connection event to %s: %s", xSiteBackup, xSiteRemoteEventCommand);
        sendWithBackOff(xSiteBackup, xSiteRemoteEventCommand);
    }

    private void onRemoteSiteStateRequest(ByteString byteString, ByteString byteString2, boolean z) {
        ComponentRegistry namedComponentRegistry = this.globalRegistry.getNamedComponentRegistry(byteString2);
        if (namedComponentRegistry == null) {
            log.debugf("State Transfer request from site '%s' and cache '%s' failed. Cache does no exist.", byteString, byteString2);
            return;
        }
        ComponentRef<XSiteStateTransferManager> xSiteStateTransferManager = namedComponentRegistry.getXSiteStateTransferManager();
        if (xSiteStateTransferManager.isRunning()) {
            xSiteStateTransferManager.running().startAutomaticStateTransferTo(byteString, z);
        } else {
            log.debugf("State Transfer request from site '%s' and cache '%s' failed. Cache is not started.", byteString, byteString2);
        }
    }

    private ByteString localSite() {
        return XSiteNamedCache.cachedByteString(this.transport.localSiteName());
    }

    private void sendWithBackOff(XSiteBackup xSiteBackup, XSiteRemoteEventCommand xSiteRemoteEventCommand) {
        if (this.transport.localSiteName().equals(xSiteBackup.getSiteName())) {
            return;
        }
        new BackOffSender(xSiteRemoteEventCommand, xSiteBackup).run();
    }

    private Executor delayExecutor(int i) {
        return CompletableFuture.delayedExecutor(BACK_OFF_DELAYS[i], TimeUnit.MILLISECONDS, this.backOffExecutor);
    }
}
