package org.apache.activemq.artemis.core.server.cluster.impl;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;

/* loaded from: input_file:BOOT-INF/lib/artemis-server-2.17.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.class */
/**
 * A {@link Consumer} that takes the message references a queue hands to it, asks the
 * {@link PostOffice} to redistribute each message inside a fresh transaction, and acknowledges the
 * reference on the source queue once routing has been processed.
 *
 * <p>After {@code batchSize} completed redistributions the consumer deactivates itself and
 * schedules a {@link Prompter} on the executor, which reactivates it and asks the queue to deliver
 * again. Large messages are routed on the executor rather than on the calling thread.
 *
 * <p>{@link #start()}, {@link #stop()}, {@link #close()} and {@link #handle(MessageReference)}
 * synchronize on this instance.
 */
public class Redistributor implements Consumer {
    /**
     * Timeout passed to {@code ReusableLatch.await(long)} when flushing.
     * NOTE(review): presumably milliseconds - confirm against ReusableLatch.
     */
    private static final long FLUSH_TIMEOUT = 10000L;

    /** While false, {@link #handle(MessageReference)} answers {@link HandleStatus#BUSY}. */
    private boolean active;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private final Executor executor;
    /** Number of redistributions after which delivery is paused and re-prompted. */
    private final int batchSize;
    private final Queue queue;
    /** Redistributions completed since the last prompt. */
    private int count;
    private final long sequentialID;
    /**
     * Counts tasks submitted through {@link #internalExecute(Runnable)}.
     * NOTE(review): no call to internalExecute is visible in this class, so this latch is
     * apparently never raised - confirm before relying on stop()/close() to wait for queued work.
     */
    private final ReusableLatch pendingRuns = new ReusableLatch();

    /**
     * Executor task that re-enables this redistributor and asks the queue to resume delivery.
     */
    public class Prompter implements Runnable {
        private Prompter() {
        }

        @Override // java.lang.Runnable
        public void run() {
            synchronized (Redistributor.this) {
                Redistributor.this.active = true;
                Redistributor.this.queue.deliverAsync();
            }
        }
    }

    /**
     * Creates a redistributor for the given queue. The redistributor is inactive until
     * {@link #start()} is called.
     *
     * @param queue          the queue whose references are redistributed
     * @param storageManager used to generate this consumer's sequential id, to create
     *                       transactions and to register completion callbacks
     * @param postOffice     performs the redistribution routing
     * @param executor       runs the prompter and large-message routing tasks
     * @param i              the batch size
     */
    public Redistributor(Queue queue, StorageManager storageManager, PostOffice postOffice, Executor executor, int i) {
        this.queue = queue;
        this.sequentialID = storageManager.generateID();
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.executor = executor;
        this.batchSize = i;
    }

    /** @return the id generated by the storage manager at construction time */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public long sequentialID() {
        return this.sequentialID;
    }

    /** @return always {@code null}; this consumer applies no filter */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public Filter getFilter() {
        return null;
    }

    /** @return the same text as {@link #toString()} */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String debug() {
        return toString();
    }

    /** @return {@code Redistributor[<queue name>/<queue id>]} */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String toManagementString() {
        return "Redistributor[" + this.queue.getName() + "/" + this.queue.getID() + "]";
    }

    /** No-op. */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void disconnect() {
    }

    /** Marks this redistributor active so that {@link #handle(MessageReference)} accepts references. */
    public synchronized void start() {
        this.active = true;
    }

    /**
     * Deactivates this redistributor and waits for the pending-runs latch. A wait that does not
     * complete is logged, not thrown.
     *
     * @throws Exception declared by the signature; nothing in this body throws a checked exception
     */
    public synchronized void stop() throws Exception {
        this.active = false;
        if (!flushExecutor()) {
            ActiveMQServerLogger.LOGGER.errorStoppingRedistributor();
        }
    }

    /**
     * Waits for the pending-runs latch and then deactivates this redistributor.
     *
     * @throws IllegalStateException if the wait does not complete (timeout or interruption);
     *                               in that case {@code active} is left untouched
     */
    public synchronized void close() {
        if (!flushExecutor()) {
            throw new IllegalStateException("Timed out waiting for executor to complete");
        }
        this.active = false;
    }

