/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends {@link #MESSAGES_COUNT} text messages to a queue on an embedded broker
 * from one thread and drains them from another thread that is started one
 * second later, then checks that every message was received.
 *
 * <p>The class also carries two raw socket helpers, {@link #sendFrame(String)}
 * and {@link #receiveFrame(long)}. They operate on {@code stompSocket}, which
 * is never assigned anywhere in this class, so they throw
 * {@link NullPointerException} unless that field is wired up by other means.
 */
public class SlowConsumerTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class);

    /** Number of messages the producer sends and the consumer is expected to receive. */
    private static final int MESSAGES_COUNT = 10000;

    /** An INFO progress line is logged every this many received messages. */
    private final int messageLogFrequency = 2500;

    /** Maximum time, in milliseconds, the consumer waits for a single message. */
    private final long messageReceiveTimeout = 10000L;

    /** Socket used by the frame helpers; not assigned within this class. */
    private Socket stompSocket;

    /** Accumulates the bytes of the frame currently being read by {@link #receiveFrame(long)}. */
    private final ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();

    /** Messages received so far; written by the consuming thread, read by the test after joining it. */
    private int messagesCount;

    /**
     * Starts an embedded broker, runs the producer/consumer exchange over a
     * single shared connection and asserts that all messages arrived.
     *
     * <p>The connection is closed and the broker stopped even when the
     * exchange fails, so a failing run does not leave them behind.
     *
     * @throws Exception if the broker or connection cannot be set up or torn
     *     down, or if the calling thread is interrupted while waiting
     */
    public void testRemoveSubscriber() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.setDeleteAllMessagesOnStartup(true);
        // Port 0: the actual address is looked up from the connector after start.
        broker.addConnector("tcp://localhost:0").setName("Default");
        broker.start();
        try {
            String brokerUrl = ((TransportConnector) broker.getTransportConnectors().get(0))
                    .getPublishableConnectString();
            Connection connection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
            try {
                connection.start();
                exchangeMessages(connection, getDestinationName());
            } finally {
                connection.close();
            }
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }
        assertEquals(MESSAGES_COUNT, messagesCount);
    }

    /**
     * Runs the producing and consuming threads against {@code destinationName}
     * and waits for them to finish.
     *
     * @param connection started connection shared by both threads
     * @param destinationName name of the queue to send to and receive from
     * @throws InterruptedException if the calling thread is interrupted while sleeping or joining
     * @throws AssertionError if either thread terminated with an exception; the exception is the cause
     */
    private void exchangeMessages(final Connection connection, final String destinationName)
            throws InterruptedException {
        // AtomicReference rather than a plain holder: the producer is joined with a
        // timeout, so its failure may have to be read while it is still running.
        final AtomicReference<Throwable> producerFailure = new AtomicReference<>();
        final AtomicReference<Throwable> consumerFailure = new AtomicReference<>();

        Thread producingThread = new Thread("Producing thread") {
            @Override
            public void run() {
                try {
                    produceMessages(connection, destinationName);
                } catch (Throwable ex) {
                    LOG.error("Producing thread failed", ex);
                    producerFailure.set(ex);
                }
            }
        };
        producingThread.setPriority(Thread.MAX_PRIORITY);
        producingThread.start();

        // Head start for the producer before the consumer subscribes
        // (presumably so that a backlog builds up on the queue).
        Thread.sleep(1000L);

        Thread consumingThread = new Thread("Consuming thread") {
            @Override
            public void run() {
                try {
                    consumeMessages(connection, destinationName);
                } catch (Throwable ex) {
                    LOG.error("Consuming thread failed", ex);
                    consumerFailure.set(ex);
                }
            }
        };
        consumingThread.start();
        consumingThread.join();
        // Bounded wait: if the consumer gave up early the producer may still be
        // busy; closing the connection in the caller will then terminate it.
        producingThread.join(messageReceiveTimeout);

        failIfWorkerFailed("Consuming thread", consumerFailure.get());
        failIfWorkerFailed("Producing thread", producerFailure.get());
    }

    /**
     * Sends {@link #MESSAGES_COUNT} text messages whose bodies are the
     * consecutive indices {@code 0 .. MESSAGES_COUNT - 1}.
     *
     * @param connection connection to create the producing session on
     * @param destinationName name of the target queue
     * @throws Exception if any JMS operation fails
     */
    private void produceMessages(Connection connection, String destinationName) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Destination queue = new ActiveMQQueue(destinationName);
            MessageProducer producer = session.createProducer(queue);
            for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
                TextMessage message = session.createTextMessage(String.valueOf(idx));
                producer.send(message);
                LOG.debug("Sending: {}", idx);
            }
            producer.close();
        } finally {
            session.close();
        }
    }

    /**
     * Receives and acknowledges messages until {@link #MESSAGES_COUNT} have
     * been counted or a receive times out, updating {@code messagesCount}.
     *
     * <p>Gaps in the received index sequence are only logged; they do not
     * stop the loop.
     *
     * @param connection connection to create the consuming session on
     * @param destinationName name of the queue to read from
     * @throws Exception if any JMS operation fails or a message body is not an integer
     */
    private void consumeMessages(Connection connection, String destinationName) throws Exception {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        try {
            Destination queue = new ActiveMQQueue(destinationName);
            MessageConsumer consumer = session.createConsumer(queue);
            // Offset between the running count and the index last seen, so that a
            // single gap is reported once instead of on every following message.
            int diff = 0;
            while (messagesCount != MESSAGES_COUNT) {
                Message msg = consumer.receive(messageReceiveTimeout);
                if (msg == null) {
                    LOG.warn("Got null message at count: {}. Stopping.", messagesCount);
                    break;
                }
                String text = ((TextMessage) msg).getText();
                int currentMsgIdx = Integer.parseInt(text);
                LOG.debug("Received: {} messageCount: {}", text, messagesCount);
                msg.acknowledge();
                if (messagesCount + diff != currentMsgIdx) {
                    LOG.debug("Message(s) skipped!! Should be message no.: {} but got: {}",
                            messagesCount, currentMsgIdx);
                    diff = currentMsgIdx - messagesCount;
                }
                ++messagesCount;
                if (messagesCount % messageLogFrequency == 0) {
                    LOG.info("Received: {} messages so far", messagesCount);
                }
            }
        } finally {
            session.close();
        }
    }

    /**
     * Turns a failure recorded by a worker thread into a test failure.
     *
     * @param threadName name used in the failure message
     * @param failure exception recorded by the thread, or {@code null} if it finished normally
     * @throws AssertionError if {@code failure} is not {@code null}
     */
    private static void failIfWorkerFailed(String threadName, Throwable failure) {
        if (failure != null) {
            throw new AssertionError(threadName + " failed", failure);
        }
    }

    /**
     * Writes {@code data}, encoded as UTF-8, to the socket and flushes it.
     *
     * @param data text to send
     * @throws Exception if the socket's output stream cannot be obtained or written
     */
    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        OutputStream outputStream = this.stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.flush();
    }

    /**
     * Reads bytes from the socket up to the next NUL byte and returns them
     * decoded as UTF-8.
     *
     * <p>Bytes read before a failure stay in the internal buffer and are
     * prepended to the result of the next successful call.
     *
     * @param timeOut socket read timeout in milliseconds; values above
     *     {@link Integer#MAX_VALUE} are clamped to it
     * @return the text preceding the NUL terminator
     * @throws IOException with message {@code "socket closed."} if the stream ends before a NUL byte
     * @throws Exception if the socket cannot be configured or read
     */
    public String receiveFrame(long timeOut) throws Exception {
        // Clamp instead of a bare cast, which would wrap large values around.
        this.stompSocket.setSoTimeout((int) Math.min(timeOut, (long) Integer.MAX_VALUE));
        InputStream is = this.stompSocket.getInputStream();
        int c;
        while ((c = is.read()) >= 0) {
            if (c == 0) {
                // Consume the byte that follows the terminator
                // (presumably the trailing newline of the frame - confirm against the sender).
                is.read();
                byte[] frame = this.inputBuffer.toByteArray();
                this.inputBuffer.reset();
                return new String(frame, StandardCharsets.UTF_8);
            }
            this.inputBuffer.write(c);
        }
        throw new IOException("socket closed.");
    }

    /**
     * Builds the queue name for the running test.
     *
     * @return the runtime class name and the test name joined by a dot
     */
    protected String getDestinationName() {
        return getClass().getName() + "." + getName();
    }
}

