package org.infinispan.notifications.cachelistener.cluster.impl;

import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.ClusterEventManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.class */
public class BatchingClusterEventManagerImpl<K, V> implements ClusterEventManager<K, V> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    @Inject
    EmbeddedCacheManager cacheManager;

    @Inject
    Configuration configuration;

    @Inject
    RpcManager rpcManager;

    @Inject
    ComponentRef<CommandsFactory> commandsFactory;
    private long timeout;
    private final Map<Object, EventContext<K, V>> eventContextMap = new ConcurrentHashMap();

    /* loaded from: input_file:org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl$EventContext.class */
    private interface EventContext<K, V> {
        void addTargets(Address address, UUID uuid, Collection<ClusterEvent<K, V>> collection, boolean z);

        CompletionStage<Void> sendToTargets();
    }

    /* loaded from: input_file:org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl$TargetEvents.class */
    private static class TargetEvents<K, V> {
        final Map<UUID, Collection<ClusterEvent<K, V>>> events = new HashMap();
        boolean sync = false;

        private TargetEvents() {
        }
    }

    /* loaded from: input_file:org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl$UnicastEventContext.class */
    protected class UnicastEventContext<K, V> implements EventContext<K, V> {
        protected final Map<Address, TargetEvents<K, V>> targets = new HashMap();

        protected UnicastEventContext() {
        }

        @Override // org.infinispan.notifications.cachelistener.cluster.impl.BatchingClusterEventManagerImpl.EventContext
        public void addTargets(Address address, UUID uuid, Collection<ClusterEvent<K, V>> collection, boolean z) {
            TargetEvents<K, V> targetEvents = this.targets.get(address);
            if (targetEvents == null) {
                targetEvents = new TargetEvents<>();
                this.targets.put(address, targetEvents);
            }
            Collection<ClusterEvent<K, V>> put = targetEvents.events.put(uuid, collection);
            if (put != null) {
                collection.addAll(put);
            }
            if (z) {
                targetEvents.sync = true;
            }
        }

        @Override // org.infinispan.notifications.cachelistener.cluster.impl.BatchingClusterEventManagerImpl.EventContext
        public CompletionStage<Void> sendToTargets() {
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            CommandsFactory running = BatchingClusterEventManagerImpl.this.commandsFactory.running();
            for (Map.Entry<Address, TargetEvents<K, V>> entry : this.targets.entrySet()) {
                TargetEvents<K, V> value = entry.getValue();
                CompletionStage<?> invokeCommand = BatchingClusterEventManagerImpl.this.rpcManager.invokeCommand(entry.getKey(), running.buildMultiClusterEventCommand(value.events), SingleResponseCollector.validOnly(), new RpcOptions(DeliverOrder.NONE, BatchingClusterEventManagerImpl.this.timeout, TimeUnit.MILLISECONDS));
                if (value.sync) {
                    aggregateCompletionStage.dependsOn(invokeCommand);
                }
            }
            return aggregateCompletionStage.freeze();
        }
    }

    @Start
    public void start() {
        this.timeout = this.configuration.clustering().remoteTimeout();
        this.configuration.clustering().attributes().attribute(ClusteringConfiguration.REMOTE_TIMEOUT).addListener((attribute, l) -> {
            this.timeout = ((Long) attribute.get()).longValue();
        });
    }

    @Override // org.infinispan.notifications.cachelistener.cluster.ClusterEventManager
    public void addEvents(Object obj, Address address, UUID uuid, Collection<ClusterEvent<K, V>> collection, boolean z) {
        this.eventContextMap.compute(obj, (obj2, eventContext) -> {
            if (eventContext == null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Created new unicast event context for identifier %s", obj);
                }
                eventContext = new UnicastEventContext();
            }
            if (log.isTraceEnabled()) {
                log.tracef("Adding new events %s for identifier %s", collection, obj);
            }
            eventContext.addTargets(address, uuid, collection, z);
            return eventContext;
        });
    }

    @Override // org.infinispan.notifications.cachelistener.cluster.ClusterEventManager
    public CompletionStage<Void> sendEvents(Object obj) {
        EventContext<K, V> remove = this.eventContextMap.remove(obj);
        if (remove != null) {
            if (log.isTraceEnabled()) {
                log.tracef("Sending events for identifier %s", obj);
            }
            return remove.sendToTargets();
        }
        if (log.isTraceEnabled()) {
            log.tracef("No events to send for identifier %s", obj);
        }
        return CompletableFutures.completedNull();
    }

    @Override // org.infinispan.notifications.cachelistener.cluster.ClusterEventManager
    public void dropEvents(Object obj) {
        EventContext<K, V> remove = this.eventContextMap.remove(obj);
        if (log.isTraceEnabled()) {
            if (remove != null) {
                log.tracef("Dropping events for identifier %s", obj);
            } else {
                log.tracef("No events to drop for identifier %s", obj);
            }
        }
    }
}