    /**
     * Waits on the pending-runs latch for up to {@link #FLUSH_TIMEOUT}.
     *
     * @return the result of the latch wait, or {@code false} if the waiting thread was interrupted
     */
    private boolean flushExecutor() {
        try {
            return this.pendingRuns.await(FLUSH_TIMEOUT);
        } catch (InterruptedException e) {
            ActiveMQServerLogger.LOGGER.failedToFlushExecutor(e);
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Attempts to redistribute one message reference.
     *
     * @param messageReference the reference offered by the queue
     * @return {@link HandleStatus#BUSY} when inactive or when the post office returns no routing
     *         (the transaction is rolled back first); {@link HandleStatus#NO_MATCH} when the
     *         message has a group id; {@link HandleStatus#HANDLED} otherwise
     * @throws Exception if redistribution, routing, acknowledgement or commit fails on the
     *                   calling thread
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public synchronized HandleStatus handle(final MessageReference messageReference) throws Exception {
        if (!this.active) {
            return HandleStatus.BUSY;
        }
        if (messageReference.getMessage().getGroupID() != null) {
            // Grouped messages are not redistributed.
            return HandleStatus.NO_MATCH;
        }
        final TransactionImpl transactionImpl = new TransactionImpl(this.storageManager);
        final Pair<RoutingContext, Message> redistribute = this.postOffice.redistribute(messageReference.getMessage(), this.queue, transactionImpl);
        if (redistribute == null) {
            transactionImpl.rollback();
            return HandleStatus.BUSY;
        }
        if (messageReference.getMessage().isLargeMessage()) {
            // Refuse further references until the asynchronous routing below has finished.
            this.active = false;
            this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.Redistributor.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Redistributor.this.postOffice.processRoute(redistribute.getB(), redistribute.getA(), false);
                        Redistributor.this.ackRedistribution(messageReference, transactionImpl);
                        synchronized (Redistributor.this) {
                            Redistributor.this.active = true;
                            Redistributor.this.count++;
                            Redistributor.this.queue.deliverAsync();
                        }
                    } catch (Exception e) {
                        // On failure the transaction is rolled back; 'active' is left false.
                        try {
                            transactionImpl.rollback();
                        } catch (Exception e2) {
                            // Keep the root cause attached to the rollback failure that gets logged.
                            if (e2 != e) {
                                e2.addSuppressed(e);
                            }
                            ActiveMQServerLogger.LOGGER.failedToRollback(e2);
                        }
                    }
                }
            });
        } else {
            this.postOffice.processRoute(redistribute.getB(), redistribute.getA(), false);
            ackRedistribution(messageReference, transactionImpl);
        }
        return HandleStatus.HANDLED;
    }

    /** No-op. */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void proceedDeliver(MessageReference messageReference) {
    }

    /**
     * Runs a task on the executor while tracking it on the pending-runs latch: the latch is
     * counted up before submission and counted down when the task finishes, even if it throws.
     *
     * @param runnable the task to run
     */
    private void internalExecute(final Runnable runnable) {
        this.pendingRuns.countUp();
        this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.Redistributor.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    runnable.run();
                } finally {
                    Redistributor.this.pendingRuns.countDown();
                }
            }
        });
    }

    /**
     * Marks the reference handled, acknowledges it on the queue within the transaction, commits,
     * and registers a storage callback that advances the batch counter when operations complete.
     * Visibility was widened from private by the decompiler.
     *
     * @param messageReference the reference that was redistributed
     * @param transaction      the transaction the redistribution ran in
     * @throws Exception if acknowledgement or commit fails
     */
    public void ackRedistribution(MessageReference messageReference, Transaction transaction) throws Exception {
        messageReference.handled();
        this.queue.acknowledge(transaction, messageReference);
        transaction.commit();
        this.storageManager.afterCompleteOperations(new IOCallback() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.Redistributor.3
            @Override // org.apache.activemq.artemis.core.io.IOCallback
            public void onError(int i, String str) {
                ActiveMQServerLogger.LOGGER.ioErrorRedistributing(Integer.valueOf(i), str);
            }

            @Override // org.apache.activemq.artemis.core.io.IOCallback
            public void done() {
                Redistributor.this.execPrompter();
            }
        });
    }

    /**
     * Counts one completed redistribution. Once the count reaches the batch size, deactivates this
     * redistributor, schedules a {@link Prompter} on the executor and resets the count.
     * Visibility was widened from private by the decompiler.
     *
     * <p>NOTE(review): this method does not synchronize although it writes {@code count} and
     * {@code active}; whether the storage callback always runs under this instance's monitor is
     * not visible here - confirm.
     */
    public void execPrompter() {
        this.count++;
        if (this.count >= this.batchSize) {
            this.active = false;
            this.executor.execute(new Prompter());
            this.count = 0;
        }
    }

    /** @return always an empty list; this consumer tracks no delivering messages */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public List<MessageReference> getDeliveringMessages() {
        return Collections.emptyList();
    }
}
