package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.MBean;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.conf.AttributeType;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Credit;
import org.apache.activemq.artemis.shaded.org.jgroups.util.NonBlockingCredit;

@MBean(description = "Simple non-blocking flow control protocol based on a credit system")
/* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/UFC_NB.class */
public class UFC_NB extends UFC {

    @Property(description = "Max number of bytes of all queued messages for a given destination. If a given destination has no credits left and the message cannot be added to the queue because it is full, then the sender thread will be blocked until there is again space available in the queue, or the protocol is stopped.", type = AttributeType.BYTES)
    protected int max_queue_size = 10000000;
    protected final Consumer<Message> send_function = message -> {
        this.down_prot.down(message);
    };
    protected Future<?> credit_send_task;

    public int getMaxQueueSize() {
        return this.max_queue_size;
    }

    public UFC_NB setMaxQueueSize(int i) {
        this.max_queue_size = i;
        return this;
    }

    @ManagedAttribute(description = "The number of messages currently queued due to insufficient credit", type = AttributeType.SCALAR)
    public int getNumberOfQueuedMessages() {
        return ((Integer) this.sent.values().stream().map(credit -> {
            return Integer.valueOf(((NonBlockingCredit) credit).getQueuedMessages());
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }

    @ManagedAttribute(description = "The total size of all currently queued messages for all destinations", type = AttributeType.BYTES)
    public int getQueuedSize() {
        return ((Integer) this.sent.values().stream().map(credit -> {
            return Integer.valueOf(((NonBlockingCredit) credit).getQueuedMessageSize());
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }

    @ManagedAttribute(description = "The number of times messages have been queued due to insufficient credits", type = AttributeType.SCALAR)
    public int getNumberOfQueuings() {
        return ((Integer) this.sent.values().stream().map(credit -> {
            return Integer.valueOf(((NonBlockingCredit) credit).getEnqueuedMessages());
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }

    public boolean isQueuingTo(Address address) {
        NonBlockingCredit nonBlockingCredit = (NonBlockingCredit) this.sent.get(address);
        return nonBlockingCredit != null && nonBlockingCredit.isQueuing();
    }

    public int getQueuedMessagesTo(Address address) {
        NonBlockingCredit nonBlockingCredit = (NonBlockingCredit) this.sent.get(address);
        if (nonBlockingCredit != null) {
            return nonBlockingCredit.getQueuedMessages();
        }
        return 0;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.FlowControl, org.apache.activemq.artemis.shaded.org.jgroups.stack.Protocol, org.apache.activemq.artemis.shaded.org.jgroups.Lifecycle
    public void start() throws Exception {
        super.start();
        if (this.max_block_time > 0) {
            this.credit_send_task = getTransport().getTimer().scheduleWithFixedDelay(this::sendCreditRequestsIfNeeded, this.max_block_time, this.max_block_time, TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.UFC, org.apache.activemq.artemis.shaded.org.jgroups.protocols.FlowControl, org.apache.activemq.artemis.shaded.org.jgroups.stack.Protocol, org.apache.activemq.artemis.shaded.org.jgroups.Lifecycle
    public void stop() {
        super.stop();
        if (this.credit_send_task != null) {
            this.credit_send_task.cancel(true);
        }
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.UFC, org.apache.activemq.artemis.shaded.org.jgroups.protocols.FlowControl
    protected Object handleDownMessage(Message message, int i) {
        Address dest = message.getDest();
        if (dest == null) {
            this.log.error("%s doesn't handle multicast messages; passing message down", getClass().getSimpleName());
            return this.down_prot.down(message);
        }
        Credit credit = this.sent.get(dest);
        if (credit != null && this.running && !credit.decrementIfEnoughCredits(message, i, 0L)) {
            if (!credit.needToSendCreditRequest(this.max_block_time)) {
                return null;
            }
            sendCreditRequest(dest, Math.max(0L, this.max_credits - credit.get()));
            return null;
        }
        return this.down_prot.down(message);
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.UFC
    protected <T extends Credit> T createCredit(int i) {
        return new NonBlockingCredit(i, this.max_queue_size, new ReentrantLock(true), this.send_function);
    }

    protected void sendCreditRequestsIfNeeded() {
        this.sent.forEach((address, credit) -> {
            NonBlockingCredit nonBlockingCredit = (NonBlockingCredit) credit;
            if (nonBlockingCredit.get() < this.min_credits && nonBlockingCredit.isQueuing() && nonBlockingCredit.needToSendCreditRequest(this.max_block_time)) {
                sendCreditRequest(address, Math.max(0L, this.max_credits - nonBlockingCredit.get()));
            }
        });
    }
}
