package org.apache.activemq.artemis.protocol.amqp.federation.internal;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.class */
/**
 * Base class for managers that watch local queue consumers and, when a consumer matches the
 * configured {@link FederationReceiveFromQueuePolicy}, create a federation consumer for the
 * matching queue through the abstract factory methods declared here.
 * <p>
 * The manager registers itself as a broker plugin while started so that it is told about consumer
 * creation / closure and binding removal. All mutation of {@link #demandTracking} happens inside
 * {@code synchronized} methods (or a {@code synchronized (this)} block) of this class.
 */
public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin, ActiveMQServerBindingPlugin {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    protected final ActiveMQServer server;
    /** Returns {@code true} for server consumers that must NOT generate federation demand. */
    protected final Predicate<ServerConsumer> federationConsumerMatcher;
    protected final FederationReceiveFromQueuePolicy policy;
    /** Demand entries keyed by the consumer information produced by {@link #createConsumerInfo}. */
    protected final Map<FederationConsumerInfo, FederationQueueEntry> demandTracking = new HashMap<>();
    protected final FederationInternal federation;

    private volatile boolean started;

    /**
     * Creates a new manager bound to the given federation and policy.
     *
     * @param federationInternal the owning federation, must not be {@code null}
     * @param federationReceiveFromQueuePolicy the queue policy to apply, must not be {@code null}
     * @throws ActiveMQException if the federation consumer matcher cannot be created
     * @throws NullPointerException if either argument is {@code null}
     */
    public FederationQueuePolicyManager(FederationInternal federationInternal, FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy) throws ActiveMQException {
        Objects.requireNonNull(federationInternal, "The Federation instance cannot be null");
        Objects.requireNonNull(federationReceiveFromQueuePolicy, "The Queue match policy cannot be null");
        this.federation = federationInternal;
        this.policy = federationReceiveFromQueuePolicy;
        this.server = federationInternal.getServer();
        // NOTE(review): this invokes an overridable method from the constructor; overrides must not
        // depend on subclass state that has not been initialised yet.
        this.federationConsumerMatcher = createFederationConsumerMatcher(this.server, federationReceiveFromQueuePolicy);
    }

    /**
     * Starts the manager if it is not already started: notifies the subclass, registers this
     * instance as a broker plugin and scans the existing queue bindings for matching consumers.
     */
    public synchronized void start() {
        if (started) {
            return;
        }
        started = true;
        handlePolicyManagerStarted(policy);
        server.registerBrokerPlugin(this);
        scanAllQueueBindings();
    }

    /**
     * Stops the manager if it is started: unregisters the broker plugin, closes every federation
     * consumer currently attached to a tracked entry and discards all tracked demand.
     */
    public synchronized void stop() {
        if (!started) {
            return;
        }
        server.unRegisterBrokerPlugin(this);
        started = false;
        for (FederationQueueEntry entry : demandTracking.values()) {
            if (entry.hasConsumer()) {
                entry.getConsumer().close();
            }
        }
        demandTracking.clear();
    }

    /**
     * Plugin callback: evaluates a newly created server consumer against the policy while started.
     *
     * @param serverConsumer the consumer that was just created
     */
    @Override
    public synchronized void afterCreateConsumer(ServerConsumer serverConsumer) {
        if (started) {
            reactIfConsumerMatchesPolicy(serverConsumer);
        }
    }

    /**
     * Plugin callback: removes the closed consumer's demand from its tracked entry. When no demand
     * remains the entry is dropped from {@link #demandTracking}, and if a federation consumer is
     * attached it is closed with the before/after close signals fired around the close.
     *
     * @param serverConsumer the consumer that was closed
     * @param z flag supplied by the plugin API; not used by this implementation
     */
    @Override
    public synchronized void afterCloseConsumer(ServerConsumer serverConsumer, boolean z) {
        if (!started) {
            return;
        }

        final String queueName = serverConsumer.getQueue().getName().toString();
        final FederationConsumerInfo consumerInfo = createConsumerInfo(serverConsumer);
        final FederationQueueEntry entry = demandTracking.get(consumerInfo);
        if (entry == null) {
            return;
        }

        entry.removeDemand(serverConsumer);
        logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand());
        if (entry.hasDemand()) {
            return;
        }

        if (!entry.hasConsumer()) {
            // Nothing to close, but an entry without demand or consumer serves no purpose and would
            // otherwise stay in the map indefinitely.
            demandTracking.remove(consumerInfo);
            return;
        }

        final FederationConsumerInternal federationConsumer = entry.getConsumer();
        try {
            signalBeforeCloseFederationConsumer(federationConsumer);
            federationConsumer.close();
            signalAfterCloseFederationConsumer(federationConsumer);
        } finally {
            // The entry is dropped even if a signal or the close itself throws.
            demandTracking.remove(consumerInfo);
        }
    }

    /**
     * Plugin callback: when a queue binding is removed, closes the federation consumer of every
     * tracked entry for that queue and stops tracking the entry, so that later demand for a queue
     * of the same name creates a fresh federation consumer instead of finding the closed one.
     *
     * @param binding the binding that was removed; only {@link QueueBinding} instances are handled
     * @param transaction not used by this implementation
     * @param z not used by this implementation
     * @throws ActiveMQException declared by the plugin interface
     */
    @Override
    public synchronized void afterRemoveBinding(Binding binding, Transaction transaction, boolean z) throws ActiveMQException {
        if (!(binding instanceof QueueBinding)) {
            return;
        }

        final String removedQueueName = ((QueueBinding) binding).getQueue().getName().toString();
        // removeIf is used because removing entries from inside forEach is not permitted.
        demandTracking.values().removeIf(entry -> {
            if (!entry.getConsumerInfo().getQueueName().equals(removedQueueName) || !entry.hasConsumer()) {
                return false;
            }
            entry.getConsumer().close();
            return true;
        });
    }

    /**
     * Walks every binding known to the post office and checks the consumers of each queue binding
     * against the policy.
     */
    protected final void scanAllQueueBindings() {
        server.getPostOffice()
              .getAllBindings()
              .filter(QueueBinding.class::isInstance)
              .map(QueueBinding.class::cast)
              .forEach(queueBinding -> checkQueueForMatch(queueBinding.getQueue()));
    }

    /**
     * Checks every {@link ServerConsumer} currently attached to the given queue against the policy.
     *
     * @param queue the queue whose consumers are inspected
     */
    protected final void checkQueueForMatch(Queue queue) {
        queue.getConsumers()
             .stream()
             .filter(ServerConsumer.class::isInstance)
             .map(ServerConsumer.class::cast)
             .forEach(this::reactIfConsumerMatchesPolicy);
    }

    /**
     * Records demand for the consumer's queue and attempts to create a federation consumer when
     * the queue matches the policy and the consumer is not excluded by
     * {@link #federationConsumerMatcher}.
     *
     * @param serverConsumer the local consumer to evaluate
     */
    protected final void reactIfConsumerMatchesPolicy(ServerConsumer serverConsumer) {
        final String queueName = serverConsumer.getQueue().getName().toString();
        final boolean queueMatches = testIfQueueMatchesPolicy(serverConsumer.getQueueAddress().toString(), queueName);
        if (!queueMatches || federationConsumerMatcher.test(serverConsumer)) {
            return;
        }

        logger.trace("Federation Policy matched on consumer for binding: {}", serverConsumer.getBinding());

        final FederationConsumerInfo consumerInfo = createConsumerInfo(serverConsumer);
        FederationQueueEntry entry = demandTracking.get(consumerInfo);
        if (entry != null) {
            logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", queueName);
        } else {
            entry = createConsumerEntry(consumerInfo);
            demandTracking.put(consumerInfo, entry);
        }

        entry.addDemand(serverConsumer);
        tryCreateFederationConsumerForQueue(entry, serverConsumer.getQueue());
    }

    /**
     * Creates and starts a federation consumer for the entry, unless the entry has no demand,
     * already has a consumer, or a plugin blocks creation for the queue.
     */
    private void tryCreateFederationConsumerForQueue(FederationQueueEntry federationQueueEntry, Queue queue) {
        if (!federationQueueEntry.hasDemand() || federationQueueEntry.hasConsumer() || isPluginBlockingFederationConsumerCreate(queue)) {
            return;
        }

        logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", federationQueueEntry.getQueueName());

        signalBeforeCreateFederationConsumer(federationQueueEntry.getConsumerInfo());

        final FederationConsumerInternal federationConsumer = createFederationConsumer(federationQueueEntry.getConsumerInfo());

        // When the handler fires, detach the consumer from its tracked entry (demand is retained)
        // and always close the consumer, even if the lookup or detach throws.
        federationConsumer.setRemoteClosedHandler(closedConsumer -> {
            synchronized (this) {
                try {
                    final FederationQueueEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo());
                    if (tracked != null) {
                        tracked.clearConsumer();
                    }
                } finally {
                    closedConsumer.close();
                }
            }
        });

        federationQueueEntry.setConsumer(federationConsumer);
        federationConsumer.start();

        signalAfterCreateFederationConsumer(federationConsumer);
    }

    /**
     * Re-attempts federation consumer creation for every tracked entry of the named queue, provided
     * the manager is started, the queue name matches the policy and the queue exists locally.
     *
     * @param str not used by this implementation (presumably the address of the added queue)
     * @param str2 the name of the queue that was added
     * @throws Exception declared for callers; nothing in this implementation throws a checked exception
     */
    public synchronized void afterRemoteQueueAdded(String str, String str2) throws Exception {
        if (!started || !testIfQueueMatchesPolicy(str2)) {
            return;
        }

        final Queue localQueue = server.locateQueue(str2);
        if (localQueue == null) {
            return;
        }

        demandTracking.forEach((consumerInfo, entry) -> {
            if (consumerInfo.getQueueName().equals(str2)) {
                tryCreateFederationConsumerForQueue(entry, localQueue);
            }
        });
    }

    /**
     * Tests the given pair against the policy via {@code policy.test(str, str2)}.
     *
     * @param str first policy argument; callers in this class pass the queue's address
     * @param str2 second policy argument; callers in this class pass the queue's name
     * @return the result of the policy test
     */
    protected boolean testIfQueueMatchesPolicy(String str, String str2) {
        return policy.test(str, str2);
    }

    /**
     * Tests the given queue name against the policy via {@code policy.testQueue(str)}.
     *
     * @param str the queue name to test
     * @return the result of the policy test
     */
    protected boolean testIfQueueMatchesPolicy(String str) {
        return policy.testQueue(str);
    }

    /**
     * Invoked from {@link #start()} after the started flag is set and before this manager is
     * registered as a broker plugin.
     *
     * @param federationReceiveFromQueuePolicy the policy this manager applies
     */
    protected abstract void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy);

    /**
     * Builds the key under which demand from the given consumer is tracked.
     *
     * @param serverConsumer the local consumer generating demand
     * @return the consumer information used as the {@link #demandTracking} key
     */
    protected abstract FederationConsumerInfo createConsumerInfo(ServerConsumer serverConsumer);

    /**
     * Creates the tracking entry for a consumer information key that is not yet tracked.
     *
     * @param federationConsumerInfo the key the entry is created for
     * @return a new {@link FederationQueueEntry}
     */
    protected FederationQueueEntry createConsumerEntry(FederationConsumerInfo federationConsumerInfo) {
        return new FederationQueueEntry(federationConsumerInfo);
    }

    /**
     * Creates (but does not start) the federation consumer for the given consumer information.
     *
     * @param federationConsumerInfo describes the consumer to create
     * @return the new federation consumer
     */
    protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /**
     * Builds the predicate that identifies server consumers which must be ignored when computing
     * demand. When the policy includes federated consumers nothing is ignored; otherwise a consumer
     * is ignored when its session metadata matches the filter {@code "federation-name" IS NOT NULL}.
     *
     * @param activeMQServer the server used to resolve a consumer's session
     * @param federationReceiveFromQueuePolicy the policy consulted for {@code isIncludeFederated()}
     * @return the predicate; {@code true} means the consumer is excluded from demand tracking
     * @throws ActiveMQException if the metadata filter cannot be created
     */
    protected Predicate<ServerConsumer> createFederationConsumerMatcher(ActiveMQServer activeMQServer, FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy) throws ActiveMQException {
        if (federationReceiveFromQueuePolicy.isIncludeFederated()) {
            return candidate -> false;
        }

        final Filter metaDataFilter = FilterImpl.createFilter("\"federation-name\" IS NOT NULL");
        return candidate -> {
            final ServerSession session = activeMQServer.getSessionByID(candidate.getSessionID());
            // Without a session or metadata there is nothing to match against, so the consumer is
            // not excluded.
            if (session == null || session.getMetaData() == null) {
                return false;
            }
            return metaDataFilter.match(session.getMetaData());
        };
    }

    /**
     * Called immediately before a federation consumer is created.
     *
     * @param federationConsumerInfo describes the consumer about to be created
     */
    protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /**
     * Called after a federation consumer has been created and started.
     *
     * @param federationConsumer the consumer that was created
     */
    protected abstract void signalAfterCreateFederationConsumer(FederationConsumer federationConsumer);

    /**
     * Called immediately before a federation consumer is closed due to loss of demand.
     *
     * @param federationConsumer the consumer about to be closed
     */
    protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer federationConsumer);

    /**
     * Called after a federation consumer has been closed due to loss of demand.
     *
     * @param federationConsumer the consumer that was closed
     */
    protected abstract void signalAfterCloseFederationConsumer(FederationConsumer federationConsumer);

    /**
     * Asks whether federation consumer creation is currently blocked for the given queue.
     *
     * @param queue the local queue a federation consumer would be created for
     * @return {@code true} to suppress creation of the federation consumer
     */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(Queue queue);
}
