package org.apache.activemq.store.kahadb;

import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.class */
public class KahaDBFastEnqueueTest {
    private static final Logger LOG = LoggerFactory.getLogger(KahaDBFastEnqueueTest.class);
    private BrokerService broker;
    private ActiveMQConnectionFactory connectionFactory;
    KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
    private Destination destination = new ActiveMQQueue("Test");
    private String payloadString = new String(new byte[6144]);
    private boolean useBytesMessage = true;
    private final int parallelProducer = 20;
    private Vector<Exception> exceptions = new Vector<>();
    long toSend = DurableSubProcessWithRestartTest.BROKER_RESTART;
    final double sampleRate = 100000.0d;

    @Test
    public void testPublishNoConsumer() throws Exception {
        startBroker(true, 10);
        final AtomicLong atomicLong = new AtomicLong(this.toSend);
        long currentTimeMillis = System.currentTimeMillis();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.store.kahadb.KahaDBFastEnqueueTest.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        KahaDBFastEnqueueTest.this.publishMessages(atomicLong, 0);
                    } catch (Exception e) {
                        KahaDBFastEnqueueTest.this.exceptions.add(e);
                    }
                }
            });
        }
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(30L, TimeUnit.MINUTES);
        Assert.assertTrue("Producers done in time", newCachedThreadPool.isTerminated());
        Assert.assertTrue("No exceptions: " + this.exceptions, this.exceptions.isEmpty());
        long length = this.toSend * this.payloadString.length();
        double currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        stopBroker();
        LOG.info("Duration:                " + currentTimeMillis2 + "ms");
        LOG.info("Rate:                       " + ((this.toSend * 1000) / currentTimeMillis2) + "m/s");
        LOG.info("Total send:             " + length);
        LOG.info("Total journal write: " + this.kahaDBPersistenceAdapter.getStore().getJournal().length());
        LOG.info("Total index size " + this.kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize());
        LOG.info("Total store size: " + this.kahaDBPersistenceAdapter.size());
        LOG.info("Journal writes %:    " + ((this.kahaDBPersistenceAdapter.getStore().getJournal().length() / length) * 100.0d) + "%");
        restartBroker(0, 1200000);
        consumeMessages(this.toSend);
    }

    @Test
    public void testPublishNoConsumerNoCheckpoint() throws Exception {
        this.toSend = 100L;
        startBroker(true, 0);
        final AtomicLong atomicLong = new AtomicLong(this.toSend);
        long currentTimeMillis = System.currentTimeMillis();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.store.kahadb.KahaDBFastEnqueueTest.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        KahaDBFastEnqueueTest.this.publishMessages(atomicLong, 0);
                    } catch (Exception e) {
                        KahaDBFastEnqueueTest.this.exceptions.add(e);
                    }
                }
            });
        }
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(30L, TimeUnit.MINUTES);
        Assert.assertTrue("Producers done in time", newCachedThreadPool.isTerminated());
        Assert.assertTrue("No exceptions: " + this.exceptions, this.exceptions.isEmpty());
        long length = this.toSend * this.payloadString.length();
        this.broker.getAdminView().gc();
        double currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        stopBroker();
        LOG.info("Duration:                " + currentTimeMillis2 + "ms");
        LOG.info("Rate:                       " + ((this.toSend * 1000) / currentTimeMillis2) + "m/s");
        LOG.info("Total send:             " + length);
        LOG.info("Total journal write: " + this.kahaDBPersistenceAdapter.getStore().getJournal().length());
        LOG.info("Total index size " + this.kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize());
        LOG.info("Total store size: " + this.kahaDBPersistenceAdapter.size());
        LOG.info("Journal writes %:    " + ((this.kahaDBPersistenceAdapter.getStore().getJournal().length() / length) * 100.0d) + "%");
        restartBroker(0, 0);
        consumeMessages(this.toSend);
    }

    private void consumeMessages(long j) throws Exception {
        ActiveMQConnection createConnection = this.connectionFactory.createConnection();
        createConnection.setWatchTopicAdvisories(false);
        createConnection.start();
        MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(this.destination);
        for (int i = 0; i < j; i++) {
            Assert.assertNotNull("got message " + i, createConsumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART));
        }
        Assert.assertNull("none left over", createConsumer.receive(2000L));
    }

    private void restartBroker(int i, int i2) throws Exception {
        stopBroker();
        TimeUnit.MILLISECONDS.sleep(i);
        startBroker(false, i2);
    }

    @Before
    public void setProps() {
        System.setProperty("org.apache.kahadb.journal.CALLER_BUFFER_APPENDER", Boolean.toString(true));
        System.setProperty("org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW", "10000");
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        System.clearProperty("org.apache.kahadb.journal.CALLER_BUFFER_APPENDER");
        System.clearProperty("org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishMessages(AtomicLong atomicLong, int i) throws Exception {
        BytesMessage createTextMessage;
        ActiveMQConnection createConnection = this.connectionFactory.createConnection();
        createConnection.setWatchTopicAdvisories(false);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.destination);
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        while (true) {
            long andDecrement = atomicLong.getAndDecrement();
            if (andDecrement <= 0) {
                createConnection.syncSendPacket(new ConnectionControl());
                createConnection.close();
                return;
            }
            if (this.useBytesMessage) {
                createTextMessage = createSession.createBytesMessage();
                createTextMessage.writeBytes(this.payloadString.getBytes());
            } else {
                createTextMessage = createSession.createTextMessage(this.payloadString);
            }
            createProducer.send(createTextMessage, 2, 5, i);
            if (andDecrement != this.toSend && andDecrement % 100000.0d == 0.0d) {
                long currentTimeMillis = System.currentTimeMillis();
                LOG.info("Remainder: " + andDecrement + ", rate: " + (1.0E8d / (currentTimeMillis - valueOf.longValue())) + "m/s");
                valueOf = Long.valueOf(currentTimeMillis);
            }
        }
    }

    public void startBroker(boolean z, int i) throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.kahaDBPersistenceAdapter = this.broker.getPersistenceAdapter();
        this.kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
        this.kahaDBPersistenceAdapter.setCleanupInterval(i);
        this.kahaDBPersistenceAdapter.setCheckpointInterval(i);
        this.kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(25165824);
        this.kahaDBPersistenceAdapter.setJournalMaxFileLength(134217728);
        this.kahaDBPersistenceAdapter.setIndexCacheSize(500000);
        this.kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
        this.kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
        this.kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri() + "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192");
    }

    @Test
    public void testRollover() throws Exception {
        byte b = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 32767) {
                return;
            }
            byte b2 = (byte) (b ^ 1);
            Assert.assertEquals("0 @:" + j2, 0, b2);
            byte b3 = (byte) (b2 ^ 1);
            b = b3;
            Assert.assertEquals("1 @:" + j2, 1, b3);
            j = j2 + 1;
        }
    }
}
