package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/DataFileNotDeletedTest.class */
/**
 * Sends a fixed number of messages through a broker backed by the AMQ persistence adapter,
 * consumes all of them, and then checks that the adapter's {@link AsyncDataManager} ends up
 * tracking exactly the expected number of data files.
 *
 * <p>Each test starts its own broker on {@link #BROKER_URL} with all previously stored messages
 * deleted, and runs once with a non-transacted and once with a transacted session.
 */
public class DataFileNotDeletedTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileNotDeletedTest.class);

    /** Address used both for the broker's transport connector and for the client connections. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    /** Upper bound on how long the test waits for every sent message to be received. */
    private static final long RECEIVE_TIMEOUT_MINUTES = 5;

    /** Number of messages sent (and expected to be received) per test. */
    private static int max_messages = 600;

    /** Message body: a string built from 512 zero bytes. */
    protected static final String payload = new String(new byte[512]);

    /** Number of messages received so far; reset in {@link #setUp()}. */
    private static int messageCounter;

    private BrokerService broker;
    private Connection receiverConnection;
    private Connection producerConnection;
    AMQPersistenceAdapter persistentAdapter;

    /** Counted down once per received message so the test can wait for full delivery. */
    private final CountDownLatch latch = new CountDownLatch(max_messages);

    /**
     * Queue name, assigned in {@link #setUp()}. It is not computed in a field initializer
     * because JUnit 3 sets the test name after invoking the no-argument constructor, at which
     * point {@code getName()} still returns {@code null}.
     */
    private String destinationName;

    final boolean useTopic = false;

    /**
     * Resets the message counter, starts the broker and opens one started connection each for
     * the receiver and the producer.
     *
     * @throws Exception if the broker or either connection cannot be started
     */
    @Override
    public void setUp() throws Exception {
        messageCounter = 0;
        this.destinationName = getName() + "_Queue";
        startBroker();
        this.receiverConnection = createConnection();
        this.receiverConnection.start();
        this.producerConnection = createConnection();
        this.producerConnection.start();
    }

    /**
     * Closes both connections and stops the broker. Each step runs even when an earlier one
     * fails or when {@link #setUp()} did not get far enough to create the resource, so a
     * failing test does not leave the broker bound to its port.
     *
     * @throws Exception if closing a connection or stopping the broker fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (this.receiverConnection != null) {
                this.receiverConnection.close();
            }
        } finally {
            try {
                if (this.producerConnection != null) {
                    this.producerConnection.close();
                }
            } finally {
                if (this.broker != null) {
                    this.broker.stop();
                }
            }
        }
    }

    /** Runs the scenario with a non-transacted session. */
    public void testForDataFileNotDeleted() throws Exception {
        doTestForDataFileNotDeleted(false);
    }

    /** Runs the scenario with a transacted session. */
    public void testForDataFileNotDeletedTransacted() throws Exception {
        doTestForDataFileNotDeleted(true);
    }

    /**
     * Registers a counting receiver, sends {@code max_messages} messages, waits until all of
     * them have been received and then waits for the data file count to drop to 2.
     *
     * @param transacted whether the receiver and the sender use transacted sessions
     * @throws Exception if sending, receiving or the final file-count check fails
     */
    private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
        Receiver countingReceiver = new Receiver() {
            @Override
            public void receive(String text) throws Exception {
                messageCounter++;
                latch.countDown();
            }
        };
        buildReceiver(this.receiverConnection, this.destinationName, transacted, countingReceiver, false);

        MessageSender messageSender =
                new MessageSender(this.destinationName, this.producerConnection, transacted, false);
        for (int sent = 0; sent < max_messages; sent++) {
            messageSender.send(payload);
        }

        // Bounded wait: the listener logs (rather than propagates) failures, so without a
        // timeout a single failed delivery would block this test forever.
        boolean allReceived = this.latch.await(RECEIVE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        assertTrue("timed out waiting for messages, still outstanding: " + this.latch.getCount(), allReceived);
        assertEquals(max_messages, messageCounter);

        AsyncDataManager dataManager = this.persistentAdapter.getAsyncDataManager();
        LOG.info("Sent and received + {}, file count {}", messageCounter, dataManager.getFiles().size());
        waitFordataFilesToBeCleanedUp(dataManager, 60000, 2);
    }

    /**
     * Polls once per second until the data manager tracks no more than {@code expectedFiles}
     * files or the timeout elapses, then asserts that the file count equals
     * {@code expectedFiles} exactly.
     *
     * @param asyncDataManager data manager whose file set is inspected
     * @param timeoutMillis maximum time to keep polling, in milliseconds
     * @param expectedFiles number of files expected to remain
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitFordataFilesToBeCleanedUp(AsyncDataManager asyncDataManager, int timeoutMillis, int expectedFiles)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (deadline > System.currentTimeMillis() && asyncDataManager.getFiles().size() > expectedFiles) {
            Thread.sleep(1000L);
        }
        assertEquals("persistence adapter dataManager has correct number of files",
                expectedFiles, asyncDataManager.getFiles().size());
    }

    /**
     * Creates a new, not yet started, connection to the test broker.
     *
     * @return the new connection
     * @throws JMSException if the connection cannot be created
     */
    private Connection createConnection() throws JMSException {
        return new ActiveMQConnectionFactory(BROKER_URL).createConnection();
    }

    /**
     * Creates, configures and starts a persistent broker that uses the AMQ persistence adapter
     * and deletes all stored messages on startup.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    private void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.addConnector(BROKER_URL).setName("Default");
        this.broker.setPersistenceFactory(new AMQPersistenceAdapterFactory());

        // The broker hands back the general factory/adapter types; the AMQ-specific settings
        // and the AsyncDataManager accessor are only available on the AMQ implementations.
        AMQPersistenceAdapterFactory persistenceFactory =
                (AMQPersistenceAdapterFactory) this.broker.getPersistenceFactory();
        // 20 KiB per file and 500 ms checkpoint/cleanup intervals; presumably chosen so the
        // run spans several data files and cleanup happens quickly -- confirm if tuning.
        persistenceFactory.setMaxFileLength(20480);
        persistenceFactory.setCheckpointInterval(500L);
        persistenceFactory.setCleanupInterval(500L);
        persistenceFactory.setSyncOnWrite(false);

        this.persistentAdapter = (AMQPersistenceAdapter) this.broker.getPersistenceAdapter();
        this.broker.start();
        LOG.info("Starting broker..");
    }

    /**
     * Creates a session and a consumer on the named destination and installs a listener that
     * forwards each message's object payload (cast to {@code String}) to {@code receiver},
     * committing the session afterwards when it is transacted.
     *
     * @param connection connection on which the session is created
     * @param destination name of the queue or topic to consume from
     * @param transacted whether to create a transacted session (otherwise auto-acknowledge)
     * @param receiver callback invoked with each message's payload
     * @param topic {@code true} to consume from a topic, {@code false} for a queue
     * @throws Exception if the session, destination or consumer cannot be created
     */
    private void buildReceiver(Connection connection, String destination, boolean transacted,
            final Receiver receiver, boolean topic) throws Exception {
        final Session session = transacted
                ? connection.createSession(true, Session.SESSION_TRANSACTED)
                : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createConsumer(topic ? session.createTopic(destination) : session.createQueue(destination))
                .setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            receiver.receive((String) ((ObjectMessage) message).getObject());
                            if (session.getTransacted()) {
                                session.commit();
                            }
                        } catch (Exception e) {
                            // Listener callbacks must not throw; record the failure and let the
                            // bounded latch wait in the test surface it.
                            LOG.error("Failed to process received message", e);
                        }
                    }
                });
    }
}
