package org.apache.qpid.server.queue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/qpid/server/queue/AbstractQueueTestBase.class */
public abstract class AbstractQueueTestBase extends QpidTestCase {
    /** SLF4J logger for this test base class. */
    private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
    /**
     * Time, in milliseconds, that several tests sleep for before asserting on what the consumer
     * has received. Read from the system property
     * {@code AbstractQueueTestBase.queueRunnerWaitTime}; defaults to 150 when the property is unset.
     */
    private static final long QUEUE_RUNNER_WAIT_TIME = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150).longValue();
    /** Queue under test; created in {@code setUp()} and re-created by some individual tests. */
    private AMQQueue<?> _queue;
    /** Virtual host that owns the queue under test; created in {@code setUp()}. */
    private VirtualHostImpl _virtualHost;
    /** The "amq.direct" exchange looked up on the virtual host in {@code setUp()}. */
    private DirectExchange _exchange;
    /** Consumer registered by the test currently running (unset until a test adds one). */
    private QueueConsumer<?> _consumer;
    /** Value used for the queue's "name" attribute. */
    private String _qname = "qname";
    /** Value used for the queue's "owner" attribute. */
    private String _owner = "owner";
    /** Binding key used by the binding test. */
    private String _routingKey = "routing key";
    /** Default consumer target handed to {@code addConsumer}; one test replaces it with a subclass. */
    private MockConsumer _consumerTarget = new MockConsumer();
    /** Base attributes copied into every queue-creation attribute map; empty by default. */
    private Map<String, Object> _arguments = Collections.emptyMap();

    /* loaded from: input_file:org/apache/qpid/server/queue/AbstractQueueTestBase$EntryListAddingAction.class */
    /**
     * Post-enqueue callback that appends every message instance it is handed to a
     * caller-supplied list, so a test can later inspect or release the resulting queue entries.
     */
    private static class EntryListAddingAction implements Action<MessageInstance> {
        /** Destination list; owned by the caller and mutated by {@link #performAction}. */
        private final ArrayList<QueueEntry> _queueEntries;

        /**
         * @param sink list that receives each entry passed to {@link #performAction}
         */
        public EntryListAddingAction(ArrayList<QueueEntry> sink) {
            _queueEntries = sink;
        }

        /**
         * Records the given instance.
         *
         * @param enqueued the instance to record; it is cast to {@link QueueEntry}
         */
        public void performAction(MessageInstance enqueued) {
            final QueueEntry entry = (QueueEntry) enqueued;
            _queueEntries.add(entry);
        }
    }

    /**
     * Prepares the fixture shared by every test: initialises the broker test helper, creates a
     * virtual host named after the concrete test class, creates the queue under test with the
     * configured name and owner, and looks up the "amq.direct" exchange.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    public void setUp() throws Exception {
        super.setUp();
        BrokerTestHelper.setUp();
        this._virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());

        // Start from the base arguments so subclasses' defaults are carried into the queue.
        Map<String, Object> attributes = new HashMap<String, Object>(this._arguments);
        attributes.put("name", this._qname);
        attributes.put("owner", this._owner);
        this._queue = this._virtualHost.createQueue(attributes);

        // The lookup is typed by the Exchange class token, so the result has to be narrowed
        // explicitly before it can be stored in the DirectExchange field.
        this._exchange = (DirectExchange) this._virtualHost.getChildByName(Exchange.class, "amq.direct");
    }

    /**
     * Releases the fixture: closes the queue and the virtual host, then tears down the broker
     * test helper and finally the superclass fixture.
     *
     * <p>The helper and superclass tear-down run exactly once whether or not closing the queue
     * or virtual host fails, and the superclass tear-down still runs if the helper tear-down
     * itself throws.
     *
     * @throws Exception if any clean-up step fails
     */
    public void tearDown() throws Exception {
        try {
            this._queue.close();
            this._virtualHost.close();
        } finally {
            try {
                BrokerTestHelper.tearDown();
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Checks queue creation: attributes without a "name" must either yield no queue or fail with
     * an {@link IllegalArgumentException} that mentions "name"; attributes carrying a name must
     * yield a queue.
     */
    public void testCreateQueue() throws Exception {
        _queue.close();

        // First attempt: only the base arguments, i.e. no "name" entry.
        final Map<String, Object> withoutName = new HashMap<String, Object>(_arguments);
        try {
            _queue = _virtualHost.createQueue(withoutName);
            assertNull("Queue was created", _queue);
        } catch (IllegalArgumentException rejected) {
            final String reason = rejected.getMessage();
            assertTrue("Exception was not about missing name", reason.contains("name"));
        }

        // Second attempt: same base arguments plus a name.
        final Map<String, Object> withName = new HashMap<String, Object>(_arguments);
        withName.put("name", "differentName");
        _queue = _virtualHost.createQueue(withName);
        assertNotNull("Queue was not created", _queue);
    }

    /** Checks that the queue reports the virtual host it was created on. */
    public void testGetVirtualHost() {
        final Object expectedHost = _virtualHost;
        final Object reportedHost = _queue.getVirtualHost();
        assertEquals("Virtual host was wrong", expectedHost, reportedHost);
    }

    /**
     * Binds the queue to the direct exchange under the routing key, checks the binding is
     * visible from both the exchange and the queue, then removes it and checks the key is no
     * longer bound.
     */
    public void testBinding() {
        this._exchange.addBinding(this._routingKey, this._queue, Collections.<String, Object>emptyMap());

        assertTrue("Routing key was not bound", this._exchange.isBound(this._routingKey));
        assertTrue("Queue was not bound to key", this._exchange.isBound(this._routingKey, this._queue));
        assertEquals("Exchange binding count", 1, this._queue.getBindings().size());

        BindingImpl binding = (BindingImpl) this._queue.getBindings().iterator().next();
        // Distinct messages so a failure says which property of the binding was wrong.
        assertEquals("Wrong binding key", this._routingKey, binding.getBindingKey());
        assertEquals("Wrong exchange bound", this._exchange, binding.getExchange());

        this._exchange.deleteBinding(this._routingKey, this._queue);
        assertFalse("Routing key was still bound", this._exchange.isBound(this._routingKey));
    }

    /**
     * Registers a consumer, enqueues a message and checks the consumer saw it; then closes the
     * consumer and checks the queue no longer counts it and that a further enqueue leaves the
     * closed consumer without a queue context.
     *
     * @throws Exception on any failure, including interruption of the delivery wait
     */
    public void testRegisterConsumerThenEnqueueMessage() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));

        // addConsumer is consumed here as a QueueConsumer, hence the explicit narrowing.
        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        assertEquals("Queue does not have consumer", 1, this._queue.getConsumerCount());
        assertEquals("Queue does not have active consumer", 1, this._queue.getConsumerCountWithCredit());

        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);
        // Give delivery time to happen. An interrupt is propagated (failing the test) rather
        // than swallowed, so the assertions below never run against a shortened wait.
        Thread.sleep(2000L);
        assertEquals(messageA, this._consumer.getQueueContext().getLastSeenEntry().getMessage());
        assertNull(this._consumer.getQueueContext().getReleasedEntry());

        this._consumer.close();
        assertTrue("Consumer still had queue", this._consumerTarget.isClosed());
        assertFalse("Queue still has consumer", this._queue.getConsumerCount() == 1);
        assertFalse("Queue still has active consumer", this._queue.getConsumerCountWithCredit() == 1);

        this._queue.enqueue(createMessage(Long.valueOf(25L)), (Action) null, (MessageEnqueueRecord) null);
        assertNull(this._consumer.getQueueContext());
    }

    /**
     * Enqueues a message before any consumer exists, then registers a consumer and checks that
     * it is brought up to date with that message and has no released entry.
     *
     * @throws Exception on any failure, including interruption of the delivery wait
     */
    public void testEnqueueMessageThenRegisterConsumer() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));
        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        // Use the configurable wait (default 150 ms) so slow environments can lengthen it.
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);

        assertEquals(messageA, this._consumer.getQueueContext().getLastSeenEntry().getMessage());
        assertNull("There should be no releasedEntry after an enqueue",
                this._consumer.getQueueContext().getReleasedEntry());
    }

    /**
     * Enqueues two messages before any consumer exists, then registers a consumer and checks
     * that the last entry it has seen is the second message and that it has no released entry.
     *
     * @throws Exception on any failure, including interruption of the delivery wait
     */
    public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));
        ServerMessage messageB = createMessage(Long.valueOf(25L));
        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageB, (Action) null, (MessageEnqueueRecord) null);

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        // Use the configurable wait (default 150 ms) so slow environments can lengthen it.
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);

        assertEquals(messageB, this._consumer.getQueueContext().getLastSeenEntry().getMessage());
        assertNull("There should be no releasedEntry after enqueues",
                this._consumer.getQueueContext().getReleasedEntry());
    }

    /**
     * Re-creates the queue with "holdOnPublishEnabled" set, enqueues a message whose
     * not-valid-before time is 20 seconds in the future and checks the consumer receives
     * nothing; then moves the not-valid-before time into the past, asks the queue to re-check
     * message status and checks the message is received.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testMessageHeldIfNotYetValidWhenConsumerAdded() throws Exception {
        this._queue.close();
        Map<String, Object> attributes = new HashMap<String, Object>(this._arguments);
        attributes.put("name", this._qname);
        attributes.put("owner", this._owner);
        attributes.put("holdOnPublishEnabled", Boolean.TRUE);
        // createChild is typed by the class token; narrow the result to the queue type.
        this._queue = (AMQQueue<?>) this._virtualHost.createChild(Queue.class, attributes, new ConfiguredObject[0]);

        ServerMessage messageA = createMessage(Long.valueOf(24L));
        AMQMessageHeader header = messageA.getMessageHeader();
        Mockito.when(header.getNotValidBefore()).thenReturn(System.currentTimeMillis() + 20000L);
        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Message which was not yet valid was received", 0, this._consumerTarget.getMessages().size());

        // Make the message valid and prompt the queue to notice.
        Mockito.when(header.getNotValidBefore()).thenReturn(System.currentTimeMillis() - 100L);
        this._queue.checkMessageStatus();
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Message which was valid was not received", 1, this._consumerTarget.getMessages().size());
    }

    /**
     * Re-creates the queue with "holdOnPublishEnabled" cleared and checks that a message whose
     * not-valid-before time lies 20 seconds in the future is nevertheless delivered.
     *
     * @throws Exception on any failure, including interruption of the delivery wait
     */
    public void testMessageHoldingDependentOnQueueProperty() throws Exception {
        this._queue.close();
        Map<String, Object> attributes = new HashMap<String, Object>(this._arguments);
        attributes.put("name", this._qname);
        attributes.put("owner", this._owner);
        attributes.put("holdOnPublishEnabled", Boolean.FALSE);
        // createChild is typed by the class token; narrow the result to the queue type.
        this._queue = (AMQQueue<?>) this._virtualHost.createChild(Queue.class, attributes, new ConfiguredObject[0]);

        ServerMessage messageA = createMessage(Long.valueOf(24L));
        Mockito.when(messageA.getMessageHeader().getNotValidBefore())
                .thenReturn(System.currentTimeMillis() + 20000L);
        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);

        assertEquals("Message was held despite queue not having holding enabled", 1,
                this._consumerTarget.getMessages().size());
    }

    /**
     * With "holdOnPublishEnabled" set, enqueues a message that is not yet valid followed by one
     * with no such restriction, and checks that only the second is delivered at first; once the
     * first message becomes valid and message status is re-checked, it is delivered second.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testUnheldMessageOvertakesHeld() throws Exception {
        this._queue.close();
        Map<String, Object> attributes = new HashMap<String, Object>(this._arguments);
        attributes.put("name", this._qname);
        attributes.put("owner", this._owner);
        attributes.put("holdOnPublishEnabled", Boolean.TRUE);
        // createChild is typed by the class token; narrow the result to the queue type.
        this._queue = (AMQQueue<?>) this._virtualHost.createChild(Queue.class, attributes, new ConfiguredObject[0]);

        ServerMessage messageA = createMessage(Long.valueOf(24L));
        AMQMessageHeader headerA = messageA.getMessageHeader();
        Mockito.when(headerA.getNotValidBefore()).thenReturn(System.currentTimeMillis() + 20000L);
        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);

        ServerMessage messageB = createMessage(Long.valueOf(25L));
        this._queue.enqueue(messageB, (Action) null, (MessageEnqueueRecord) null);

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);

        assertEquals("Expect one message (message B)", 1, this._consumerTarget.getMessages().size());
        assertEquals("Wrong message received",
                messageB.getMessageHeader().getMessageId(),
                this._consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());

        // Make message A valid and prompt the queue to notice.
        Mockito.when(headerA.getNotValidBefore()).thenReturn(System.currentTimeMillis() - 100L);
        this._queue.checkMessageStatus();
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);

        assertEquals("Message which was valid was not received", 2, this._consumerTarget.getMessages().size());
        assertEquals("Wrong message received",
                messageA.getMessageHeader().getMessageId(),
                this._consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
    }

    /**
     * Delivers three messages to a consumer, releases the first entry and checks that it is
     * sent again (four sends in total), that only that entry is flagged as redelivered, and
     * that the consumer's released-entry marker has been cleared.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testReleasedMessageIsResentToSubscriber() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));
        ServerMessage messageB = createMessage(Long.valueOf(25L));
        ServerMessage messageC = createMessage(Long.valueOf(26L));

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        // Collect the queue entries as they are created so they can be released below.
        ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
        this._queue.enqueue(messageA, postEnqueueAction, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageB, postEnqueueAction, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageC, postEnqueueAction, (MessageEnqueueRecord) null);

        // Use the configurable wait (default 150 ms) so slow environments can lengthen it.
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to consumer", 3,
                this._consumerTarget.getMessages().size());
        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
        assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
        assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());

        queueEntries.get(0).release();

        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to consumer", 4,
                this._consumerTarget.getMessages().size());
        assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
        assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
        assertFalse("Redelivery flag should remain be unset", queueEntries.get(2).isRedelivered());
        assertNull("releasedEntry should be cleared after requeue processed",
                this._consumer.getQueueContext().getReleasedEntry());
    }

    /**
     * Delivers a message that expires 200 ms after the test starts, waits for it to expire
     * while acquired, releases it and checks that it is dequeued rather than redelivered: the
     * send count stays at one, the redelivered flag stays unset and the consumer's
     * released-entry marker ends up cleared.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));

        // Consumer target that signals once a send has been attempted, whether or not it threw.
        final CountDownLatch sendIndicator = new CountDownLatch(1);
        this._consumerTarget = new MockConsumer() {
            @Override
            public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch) {
                try {
                    return super.send(consumer, entry, batch);
                } finally {
                    sendIndicator.countDown();
                }
            }
        };

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));

        ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);

        final long expiration = System.currentTimeMillis() + 200;
        Mockito.when(messageA.getExpiration()).thenReturn(expiration);
        this._queue.enqueue(messageA, postEnqueueAction, (MessageEnqueueRecord) null);

        assertTrue("Message was not sent during expected time interval",
                sendIndicator.await(5000L, TimeUnit.MILLISECONDS));
        assertEquals("Unexpected total number of messages sent to consumer", 1,
                this._consumerTarget.getMessages().size());

        QueueEntry queueEntry = queueEntries.get(0);

        // Signals when the entry transitions to the dequeued state.
        final CountDownLatch dequeueIndicator = new CountDownLatch(1);
        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.EntryState>() {
            public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState,
                    MessageInstance.EntryState newState) {
                if (newState.equals(MessageInstance.DEQUEUED_STATE)) {
                    dequeueIndicator.countDown();
                }
            }
        });
        assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());

        // Poll until the entry reports itself expired or the expiration instant has passed.
        while (!queueEntry.expired() && System.currentTimeMillis() <= expiration) {
            Thread.sleep(10L);
        }
        assertTrue("Expecting the queue entry to be now expired", queueEntry.expired());

        queueEntry.release();

        assertTrue("Message was not de-queued due to expiration",
                dequeueIndicator.await(5000L, TimeUnit.MILLISECONDS));
        assertEquals("Total number of messages sent should not have changed", 1,
                this._consumerTarget.getMessages().size());
        assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());

        // Allow up to ten 10 ms pauses for the released-entry marker to clear.
        long remainingPauses = 10;
        while (this._consumer.getQueueContext().getReleasedEntry() != null && remainingPauses-- > 0) {
            Thread.sleep(10L);
        }
        assertNull("releasedEntry should be cleared after requeue processed:"
                + this._consumer.getQueueContext().getReleasedEntry(),
                this._consumer.getQueueContext().getReleasedEntry());
    }

    /**
     * Delivers three messages, releases the third entry and then the first (i.e. out of queue
     * order) and checks that both are sent again (five sends in total), that exactly those two
     * entries are flagged as redelivered, and that the released-entry marker is cleared.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));
        ServerMessage messageB = createMessage(Long.valueOf(25L));
        ServerMessage messageC = createMessage(Long.valueOf(26L));

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        // Collect the queue entries as they are created so they can be released below.
        ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
        this._queue.enqueue(messageA, postEnqueueAction, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageB, postEnqueueAction, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageC, postEnqueueAction, (MessageEnqueueRecord) null);

        // Use the configurable wait (default 150 ms) so slow environments can lengthen it.
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to consumer", 3,
                this._consumerTarget.getMessages().size());
        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
        assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
        assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());

        // Release the later entry first.
        queueEntries.get(2).release();
        queueEntries.get(0).release();

        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to consumer", 5,
                this._consumerTarget.getMessages().size());
        assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
        assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
        assertTrue("Redelivery flag should now be set", queueEntries.get(2).isRedelivered());
        assertNull("releasedEntry should be cleared after requeue processed",
                this._consumer.getQueueContext().getReleasedEntry());
    }

    /**
     * Registers two consumers, enqueues two messages and checks two sends occurred across both
     * consumers; then releases the first entry and checks a third send occurred and that
     * neither consumer is left with a released-entry marker.
     *
     * @throws Exception on any failure, including interruption of a wait
     */
    public void testReleaseForQueueWithMultipleConsumers() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));
        ServerMessage messageB = createMessage(Long.valueOf(25L));

        MockConsumer target1 = new MockConsumer();
        MockConsumer target2 = new MockConsumer();
        QueueConsumer<?> consumer1 = (QueueConsumer<?>) this._queue.addConsumer(target1, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        QueueConsumer<?> consumer2 = (QueueConsumer<?>) this._queue.addConsumer(target2, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        // Collect the queue entries as they are created so one can be released below.
        ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
        this._queue.enqueue(messageA, postEnqueueAction, (MessageEnqueueRecord) null);
        this._queue.enqueue(messageB, postEnqueueAction, (MessageEnqueueRecord) null);

        // Use the configurable wait (default 150 ms) so slow environments can lengthen it.
        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to both after enqueue", 2,
                target1.getMessages().size() + target2.getMessages().size());

        queueEntries.get(0).release();

        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
        assertEquals("Unexpected total number of messages sent to both consumers after release", 3,
                target1.getMessages().size() + target2.getMessages().size());
        assertNull("releasedEntry should be cleared after requeue processed",
                consumer1.getQueueContext().getReleasedEntry());
        assertNull("releasedEntry should be cleared after requeue processed",
                consumer2.getQueueContext().getReleasedEntry());
    }

    /**
     * Checks exclusive-consumer enforcement in both directions:
     * <ol>
     *   <li>while an exclusive consumer is attached, adding another consumer is rejected with
     *       {@code ExistingExclusiveConsumer};</li>
     *   <li>while a non-exclusive consumer is attached, adding an exclusive consumer is
     *       rejected with {@code ExistingConsumerPreventsExclusive}.</li>
     * </ol>
     * Each rejection is captured in its own variable so the second check cannot be satisfied by
     * the exception left over from the first.
     *
     * @throws Exception on any failure, including interruption of the delivery wait
     */
    public void testExclusiveConsumer() throws Exception {
        ServerMessage messageA = createMessage(Long.valueOf(24L));

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES,
                        ConsumerImpl.Option.SEES_REQUEUES));
        assertEquals("Queue does not have consumer", 1, this._queue.getConsumerCount());
        assertEquals("Queue does not have active consumer", 1, this._queue.getConsumerCountWithCredit());

        this._queue.enqueue(messageA, (Action) null, (MessageEnqueueRecord) null);
        // Give delivery time to happen; an interrupt fails the test instead of being swallowed.
        Thread.sleep(2000L);
        assertEquals(messageA, this._consumer.getQueueContext().getLastSeenEntry().getMessage());

        // 1. A second consumer must be refused while the exclusive one is attached.
        MockConsumer secondTarget = new MockConsumer();
        MessageSource.ExistingExclusiveConsumer refusedByExclusive = null;
        try {
            this._queue.addConsumer(secondTarget, (FilterManager) null, messageA.getClass(), "test",
                    EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        } catch (MessageSource.ExistingExclusiveConsumer e) {
            refusedByExclusive = e;
        }
        assertNotNull("Adding a consumer alongside an exclusive consumer was not rejected", refusedByExclusive);

        // 2. An exclusive consumer must be refused while a non-exclusive one is attached.
        this._consumer.close();
        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                messageA.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
        MessageSource.ExistingConsumerPreventsExclusive refusedByExisting = null;
        try {
            this._consumer = (QueueConsumer<?>) this._queue.addConsumer(secondTarget, (FilterManager) null,
                    messageA.getClass(), "test", EnumSet.of(ConsumerImpl.Option.EXCLUSIVE));
        } catch (MessageSource.ExistingConsumerPreventsExclusive e) {
            refusedByExisting = e;
        }
        assertNotNull("Adding an exclusive consumer alongside an existing consumer was not rejected",
                refusedByExisting);
    }

    /**
     * Registers a consumer and enqueues a message with a post-enqueue action that marks the new
     * entry as redelivered and asks the consumer to resend it. The test makes no assertions of
     * its own; it passes as long as none of these calls throws.
     *
     * @throws Exception if creating the consumer or enqueuing fails
     */
    public void testResend() throws Exception {
        ServerMessage message = createMessage(Long.valueOf(26L));

        this._consumer = (QueueConsumer<?>) this._queue.addConsumer(this._consumerTarget, (FilterManager) null,
                message.getClass(), "test",
                EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));

        this._queue.enqueue(message, new Action<MessageInstance>() {
            public void performAction(MessageInstance object) {
                QueueEntryImpl entry = (QueueEntryImpl) object;
                entry.setRedelivered();
                AbstractQueueTestBase.this._consumer.resend(entry);
            }
        }, (MessageEnqueueRecord) null);
    }

    /** Enqueues one message with id 23 and checks it is the first id the queue reports. */
    public void testGetFirstMessageId() throws Exception {
        final Long messageId = Long.valueOf(23L);
        final ServerMessage message = createMessage(messageId);
        _queue.enqueue(message, (Action) null, (MessageEnqueueRecord) null);

        final List<?> reportedIds = _queue.getMessagesOnTheQueue(1);
        final Long firstReported = (Long) reportedIds.get(0);
        assertEquals("Message ID was wrong", messageId, firstReported);
    }

    /** Enqueues messages with ids 0..4 and checks the queue reports those ids in order. */
    public void testGetFirstFiveMessageIds() throws Exception {
        int nextId = 0;
        while (nextId < 5) {
            final ServerMessage message = createMessage(Long.valueOf(nextId));
            _queue.enqueue(message, (Action) null, (MessageEnqueueRecord) null);
            nextId++;
        }

        final List<?> reportedIds = _queue.getMessagesOnTheQueue(5);
        int position = 0;
        while (position < 5) {
            final Long expectedId = Long.valueOf(position);
            assertEquals("Message ID was wrong", expectedId, reportedIds.get(position));
            position++;
        }
    }

    /**
     * Enqueues messages with ids 0..9 and checks that requesting five ids with the second
     * argument set to five yields ids 5..9 in order.
     */
    public void testGetLastFiveMessageIds() throws Exception {
        int nextId = 0;
        while (nextId < 10) {
            final ServerMessage message = createMessage(Long.valueOf(nextId));
            _queue.enqueue(message, (Action) null, (MessageEnqueueRecord) null);
            nextId++;
        }

        final List<?> reportedIds = _queue.getMessagesOnTheQueue(5, 5);
        int position = 0;
        while (position < 5) {
            final Long expectedId = Long.valueOf(position + 5);
            assertEquals("Message ID was wrong", expectedId, reportedIds.get(position));
            position++;
        }
    }

    /**
     * Enqueues messages numbered 1..10 and checks which entries a series of id ranges returns,
     * including empty, inverted, single-element and partially out-of-bounds ranges.
     *
     * @throws Exception if a message cannot be created or enqueued
     */
    public void testGetMessagesRangeOnTheQueue() throws Exception {
        for (int id = 1; id <= 10; id++) {
            this._queue.enqueue(createMessage(Long.valueOf(id)), (Action) null, (MessageEnqueueRecord) null);
        }

        assertRangeYieldsMessageNumbers(0L, 0L);              // below the first id
        assertRangeYieldsMessageNumbers(0L, 2L, 1L, 2L);      // overlaps the start
        assertRangeYieldsMessageNumbers(5L, 4L);              // inverted range
        assertRangeYieldsMessageNumbers(1L, 1L, 1L);          // first id only
        assertRangeYieldsMessageNumbers(5L, 7L, 5L, 6L, 7L);  // interior range
        assertRangeYieldsMessageNumbers(10L, 10L, 10L);       // last id only
        assertRangeYieldsMessageNumbers(11L, 11L);            // above the last id
        assertRangeYieldsMessageNumbers(9L, 11L, 9L, 10L);    // overlaps the end
    }

    /**
     * Asserts that the queue under test returns exactly the entries with the given message
     * numbers, in order, for the given id range. Expected values are passed to the assertions
     * first so that failure messages label expected and actual correctly.
     *
     * @param fromMessageId first argument passed to {@code getMessagesRangeOnTheQueue}
     * @param toMessageId second argument passed to {@code getMessagesRangeOnTheQueue}
     * @param expectedNumbers message numbers expected in the result, in order; none for empty
     */
    private void assertRangeYieldsMessageNumbers(long fromMessageId, long toMessageId, long... expectedNumbers) {
        List<?> entries = this._queue.getMessagesRangeOnTheQueue(fromMessageId, toMessageId);
        assertEquals("Unexpected number of entries for range " + fromMessageId + ".." + toMessageId,
                expectedNumbers.length, entries.size());
        for (int i = 0; i < expectedNumbers.length; i++) {
            long actualNumber = ((QueueEntry) entries.get(i)).getMessage().getMessageNumber();
            assertEquals("Message ID was wrong", expectedNumbers[i], actualNumber);
        }
    }

    /**
     * Enqueues four messages, dequeues the one at index 1 and checks that listing the queue
     * returns the three remaining entries, in order, without the dequeued one.
     */
    public void testGetMessagesOnTheQueueWithDequeuedEntry() {
        final int messageNumber = 4;
        final int dequeueMessageIndex = 1;
        final int remaining = messageNumber - 1;

        enqueueGivenNumberOfMessages(_queue, messageNumber);
        dequeueMessage(_queue, dequeueMessageIndex);

        final List<?> entries = _queue.getMessagesOnTheQueue();
        assertEquals(remaining, entries.size());

        int expectedId = 0;
        int position = 0;
        while (position < remaining) {
            final QueueEntry entry = (QueueEntry) entries.get(position);
            final Long actualId = Long.valueOf(entry.getMessage().getMessageNumber());
            if (position == dequeueMessageIndex) {
                // The dequeued id must be absent; step the expectation past it.
                final boolean dequeuedReturned = Long.valueOf(expectedId).equals(actualId);
                assertFalse("Message with id 1 was dequeued and should not be returned by method getMessagesOnTheQueue!", dequeuedReturned);
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + actualId, Long.valueOf(expectedId), actualId);
            expectedId++;
            position++;
        }
    }

    /**
     * Same scenario as the unfiltered listing test, but lists the queue through a filter that
     * accepts every entry and never reports completion: after dequeuing index 1 of four
     * messages, the three remaining entries must be returned in order.
     */
    public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() {
        final int messageNumber = 4;
        final int dequeueMessageIndex = 1;
        final int remaining = messageNumber - 1;

        enqueueGivenNumberOfMessages(_queue, messageNumber);
        dequeueMessage(_queue, dequeueMessageIndex);

        final AbstractQueue.QueueEntryFilter acceptEverything = new AbstractQueue.QueueEntryFilter() {
            public boolean accept(QueueEntry candidate) {
                return true;
            }

            public boolean filterComplete() {
                return false;
            }
        };
        final List<?> entries = _queue.getMessagesOnTheQueue(acceptEverything);
        assertEquals(remaining, entries.size());

        int expectedId = 0;
        int position = 0;
        while (position < remaining) {
            final QueueEntry entry = (QueueEntry) entries.get(position);
            final Long actualId = Long.valueOf(entry.getMessage().getMessageNumber());
            if (position == dequeueMessageIndex) {
                // The dequeued id must be absent; step the expectation past it.
                final boolean dequeuedReturned = Long.valueOf(expectedId).equals(actualId);
                assertFalse("Message with id 1 was dequeued and should not be returned by method getMessagesOnTheQueue!", dequeuedReturned);
                expectedId++;
            }
            assertEquals("Expected message with id " + expectedId + " but got message with id " + actualId, Long.valueOf(expectedId), actualId);
            expectedId++;
            position++;
        }
    }

    /**
     * Enqueues four messages, dequeues the one at index 1, clears the queue and checks that the
     * queue then lists no entries (an empty list, not null).
     */
    public void testClearQueueWithDequeuedEntry() throws Exception {
        final int messageNumber = 4;
        final int dequeueMessageIndex = 1;

        enqueueGivenNumberOfMessages(_queue, messageNumber);
        dequeueMessage(_queue, dequeueMessageIndex);

        _queue.clearQueue();

        final List<?> leftOver = _queue.getMessagesOnTheQueue();
        assertNotNull(leftOver);
        assertEquals(0, leftOver.size());
    }

    /**
     * Sets the "alertThresholdQueueDepthMessages" attribute to 2 and checks that the listener
     * is untouched after the first enqueue and is notified of a message-count alert after the
     * second.
     */
    public void testNotificationFiredOnEnqueue() throws Exception {
        final QueueNotificationListener listener = Mockito.mock(QueueNotificationListener.class);
        _queue.setNotificationListener(listener);
        _queue.setAttributes(Collections.<String, Object>singletonMap("alertThresholdQueueDepthMessages", 2));

        final ServerMessage first = createMessage(Long.valueOf(24L));
        _queue.enqueue(first, (Action) null, (MessageEnqueueRecord) null);
        Mockito.verifyZeroInteractions(listener);

        final ServerMessage second = createMessage(Long.valueOf(25L));
        _queue.enqueue(second, (Action) null, (MessageEnqueueRecord) null);

        Mockito.verify(listener, Mockito.atLeastOnce()).notifyClients(
                Matchers.eq(NotificationCheck.MESSAGE_COUNT_ALERT),
                (Queue) Matchers.eq(_queue),
                Matchers.contains("Maximum count on queue threshold"));
    }

    /**
     * Enqueues three messages before the listener and the
     * {@code alertThresholdQueueDepthMessages} attribute (2) are installed, verifies the
     * listener has not been touched, then calls {@code checkMessageStatus()} and expects
     * a {@code MESSAGE_COUNT_ALERT} notification.
     */
    public void testNotificationFiredAsync() throws Exception {
        QueueNotificationListener listener = Mockito.mock(QueueNotificationListener.class);

        // Message numbers 24, 25 and 26, enqueued in that order.
        for (long messageNumber = 24L; messageNumber <= 26L; messageNumber++) {
            this._queue.enqueue(createMessage(Long.valueOf(messageNumber)), (Action) null, (MessageEnqueueRecord) null);
        }

        this._queue.setNotificationListener(listener);
        this._queue.setAttributes(Collections.singletonMap("alertThresholdQueueDepthMessages", 2));
        Mockito.verifyZeroInteractions(listener);

        this._queue.checkMessageStatus();

        Mockito.verify(listener, Mockito.atLeastOnce()).notifyClients(
                Matchers.eq(NotificationCheck.MESSAGE_COUNT_ALERT),
                (Queue) Matchers.eq(this._queue),
                Matchers.contains("Maximum count on queue threshold"));
    }

    /**
     * Checks the expiration recorded on a queue entry for queues created with a
     * {@code maximumMessageTtl}, a {@code minimumMessageTtl}, or both.
     *
     * <p>Each scenario creates a fresh queue from a copy of {@code _arguments} and uses
     * {@link #getExpirationOnQueue} to read back the expiration stored for a message
     * with a given arrival time and requested expiration.
     */
    public void testMaximumMessageTtl() throws Exception {
        // --- Only the maximum TTL (10s) is set ---
        Map<String, Object> maxOnlyAttributes = new HashMap<String, Object>(this._arguments);
        maxOnlyAttributes.put("name", "testTtlOverrideMaximumTTl");
        maxOnlyAttributes.put("maximumMessageTtl", 10000L);
        AMQQueue maxOnlyQueue = this._virtualHost.createQueue(maxOnlyAttributes);

        assertEquals("TTL has not been overridden", 60000L, getExpirationOnQueue(maxOnlyQueue, 50000L, 0L));
        assertEquals("TTL has not been overridden", 60000L, getExpirationOnQueue(maxOnlyQueue, 50000L, 65000L));
        assertEquals("TTL has been incorrectly overridden", 55000L, getExpirationOnQueue(maxOnlyQueue, 50000L, 55000L));

        // Arrival time 0: presumably the queue substitutes the current time - TODO confirm.
        long tooLateExpiration = System.currentTimeMillis() + 20000;
        assertTrue("TTL has not been overridden",
                tooLateExpiration != getExpirationOnQueue(maxOnlyQueue, 0L, tooLateExpiration));
        long acceptableMaxExpiration = System.currentTimeMillis() + 5000;
        assertEquals("TTL has been incorrectly overriden", acceptableMaxExpiration,
                getExpirationOnQueue(maxOnlyQueue, 0L, acceptableMaxExpiration));

        // --- Only the minimum TTL (10s) is set ---
        Map<String, Object> minOnlyAttributes = new HashMap<String, Object>(this._arguments);
        minOnlyAttributes.put("name", "testTtlOverrideMinimumTTl");
        minOnlyAttributes.put("minimumMessageTtl", 10000L);
        AMQQueue minOnlyQueue = this._virtualHost.createQueue(minOnlyAttributes);

        assertEquals("TTL has been overridden incorrectly", 0L, getExpirationOnQueue(minOnlyQueue, 50000L, 0L));
        assertEquals("TTL has been overridden incorrectly", 65000L, getExpirationOnQueue(minOnlyQueue, 50000L, 65000L));
        assertEquals("TTL has not been overriden", 60000L, getExpirationOnQueue(minOnlyQueue, 50000L, 55000L));

        // The expiration handed to the queue must be the too-early one; passing the
        // earlier "too late" value made this assertion trivially true.
        long tooEarlyExpiration = System.currentTimeMillis() + 5000;
        assertTrue("TTL has not been overridden",
                tooEarlyExpiration != getExpirationOnQueue(minOnlyQueue, 0L, tooEarlyExpiration));
        long acceptableMinExpiration = System.currentTimeMillis() + 20000;
        assertEquals("TTL has been incorrectly overridden", acceptableMinExpiration,
                getExpirationOnQueue(minOnlyQueue, 0L, acceptableMinExpiration));

        // --- Both minimum (10s) and maximum (20s) TTL are set ---
        Map<String, Object> bothAttributes = new HashMap<String, Object>(this._arguments);
        bothAttributes.put("name", "testTtlOverrideBothTTl");
        bothAttributes.put("minimumMessageTtl", 10000L);
        bothAttributes.put("maximumMessageTtl", 20000L);
        AMQQueue bothQueue = this._virtualHost.createQueue(bothAttributes);

        assertEquals("TTL has not been overridden", 70000L, getExpirationOnQueue(bothQueue, 50000L, 0L));
        assertEquals("TTL has been overridden incorrectly", 65000L, getExpirationOnQueue(bothQueue, 50000L, 65000L));
        assertEquals("TTL has not been overridden", 60000L, getExpirationOnQueue(bothQueue, 50000L, 55000L));
    }

    /**
     * Enqueues three messages with arrival times 10, 100 and 1000 (and differing
     * priorities and {@code sortKey} headers) and asserts that the queue reports 10 as
     * its oldest message arrival time.
     */
    public void testOldestMessage() {
        AMQQueue<?> queue = getQueue();

        ServerMessage earliest = createMessage(1L, (byte) 1, Collections.singletonMap("sortKey", "Z"), 10L);
        queue.enqueue(earliest, (Action) null, (MessageEnqueueRecord) null);

        ServerMessage middle = createMessage(2L, (byte) 4, Collections.singletonMap("sortKey", "M"), 100L);
        queue.enqueue(middle, (Action) null, (MessageEnqueueRecord) null);

        ServerMessage latest = createMessage(3L, (byte) 9, Collections.singletonMap("sortKey", "A"), 1000L);
        queue.enqueue(latest, (Action) null, (MessageEnqueueRecord) null);

        assertEquals(10L, queue.getOldestMessageArrivalTime());
    }

    /**
     * Enqueues a single mocked message on the given queue and returns the
     * {@code EXPIRATION} instance property recorded on the resulting queue entry.
     * The queue is cleared again before returning.
     *
     * @param aMQQueue queue to enqueue onto
     * @param j        value the mocked message reports as its arrival time
     * @param j2       value the mocked message reports as its expiration
     * @return the expiration stored in the queue entry's instance properties
     */
    private long getExpirationOnQueue(AMQQueue aMQQueue, long j, long j2) {
        ServerMessage message = createMessage(1L);
        Mockito.when(message.getArrivalTime()).thenReturn(j);
        Mockito.when(message.getExpiration()).thenReturn(j2);
        aMQQueue.enqueue(message, (Action) null, (MessageEnqueueRecord) null);

        // Collect every entry the queue hands to the visitor.
        final List<QueueEntry> visitedEntries = new ArrayList<QueueEntry>();
        aMQQueue.visit(new QueueEntryVisitor() {
            public boolean visit(QueueEntry queueEntry) {
                visitedEntries.add(queueEntry);
                return true;
            }
        });
        assertEquals("Expected only one entry in the queue", 1, visitedEntries.size());

        QueueEntry onlyEntry = visitedEntries.get(0);
        Long expiration = (Long) onlyEntry.getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);

        aMQQueue.clearQueue();
        visitedEntries.clear();
        return expiration.longValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Puts {@code i} messages on the queue via {@link #putGivenNumberOfMessages} and
     * asserts that the queue then holds exactly {@code i} entries whose message numbers
     * run 0, 1, 2, ... in list order.
     *
     * @param aMQQueue queue to fill
     * @param i        number of messages to enqueue
     * @return the entries reported by the queue after enqueueing
     */
    public List<? extends QueueEntry> enqueueGivenNumberOfMessages(AMQQueue<?> aMQQueue, int i) {
        putGivenNumberOfMessages(aMQQueue, i);

        List<? extends QueueEntry> entries = aMQQueue.getMessagesOnTheQueue();
        assertEquals(i, entries.size());

        // The size check above guarantees this walks exactly i entries.
        int expectedNumber = 0;
        for (QueueEntry entry : entries) {
            assertEquals(expectedNumber, entry.getMessage().getMessageNumber());
            expectedNumber++;
        }
        return entries;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enqueues {@code i} mocked messages, numbered 0 to {@code i - 1}, on the given
     * queue and then sleeps for two seconds.
     *
     * <p>If the sleep is interrupted the error is logged and the thread's interrupt
     * status is restored, so callers further up the stack can still observe it.
     *
     * @param aMQQueue queue to enqueue onto
     * @param i        number of messages to enqueue
     */
    public void putGivenNumberOfMessages(AMQQueue<?> aMQQueue, int i) {
        for (int messageNumber = 0; messageNumber < i; messageNumber++) {
            aMQQueue.enqueue(createMessage(Long.valueOf(messageNumber)), (Action) null, (MessageEnqueueRecord) null);
        }
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            _logger.error("Thread interrupted", e);
            // Catching InterruptedException clears the flag; set it again rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acquires and deletes the queue entry at the given position in the list returned
     * by {@code getMessagesOnTheQueue()}, asserting that it reports itself as deleted.
     *
     * @param aMQQueue queue to dequeue from
     * @param i        index of the entry within the queue's message list
     * @return the deleted entry
     */
    public QueueEntry dequeueMessage(AMQQueue<?> aMQQueue, int i) {
        List<? extends QueueEntry> entries = aMQQueue.getMessagesOnTheQueue();
        QueueEntry target = entries.get(i);

        target.acquire();
        target.delete();

        assertTrue(target.isDeleted());
        return target;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that both lists have the same size and that every instance in
     * {@code list} is contained in {@code list2}.
     *
     * @param list  the expected message instances
     * @param list2 the message instances actually received
     */
    public void verifyReceivedMessages(List<MessageInstance> list, List<MessageInstance> list2) {
        final int expectedCount = list.size();
        assertEquals("Consumer did not receive the expected number of messages", expectedCount, list2.size());

        for (MessageInstance expected : list) {
            String failureMessage = "Consumer did not receive msg: " + expected.getMessage().getMessageNumber();
            boolean received = list2.contains(expected);
            assertTrue(failureMessage, received);
        }
    }

    /**
     * Returns the queue held in the {@code _queue} field.
     *
     * @return the current queue
     */
    public AMQQueue<?> getQueue() {
        return _queue;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Replaces the queue held in the {@code _queue} field.
     *
     * @param aMQQueue the queue to store
     */
    public void setQueue(AMQQueue<?> aMQQueue) {
        _queue = aMQQueue;
    }

    /**
     * Returns the mock consumer held in the {@code _consumerTarget} field.
     *
     * @return the mock consumer
     */
    public MockConsumer getConsumer() {
        return _consumerTarget;
    }

    /**
     * Returns the map held in the {@code _arguments} field (the same instance, not a copy).
     *
     * @return the arguments map
     */
    public Map<String, Object> getArguments() {
        return _arguments;
    }

    /**
     * Replaces the map held in the {@code _arguments} field with the given instance.
     *
     * @param map the arguments map to store
     */
    public void setArguments(Map<String, Object> map) {
        _arguments = map;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a mocked message via {@link #createMessage(Long)} and additionally stubs
     * its priority, arrival time and header lookups.
     *
     * <p>The header stubs are backed by {@code map}: {@code getHeaderNames()} returns its
     * key set, {@code containsHeader}/{@code containsHeaders} check its keys, and
     * {@code getHeader} returns the mapped value.
     *
     * @param l   message number passed on to {@link #createMessage(Long)}
     * @param b   priority reported by the mocked header
     * @param map header names and values served by the mocked header
     * @param j   arrival time reported by the mocked message
     * @return the mocked message
     */
    public ServerMessage createMessage(Long l, byte b, final Map<String, Object> map, long j) {
        ServerMessage message = createMessage(l);
        AMQMessageHeader header = message.getMessageHeader();
        Mockito.when(header.getPriority()).thenReturn(b);
        Mockito.when(message.getArrivalTime()).thenReturn(j);
        Mockito.when(header.getHeaderNames()).thenReturn(map.keySet());

        // Each captor serves as the argument matcher; the answer reads back the value
        // captured for the invocation being answered.
        final ArgumentCaptor<String> containsHeaderArg = ArgumentCaptor.forClass(String.class);
        Mockito.when(header.containsHeader(containsHeaderArg.capture())).thenAnswer(new Answer<Boolean>() {
            // Must be named "answer" to implement Answer; a renamed method leaves the
            // anonymous class abstract and the file uncompilable.
            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
                return map.containsKey(containsHeaderArg.getValue());
            }
        });

        final ArgumentCaptor<Set> containsHeadersArg = ArgumentCaptor.forClass(Set.class);
        Mockito.when(header.containsHeaders(containsHeadersArg.capture())).thenAnswer(new Answer<Boolean>() {
            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
                return map.keySet().containsAll(containsHeadersArg.getValue());
            }
        });

        final ArgumentCaptor<String> getHeaderArg = ArgumentCaptor.forClass(String.class);
        Mockito.when(header.getHeader(getHeaderArg.capture())).thenAnswer(new Answer<Object>() {
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return map.get(getHeaderArg.getValue());
            }
        });
        return message;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a Mockito mock of {@code ServerMessage} whose message number is {@code l},
     * whose header reports {@code String.valueOf(l)} as the message id, and whose
     * {@code newReference} methods return a mocked reference pointing back at the message.
     *
     * @param l the message number to report
     * @return the mocked message
     */
    public ServerMessage createMessage(Long l) {
        ServerMessage message = Mockito.mock(ServerMessage.class);
        AMQMessageHeader header = Mockito.mock(AMQMessageHeader.class);
        MessageReference reference = Mockito.mock(MessageReference.class);

        // Header: only the message id is stubbed here.
        Mockito.when(header.getMessageId()).thenReturn(String.valueOf(l));

        // Message: number and header.
        Mockito.when(message.getMessageNumber()).thenReturn(l);
        Mockito.when(message.getMessageHeader()).thenReturn(header);

        // Reference: both newReference overloads return the same mocked reference.
        Mockito.when(reference.getMessage()).thenReturn(message);
        Mockito.when(message.newReference()).thenReturn(reference);
        Mockito.when(message.newReference(Matchers.any(TransactionLogResource.class))).thenReturn(reference);

        return message;
    }

    /**
     * Returns the virtual host held in the {@code _virtualHost} field.
     *
     * @return the virtual host
     */
    public VirtualHostImpl getVirtualHost() {
        return _virtualHost;
    }

    /**
     * Returns the value of the {@code _qname} field.
     *
     * @return the stored queue name string
     */
    public String getQname() {
        return _qname;
    }

    /**
     * Returns the value of the {@code _owner} field.
     *
     * @return the stored owner string
     */
    public String getOwner() {
        return _owner;
    }

    /**
     * Returns the value of the {@code _routingKey} field.
     *
     * @return the stored routing key string
     */
    public String getRoutingKey() {
        return _routingKey;
    }

    /**
     * Returns the exchange held in the {@code _exchange} field.
     *
     * @return the direct exchange
     */
    public DirectExchange getExchange() {
        return _exchange;
    }

    /**
     * Returns the mock consumer held in the {@code _consumerTarget} field; this is the
     * same field that {@link #getConsumer()} returns.
     *
     * @return the mock consumer
     */
    public MockConsumer getConsumerTarget() {
        return _consumerTarget;
    }
}
