package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.bugs.AMQ4607Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usecases.DurableSubDelayedUnsubscribeTest;
import org.apache.activemq.util.ThreadTracker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.class */
public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
    public static final long RUNTIME = 300000;
    public static final int SERVER_SLEEP = 500;
    public static final int CARGO_SIZE = 600;
    public static final int MAX_CLIENTS = 2;
    public static final int CLIENT_OFFLINE_DURING_COMMIT = 2;
    public static final long BROKER_RESTART = -120000;
    public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
    public static final boolean CHECK_REDELIVERY = true;
    private BrokerService broker;
    private ActiveMQTopic topic;
    private ClientManager clientManager;
    private Server server;
    private HouseKeeper houseKeeper;
    private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
    private int restartCount = 0;
    private final AtomicInteger onlineCount = new AtomicInteger(0);
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.class);
    public static final Random CLIENT_LIFETIME = new Random(VerifyNetworkConsumersDisconnectTest.TIMEOUT, 120000);
    public static final Random CLIENT_ONLINE = new Random(VerifyNetworkConsumersDisconnectTest.TIMEOUT, 40000);
    public static final Random CLIENT_OFFLINE = new Random(1000, 10000);
    public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
    static final Vector<Throwable> exceptions = new Vector<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$Client.class */
    public final class Client extends Thread {
        String url;
        final ConnectionFactory cf;
        public static final String SUBSCRIPTION_NAME = "subscription";
        private final int id;
        private final String conClientId;
        private final Random lifetime;
        private final Random online;
        private final Random offline;
        private final ClientType clientType;
        private final String selector;
        private final ConcurrentLinkedQueue<Message> waitingList;
        private final HashSet<Integer> processed;
        private ActiveMQMessageConsumer consumer;

        public Client(int i, ClientType clientType, Random random, Random random2, Random random3) throws JMSException {
            super("Client" + i);
            this.url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?jms.watchTopicAdvisories=false&jms.alwaysSyncSend=true&jms.dispatchAsync=true&jms.producerWindowSize=20971520&jms.copyMessageOnSend=false&jms.sendAcksAsync=false&initialReconnectDelay=100&maxReconnectDelay=30000&useExponentialBackOff=true";
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.waitingList = new ConcurrentLinkedQueue<>();
            this.processed = new HashSet<>(10000);
            this.consumer = null;
            setDaemon(true);
            this.id = i;
            this.conClientId = "cli" + i;
            this.clientType = clientType;
            this.selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector;
            this.lifetime = random;
            this.online = random2;
            this.offline = random3;
            subscribe();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis() + 60000;
            while (currentTimeMillis - System.currentTimeMillis() > 0) {
                try {
                    Thread.sleep(100L);
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.onlineCount.incrementAndGet();
                    try {
                        process(this.online.next());
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.onlineCount.decrementAndGet();
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                    } catch (Throwable th) {
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.onlineCount.decrementAndGet();
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                        throw th;
                    }
                } catch (Throwable th2) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit(toString() + " failed.", th2);
                }
            }
            if (DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(1) > 0) {
                unsubscribe();
            } else {
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Client abandon the subscription. " + this);
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.houseKeeper.abandonedSubscriptions.add(this.conClientId);
            }
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.clientManager.removeClient(this);
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info(toString() + " DONE.");
        }

        private void process(long j) throws JMSException {
            long currentTimeMillis = System.currentTimeMillis() + 200;
            long j2 = currentTimeMillis + 20000;
            boolean z = false;
            int i = 0;
            Connection openConnection = openConnection();
            Session createSession = openConnection.createSession(false, 2);
            this.consumer = createSession.createDurableSubscriber(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.topic, "subscription", this.selector, false);
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info(toString() + " ONLINE.");
            while (true) {
                try {
                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                    if (currentTimeMillis2 <= 0) {
                        if (!z) {
                            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info(toString() + " done after no work!");
                            break;
                        } else {
                            currentTimeMillis2 = j2 - System.currentTimeMillis();
                            if (currentTimeMillis2 <= 0) {
                                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit(this + " failed: Transaction is not finished.");
                            }
                        }
                    }
                    Message receive = this.consumer.receive(currentTimeMillis2);
                    if (receive != null) {
                        onClientMessage(receive);
                        if (receive.propertyExists("COMMIT")) {
                            receive.acknowledge();
                            int intProperty = receive.getIntProperty("TRANS");
                            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Received Trans[id=" + intProperty + ", count=" + i + "] in " + this + ".");
                            z = false;
                            i = 0;
                            if (DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.server.committingTransaction == intProperty) {
                                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Going offline during transaction commit. messageID=" + receive.getIntProperty("ID"));
                                break;
                            }
                        } else {
                            z = true;
                            i++;
                            if (1 == i) {
                                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("In Trans[id=" + receive.getIntProperty("TRANS") + "] first ID=" + receive.getIntProperty("ID"));
                            }
                        }
                    }
                } finally {
                    createSession.close();
                    openConnection.close();
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info(toString() + " OFFLINE.");
                    Message peek = this.waitingList.peek();
                    if (peek != null) {
                        checkDeliveryTime(peek);
                    }
                }
            }
        }

        public void onServerMessage(Message message) throws JMSException {
            if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
                if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT"))) {
                    this.waitingList.add(message);
                }
            } else {
                if (this.clientType.isRelevant(message.getStringProperty("TYPE"))) {
                    this.waitingList.add(message);
                }
            }
        }

        public void onClientMessage(Message message) {
            Message poll = this.waitingList.poll();
            try {
                Integer num = (Integer) message.getObjectProperty("ID");
                if (this.processed != null && this.processed.contains(num)) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("! Message has been processed before. " + this + " redeliveredFlag=" + message.getJMSRedelivered() + ", message = " + message);
                }
                if (poll == null) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit(this + " failed: There is no next server message, but received: " + message);
                }
                Integer num2 = (Integer) poll.getObjectProperty("ID");
                if (num == null || num2 == null) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit(this + " failed: message ID not found.\r\n received: " + message + "\r\n   server: " + poll);
                }
                if (!num2.equals(num)) {
                    StringBuilder sb = new StringBuilder();
                    Object obj = null;
                    int i = 0;
                    Message message2 = poll;
                    while (true) {
                        if (!((Integer) message2.getObjectProperty("ID")).equals(num)) {
                            Object objectProperty = message2.getObjectProperty("TRANS");
                            if (objectProperty.equals(obj)) {
                                i++;
                            } else {
                                if (obj != null) {
                                    sb.append("Missing TRANS=").append(obj).append(", size=").append(i).append("\r\n");
                                }
                                obj = objectProperty;
                                i = 1;
                            }
                            Message poll2 = this.waitingList.poll();
                            message2 = poll2;
                            if (poll2 == null) {
                                break;
                            }
                        } else if (obj != null) {
                            sb.append("Missing TRANS=").append(obj).append(", size=").append(i).append("\r\n");
                        }
                    }
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit("Missing messages!\r\n" + sb + "Received message: " + message + "\r\nExpected message: " + poll);
                }
                checkDeliveryTime(message);
                if (this.processed != null) {
                    this.processed.add(num);
                }
            } catch (Throwable th) {
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit(this + ".onClientMessage failed.\r\n received: " + message + "\r\n   server: " + poll, th);
            }
        }

        public void checkDeliveryTime(Message message) throws JMSException {
            message.getJMSTimestamp();
            long currentTimeMillis = System.currentTimeMillis() - ((this.offline.max + this.online.min) * 1);
        }

        private Connection openConnection() throws JMSException {
            ActiveMQConnection createConnection = this.cf.createConnection();
            createConnection.setClientID(this.conClientId);
            createConnection.setCloseTimeout(DurableSubDelayedUnsubscribeTest.Client.lifetime);
            createConnection.start();
            return createConnection;
        }

        private void subscribe() throws JMSException {
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
            try {
                Connection openConnection = openConnection();
                Session createSession = openConnection.createSession(false, 1);
                createSession.createDurableSubscriber(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.topic, "subscription", this.selector, true);
                createSession.close();
                openConnection.close();
            } finally {
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
            }
        }

        private void unsubscribe() throws JMSException {
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Unsubscribe: " + this);
            try {
                Connection openConnection = openConnection();
                Session createSession = openConnection.createSession(false, 1);
                createSession.unsubscribe("subscription");
                createSession.close();
                openConnection.close();
            } finally {
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
            }
        }

        @Override // java.lang.Thread
        public String toString() {
            return "Client[id=" + this.id + ", type=" + this.clientType + "] consumerId=" + (this.consumer != null ? this.consumer.getConsumerId() : "null");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$ClientManager.class */
    public final class ClientManager extends Thread {
        private int clientRover;
        private final CopyOnWriteArrayList<Client> clients;
        private boolean end;

        public ClientManager() {
            super("ClientManager");
            this.clientRover = 0;
            this.clients = new CopyOnWriteArrayList<>();
            setDaemon(true);
        }

        public synchronized void setEnd(boolean z) {
            this.end = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    if (this.clients.size() < 2 && !this.end) {
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
                        try {
                            createNewClient();
                            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                        } finally {
                        }
                    }
                    this.clients.size();
                    Thread.sleep(100L);
                } catch (Throwable th) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit("ClientManager.run failed.", th);
                    return;
                }
            }
        }

        private void createNewClient() throws JMSException {
            Client client;
            ClientType randomClientType = ClientType.randomClientType();
            synchronized (DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.server.sendMutex) {
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest durableSubProcessConcurrentCommitActivateNoDuplicateTest = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this;
                int i = this.clientRover + 1;
                this.clientRover = i;
                client = new Client(i, randomClientType, DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CLIENT_LIFETIME, DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CLIENT_ONLINE, DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CLIENT_OFFLINE);
                this.clients.add(client);
            }
            client.start();
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info(client.toString() + " created. " + this);
        }

        public void removeClient(Client client) {
            this.clients.remove(client);
        }

        public void onServerMessage(Message message) throws JMSException {
            Iterator<Client> it = this.clients.iterator();
            while (it.hasNext()) {
                it.next().onServerMessage(message);
            }
        }

        @Override // java.lang.Thread
        public String toString() {
            StringBuilder sb = new StringBuilder("ClientManager[count=");
            sb.append(this.clients.size());
            sb.append(", clients=");
            boolean z = false;
            Iterator<Client> it = this.clients.iterator();
            while (it.hasNext()) {
                Client next = it.next();
                if (z) {
                    sb.append(", ");
                } else {
                    z = true;
                }
                sb.append(next.toString());
            }
            sb.append(']');
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$ClientType.class */
    public enum ClientType {
        A("a", "b", "c"),
        B("c", "d", "e"),
        C("d", "e", "f"),
        D("g", "h");

        public final String[] messageTypes;
        public final HashSet<String> messageTypeSet;
        public final String selector;

        ClientType(String... strArr) {
            this.messageTypes = strArr;
            this.messageTypeSet = new HashSet<>(Arrays.asList(strArr));
            StringBuilder sb = new StringBuilder("TYPE in (");
            for (int i = 0; i < strArr.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append('\'').append(strArr[i]).append('\'');
            }
            sb.append(')');
            this.selector = sb.toString();
        }

        public static ClientType randomClientType() {
            return values()[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(values().length - 1)];
        }

        public final String randomMessageType() {
            return this.messageTypes[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(this.messageTypes.length - 1)];
        }

        public static String randomNonRelevantMessageType() {
            return Integer.toString(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(20));
        }

        public final boolean isRelevant(String str) {
            return this.messageTypeSet.contains(str);
        }

        @Override // java.lang.Enum
        public final String toString() {
            return name();
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$HouseKeeper.class */
    private final class HouseKeeper extends Thread {
        public final CopyOnWriteArrayList<String> abandonedSubscriptions;

        private HouseKeeper() {
            super("HouseKeeper");
            this.abandonedSubscriptions = new CopyOnWriteArrayList<>();
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Thread.sleep(DurableSubSelectorDelayTest.RUNTIME);
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
                    try {
                        sweep();
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                    } catch (Throwable th) {
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                        throw th;
                        break;
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th2) {
                    new Exception("HouseKeeper failed.", th2).printStackTrace();
                }
            }
        }

        private void sweep() throws Exception {
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Housekeeper sweeping.");
            int i = 0;
            ArrayList arrayList = new ArrayList();
            try {
                try {
                    Iterator<String> it = this.abandonedSubscriptions.iterator();
                    while (it.hasNext()) {
                        String next = it.next();
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Sweeping out subscription of " + next + ".");
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.broker.getAdminView().destroyDurableSubscriber(next, "subscription");
                        arrayList.add(next);
                        i++;
                    }
                    this.abandonedSubscriptions.removeAll(arrayList);
                } catch (Exception e) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Ex on destroy sub " + e);
                    this.abandonedSubscriptions.removeAll(arrayList);
                }
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Housekeeper sweeped out " + i + " subscriptions.");
            } catch (Throwable th) {
                this.abandonedSubscriptions.removeAll(arrayList);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$Persistence.class */
    public enum Persistence {
        MEMORY,
        KAHADB
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$Random.class */
    public static final class Random {
        final int min;
        final int max;

        Random(int i, int i2) {
            this.min = i;
            this.max = i2;
        }

        public int next() {
            return DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(this.min, this.max);
        }

        public void sleepRandom() throws InterruptedException {
            DurableSubProcessConcurrentCommitActivateNoDuplicateTest.sleepRandom(this.min, this.max);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest$Server.class */
    public final class Server extends Thread {
        final String url;
        final ConnectionFactory cf;
        final Object sendMutex;
        final String[] cargos;
        int transRover;
        int messageRover;
        public volatile int committingTransaction;
        public boolean done;

        public Server() {
            super("Server");
            this.url = "vm://" + DurableSubProcessConcurrentCommitActivateNoDuplicateTest.getName() + "?jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&jms.alwaysSyncSend=true&jms.dispatchAsync=false&jms.watchTopicAdvisories=false&waitForStart=200&create=false";
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.sendMutex = new Object();
            this.cargos = new String[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP];
            this.transRover = 0;
            this.messageRover = 0;
            this.committingTransaction = -1;
            this.done = false;
            setPriority(1);
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.done) {
                try {
                    Thread.sleep(1000L);
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().lock();
                    try {
                        send();
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                    } catch (Throwable th) {
                        DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.processLock.readLock().unlock();
                        throw th;
                    }
                } catch (Throwable th2) {
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.exit("Server.run failed", th2);
                    return;
                }
            }
        }

        public void send() throws JMSException {
            synchronized (this.sendMutex) {
                int i = this.transRover + 1;
                this.transRover = i;
                ClientType randomClientType = 1 != 0 ? ClientType.randomClientType() : null;
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Sending Trans[id=" + i + ", count=" + 1000 + ", clientType=" + randomClientType + ", firstID=" + (this.messageRover + 1) + "]");
                Connection createConnection = this.cf.createConnection();
                Session createSession = createConnection.createSession(true, 0);
                MessageProducer createProducer = createSession.createProducer((Destination) null);
                for (int i2 = 0; i2 < 1000; i2++) {
                    Message createMessage = createSession.createMessage();
                    int i3 = this.messageRover + 1;
                    this.messageRover = i3;
                    createMessage.setIntProperty("ID", i3);
                    createMessage.setIntProperty("TRANS", i);
                    createMessage.setStringProperty("TYPE", randomClientType != null ? randomClientType.randomMessageType() : ClientType.randomNonRelevantMessageType());
                    createMessage.setStringProperty("CARGO", getCargo(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE)));
                    createProducer.send(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.topic, createMessage);
                    DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.clientManager.onServerMessage(createMessage);
                }
                Message createMessage2 = createSession.createMessage();
                int i4 = this.messageRover + 1;
                this.messageRover = i4;
                createMessage2.setIntProperty("ID", i4);
                createMessage2.setIntProperty("TRANS", i);
                createMessage2.setBooleanProperty("COMMIT", true);
                createMessage2.setBooleanProperty("RELEVANT", true);
                createProducer.send(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.topic, createMessage2);
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.this.clientManager.onServerMessage(createMessage2);
                this.committingTransaction = i;
                createSession.commit();
                this.committingTransaction = -1;
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.LOG.info("Committed Trans[id=" + i + ", count=" + 1000 + ", clientType=" + randomClientType + "], ID=" + this.messageRover);
                createSession.close();
                createConnection.close();
            }
        }

        private String getCargo(int i) {
            if (i == 0) {
                return null;
            }
            if (i >= this.cargos.length) {
                return getCargoImpl(i);
            }
            String str = this.cargos[i];
            if (str == null) {
                str = getCargoImpl(i);
                this.cargos[i] = str;
            }
            return str;
        }

        private String getCargoImpl(int i) {
            StringBuilder sb = new StringBuilder(i);
            int i2 = i;
            while (true) {
                i2--;
                if (i2 < 0) {
                    return sb.toString();
                }
                sb.append('a');
            }
        }
    }

    @Test
    @Ignore("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate and org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testOrderOnActivateDeactivate")
    public void testProcess() {
        try {
            this.server.start();
            this.clientManager.start();
            this.houseKeeper.start();
            Thread.sleep(300000L);
        } catch (Throwable th) {
            exit("ProcessTest.testProcess failed.", th);
        }
        this.clientManager.setEnd(true);
        try {
            Thread.sleep(60000L);
        } catch (InterruptedException e) {
            exit("ProcessTest.testProcess failed.", e);
        }
        this.server.done = true;
        try {
            this.server.join(60000L);
        } catch (Exception e2) {
        }
        this.processLock.writeLock().lock();
        Assert.assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
        LOG.info("DONE.");
    }

    private void restartBroker() throws Exception {
        LOG.info("Broker restart: waiting for components.");
        this.processLock.writeLock().lock();
        try {
            destroyBroker();
            startBroker(false);
            this.restartCount++;
            LOG.info("Broker restarted. count: " + this.restartCount);
        } finally {
            this.processLock.writeLock().unlock();
        }
    }

    public static int random(int i) {
        return (int) (Math.random() * (i + 1));
    }

    public static int random(int i, int i2) {
        return random(i2 - i) + i;
    }

    public static void sleepRandom(int i) throws InterruptedException {
        Thread.sleep(random(i));
    }

    public static void sleepRandom(int i, int i2) throws InterruptedException {
        Thread.sleep(random(i, i2));
    }

    public static void exit(String str) {
        exit(str, null);
    }

    public static void exit(String str, Throwable th) {
        RuntimeException runtimeException = new RuntimeException(str, th);
        LOG.error(str, runtimeException);
        exceptions.add(runtimeException);
        ThreadTracker.result();
        System.exit(-9);
    }

    @Before
    public void setUp() throws Exception {
        this.topic = new ActiveMQTopic("TopicT");
        startBroker();
        this.clientManager = new ClientManager();
        this.server = new Server();
        this.houseKeeper = new HouseKeeper();
    }

    @After
    public void tearDown() throws Exception {
        destroyBroker();
    }

    private void startBroker() throws Exception {
        startBroker(true);
    }

    private void startBroker(boolean z) throws Exception {
        if (this.broker != null) {
            return;
        }
        this.broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
        this.broker.setBrokerName(getName());
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(z);
        switch (PERSISTENT_ADAPTER) {
            case MEMORY:
                this.broker.setPersistent(false);
                break;
            case KAHADB:
                File file = new File("activemq-data/" + getName() + "-kahadb");
                if (z) {
                    delete(file);
                }
                this.broker.setPersistent(true);
                KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
                kahaDBPersistenceAdapter.setDirectory(file);
                kahaDBPersistenceAdapter.setJournalMaxFileLength(5242880);
                this.broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
                break;
        }
        this.broker.addConnector("tcp://localhost:61656");
        this.broker.getSystemUsage().getMemoryUsage().setLimit(268435456L);
        this.broker.getSystemUsage().getTempUsage().setLimit(268435456L);
        this.broker.getSystemUsage().getStoreUsage().setLimit(1073741824L);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMaxAuditDepth(AMQ4607Test.TIMEOUT);
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
    }

    protected static String getName() {
        return "DurableSubProcessWithRestartTest";
    }

    private static boolean delete(File file) {
        if (file == null) {
            return true;
        }
        if (file.isDirectory()) {
            for (File file2 : file.listFiles()) {
                delete(file2);
            }
        }
        return file.delete();
    }

    private void destroyBroker() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker = null;
    }
}
