package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.util.ActiveMQMultiTypeMessageListener;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/jms/JMSOutputOperatorTest.class */
public class JMSOutputOperatorTest extends JMSTestBase {
    /** Tuple count (20) used by the emit-and-count tests in this class. */
    public static final transient int maxTuple = 20;
    /** JMS client id ("Client1") for the operators under test. */
    public static final String CLIENT_ID = "Client1";
    /** Application id ("appId") stored under {@code DAG.APPLICATION_ID} in the default test context. */
    public static final String APP_ID = "appId";
    /** Operator id (1) used when mocking the operator contexts. */
    public static final int OPERATOR_ID = 1;
    /** Operator under test; replaced with a fresh instance on every {@code createOperator} call. */
    public static JMSStringSinglePortOutputOperator outputOperator;
    /** Mock context carrying the application id; rebuilt by {@link TestMeta} before each test. */
    public static Context.OperatorContext testOperatorContext;
    /** Mock context with {@code PROCESSING_MODE} set to {@code AT_MOST_ONCE}; rebuilt by {@link TestMeta} before each test. */
    public static Context.OperatorContext testOperatorContextAMO;
    /** Half of {@link #BATCH_SIZE} (5); the number of tuples sent in the partial-batch windows. */
    public static final int HALF_BATCH_SIZE = 5;
    /** Batch size (10) configured on the operators and the number of tuples sent in full-batch windows. */
    public static final int BATCH_SIZE = 10;

    /** Per-test fixture: builds the mock contexts and clears the "recovery" directory. */
    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final Logger logger = LoggerFactory.getLogger(JMSOutputOperatorTest.class);
    /** Incremented once per emit-loop iteration; never read or reset within this file. */
    public static int tupleCount = 0;
    /** Source of the random integer payloads sent by the batch tests. */
    public static final Random random = new Random();

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSOutputOperatorTest$JMSMultiPortOutputOperator.class */
    public static class JMSMultiPortOutputOperator extends AbstractJMSOutputOperator {
        public final transient DefaultInputPort<String> inputPort1 = new DefaultInputPort<String>() { // from class: com.datatorrent.lib.io.jms.JMSOutputOperatorTest.JMSMultiPortOutputOperator.1
            public void process(String str) {
                JMSMultiPortOutputOperator.this.sendMessage(str);
            }
        };
        public final transient DefaultInputPort<Integer> inputPort2 = new DefaultInputPort<Integer>() { // from class: com.datatorrent.lib.io.jms.JMSOutputOperatorTest.JMSMultiPortOutputOperator.2
            public void process(Integer num) {
                JMSMultiPortOutputOperator.this.sendMessage(num);
            }
        };

        protected Message createMessage(Object obj) {
            try {
                return getSession().createTextMessage(obj.toString());
            } catch (JMSException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSOutputOperatorTest$JMSStringSinglePortOutputOperator.class */
    public static class JMSStringSinglePortOutputOperator extends AbstractJMSSinglePortOutputOperator<String> {
        public JMSStringSinglePortOutputOperator() {
            setBatch(10);
        }

        protected Message createMessage(Object obj) {
            try {
                return getSession().createTextMessage(obj.toString());
            } catch (JMSException e) {
                throw new RuntimeException("Failed to create message.", e);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSOutputOperatorTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        protected void starting(Description description) {
            JMSOutputOperatorTest.logger.debug("Starting test {}", description.getMethodName());
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(DAG.APPLICATION_ID, "appId");
            JMSOutputOperatorTest.testOperatorContext = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap2 = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap2.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
            JMSOutputOperatorTest.testOperatorContextAMO = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap2);
            try {
                FileUtils.deleteDirectory(new File("recovery"));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            try {
                FileUtils.deleteDirectory(new File("recovery"));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Convenience overload that delegates to the three-argument variant with 0 as the
     * last argument.
     *
     * @param topic   passed to {@code setTopic} on the operator
     * @param context the context handed to the operator's {@code setup}
     */
    private void createOperator(boolean topic, Context.OperatorContext context) {
        createOperator(topic, context, 0);
    }

    /**
     * Creates a fresh {@link JMSStringSinglePortOutputOperator}, stores it in
     * {@code outputOperator}, configures it against the broker at
     * {@code tcp://localhost:61617} and calls {@code setup} on it.
     *
     * @param topic       passed to {@code setTopic} on the operator
     * @param context     the context handed to the operator's {@code setup}
     * @param unusedCount not read by this method; kept so existing call sites compile
     */
    private void createOperator(boolean topic, Context.OperatorContext context, int unusedCount) {
        outputOperator = new JMSStringSinglePortOutputOperator();
        final JMSStringSinglePortOutputOperator operator = outputOperator;

        // Connection settings: empty credentials against the local test broker.
        operator.getConnectionFactoryProperties().put("userName", "");
        operator.getConnectionFactoryProperties().put("password", "");
        operator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");

        operator.setAckMode("CLIENT_ACKNOWLEDGE");
        operator.setClientId(CLIENT_ID);
        operator.setSubject(JMSTransactionableStoreTestBase.SUBJECT);
        operator.setMessageSize(255);
        operator.setBatch(BATCH_SIZE);
        operator.setTopic(topic);
        operator.setDurable(false);
        operator.setVerbose(true);

        operator.setup(context);
    }

    /**
     * Sends {@code maxTuple} strings to a queue inside a single window and checks that
     * the listener received all of them.
     *
     * @throws Exception if the listener connection or the wait fails
     */
    @Test
    public void testJMSOutputOperator1() throws Exception {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setupConnection();
        try {
            listener.run();
            createOperator(false, testOperatorContext, 15);

            outputOperator.beginWindow(1L);
            for (int i = 1; i <= maxTuple; i++) {
                // Log the same number that is embedded in the tuple being sent.
                logger.debug("Emitting tuple {}", i);
                outputOperator.inputPort.process("testString " + i);
                tupleCount++;
            }
            outputOperator.endWindow();
            outputOperator.teardown();

            // The test relies on this fixed delay for the messages to reach the listener.
            Thread.sleep(500L);
            Assert.assertEquals("Number of emitted tuples", maxTuple, listener.receivedData.size());
            logger.debug("Number of emitted tuples: {}", listener.receivedData.size());
            Assert.assertEquals("First tuple", "testString 1", listener.receivedData.get(1));
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }

    /**
     * Same scenario as the queue test, but with both the listener and the operator
     * configured for a topic.
     *
     * @throws Exception if the listener connection or the wait fails
     */
    @Test
    public void testJMSOutputOperator2() throws Exception {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(true);
        listener.setupConnection();
        try {
            listener.run();
            createOperator(true, testOperatorContext, 15);

            outputOperator.beginWindow(1L);
            for (int i = 1; i <= maxTuple; i++) {
                outputOperator.inputPort.process("testString " + i);
                tupleCount++;
            }
            outputOperator.endWindow();
            outputOperator.teardown();

            // The test relies on this fixed delay for the messages to reach the listener.
            Thread.sleep(500L);
            Assert.assertEquals("Number of emitted tuples", maxTuple, listener.receivedData.size());
            logger.debug("Number of emitted tuples: {}", listener.receivedData.size());
            Assert.assertEquals("First tuple", "testString 1", listener.receivedData.get(1));
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }

    /**
     * Runs three consecutive windows (one full batch followed by two half batches) and
     * checks after each {@code endWindow} that the listener has received every tuple
     * sent so far.
     *
     * @throws JMSException         if the listener connection fails
     * @throws InterruptedException if a wait is interrupted
     */
    @Test
    public void testBatch() throws JMSException, InterruptedException {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(false);
        listener.setupConnection();
        try {
            listener.run();
            createOperator(false, testOperatorContext);

            // Window 0: exactly one full batch.
            outputOperator.beginWindow(0L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Batch should be written", BATCH_SIZE, listener.receivedData.size());

            // Window 1: half a batch; the expected count shows it is delivered at endWindow.
            outputOperator.beginWindow(1L);
            for (int i = 0; i < HALF_BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Partial batch should be written at end of window",
                    BATCH_SIZE + HALF_BATCH_SIZE, listener.receivedData.size());

            // Window 2: another half batch.
            outputOperator.beginWindow(2L);
            for (int i = 0; i < HALF_BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Partial batch should be written at end of window",
                    2 * BATCH_SIZE, listener.receivedData.size());

            outputOperator.teardown();
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }

    /**
     * Completes window 0, sends a full batch in window 1 without ending the window, then
     * tears the operator down and sets it up again. Windows 0 and 1 are then replayed:
     * the expected counts require that the replay of window 0 adds no messages and that
     * window 1 is delivered once it is ended.
     *
     * @throws JMSException         if the listener connection fails
     * @throws InterruptedException if a wait is interrupted
     */
    @Test
    public void testAtLeastOnceFullBatch() throws JMSException, InterruptedException {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(false);
        listener.setupConnection();
        try {
            listener.run();
            createOperator(false, testOperatorContext);

            // Window 0 is completed normally.
            outputOperator.beginWindow(0L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Batch should be written", BATCH_SIZE, listener.receivedData.size());

            // Window 1 is left open (no endWindow) before the simulated failure.
            outputOperator.beginWindow(1L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            Thread.sleep(200L);
            Assert.assertEquals("Tuples of an unfinished window should not be received",
                    BATCH_SIZE, listener.receivedData.size());

            // Simulated restart.
            outputOperator.teardown();
            outputOperator.setup(testOperatorContext);
            Thread.sleep(200L);
            Assert.assertEquals("Restart should not deliver additional messages",
                    BATCH_SIZE, listener.receivedData.size());

            // Replay of the already completed window 0.
            outputOperator.beginWindow(0L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Replayed window should not be written again",
                    BATCH_SIZE, listener.receivedData.size());

            // Window 1 is now completed.
            outputOperator.beginWindow(1L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, listener.receivedData.size());

            // Release the operator that was set up again after the restart.
            outputOperator.teardown();
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }

    /**
     * Half-batch variant of the at-least-once restart scenario: window 1 carries only
     * {@code HALF_BATCH_SIZE} tuples both before the restart (left open) and after it
     * (completed).
     *
     * @throws JMSException         if the listener connection fails
     * @throws InterruptedException if a wait is interrupted
     */
    @Test
    public void testAtLeastOnceHalfBatch() throws JMSException, InterruptedException {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(false);
        listener.setupConnection();
        try {
            listener.run();
            createOperator(false, testOperatorContext);

            // Window 0 is completed normally.
            outputOperator.beginWindow(0L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Batch should be written", BATCH_SIZE, listener.receivedData.size());

            // Window 1 receives half a batch and is left open before the simulated failure.
            outputOperator.beginWindow(1L);
            for (int i = 0; i < HALF_BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            Thread.sleep(200L);
            Assert.assertEquals("Tuples of an unfinished window should not be received",
                    BATCH_SIZE, listener.receivedData.size());

            // Simulated restart.
            outputOperator.teardown();
            outputOperator.setup(testOperatorContext);
            Thread.sleep(200L);
            Assert.assertEquals("Restart should not deliver additional messages",
                    BATCH_SIZE, listener.receivedData.size());

            // Replay of the already completed window 0.
            outputOperator.beginWindow(0L);
            for (int i = 0; i < BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Replayed window should not be written again",
                    BATCH_SIZE, listener.receivedData.size());

            // Window 1 is now completed with half a batch.
            outputOperator.beginWindow(1L);
            for (int i = 0; i < HALF_BATCH_SIZE; i++) {
                outputOperator.inputPort.put(Integer.toString(random.nextInt()));
            }
            outputOperator.endWindow();
            Thread.sleep(200L);
            Assert.assertEquals("Partial batch should be written at end of window",
                    BATCH_SIZE + HALF_BATCH_SIZE, listener.receivedData.size());

            // Release the operator that was set up again after the restart.
            outputOperator.teardown();
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }

    /**
     * Starts the operator with the at-most-once context, completes window 0, sends a
     * full batch in window 1 without ending it, restarts the operator and then completes
     * window 2. The expected counts require that only windows 0 and 2 reach the listener.
     *
     * @throws RuntimeException wrapping a {@link JMSException} or an
     *                          {@link InterruptedException} raised during the test
     */
    @Test
    public void testAtMostOnceFullBatch() {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(false);
        try {
            listener.setupConnection();
            try {
                listener.run();
                createOperator(false, testOperatorContextAMO);

                // Window 0 is completed normally.
                outputOperator.beginWindow(0L);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                outputOperator.endWindow();
                Thread.sleep(200L);
                Assert.assertEquals("Batch should be written", BATCH_SIZE, listener.receivedData.size());

                // Window 1 is left open (no endWindow) before the simulated failure.
                outputOperator.beginWindow(1L);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                Thread.sleep(200L);
                Assert.assertEquals("Tuples of an unfinished window should not be received",
                        BATCH_SIZE, listener.receivedData.size());

                // Simulated restart.
                // NOTE(review): the restart uses testOperatorContext rather than
                // testOperatorContextAMO - confirm this is intended for an at-most-once test.
                outputOperator.teardown();
                outputOperator.setup(testOperatorContext);

                // Processing resumes at window 2; window 1 is not replayed.
                outputOperator.beginWindow(2L);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                outputOperator.endWindow();
                Thread.sleep(200L);
                Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, listener.receivedData.size());

                // Release the operator that was set up again after the restart.
                outputOperator.teardown();
            } finally {
                // Always release the listener connection, even when an assertion fails.
                listener.closeConnection();
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Half-batch variant of the at-most-once restart scenario: window 1 carries only
     * {@code HALF_BATCH_SIZE} tuples and is left open before the restart. Afterwards a
     * full batch is sent in window 2. The expected counts require that only windows 0
     * and 2 reach the listener.
     *
     * @throws RuntimeException wrapping a {@link JMSException} or an
     *                          {@link InterruptedException} raised during the test
     */
    @Test
    public void testAtMostOnceHalfBatch() {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setTopic(false);
        try {
            listener.setupConnection();
            try {
                listener.run();
                createOperator(false, testOperatorContextAMO);

                // Window 0 is completed normally.
                outputOperator.beginWindow(0L);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                outputOperator.endWindow();
                Thread.sleep(200L);
                Assert.assertEquals("Batch should be written", BATCH_SIZE, listener.receivedData.size());

                // Window 1 receives half a batch and is left open before the simulated failure.
                outputOperator.beginWindow(1L);
                for (int i = 0; i < HALF_BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                Thread.sleep(200L);
                Assert.assertEquals("Tuples of an unfinished window should not be received",
                        BATCH_SIZE, listener.receivedData.size());

                // Simulated restart.
                // NOTE(review): the restart uses testOperatorContext rather than
                // testOperatorContextAMO - confirm this is intended for an at-most-once test.
                outputOperator.teardown();
                outputOperator.setup(testOperatorContext);
                Thread.sleep(200L);
                Assert.assertEquals("Restart should not deliver additional messages",
                        BATCH_SIZE, listener.receivedData.size());

                // Processing resumes at window 2; window 1 is not replayed.
                outputOperator.beginWindow(2L);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    outputOperator.inputPort.put(Integer.toString(random.nextInt()));
                }
                outputOperator.endWindow();
                Thread.sleep(200L);
                Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, listener.receivedData.size());

                // Release the operator that was set up again after the restart.
                outputOperator.teardown();
            } finally {
                // Always release the listener connection, even when an assertion fails.
                listener.closeConnection();
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Feeds {@code maxTuple} strings and {@code maxTuple} integers through the two ports
     * of a {@link JMSMultiPortOutputOperator} inside one window and checks that the
     * listener received all {@code 2 * maxTuple} messages.
     *
     * @throws Exception if the listener connection or the wait fails
     */
    @Test
    public void testJMSMultiPortOutputOperator() throws Exception {
        ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
        listener.setMaximumReceiveMessages(0L);
        listener.setupConnection();
        try {
            listener.run();

            JMSMultiPortOutputOperator operator = new JMSMultiPortOutputOperator();
            // Connection settings: empty credentials against the local test broker.
            operator.getConnectionFactoryProperties().put("userName", "");
            operator.getConnectionFactoryProperties().put("password", "");
            operator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
            operator.setAckMode("CLIENT_ACKNOWLEDGE");
            operator.setClientId(CLIENT_ID);
            operator.setSubject(JMSTransactionableStoreTestBase.SUBJECT);
            operator.setMessageSize(255);
            operator.setBatch(BATCH_SIZE);
            operator.setTopic(false);
            operator.setDurable(false);
            logger.debug("Before set store");
            operator.setStore(new JMSTransactionableStore());
            logger.debug("After set store");
            logger.debug("Store class {}", operator.getStore());
            operator.setVerbose(true);
            operator.setup(testOperatorContext);

            operator.beginWindow(1L);
            for (int i = 1; i <= maxTuple; i++) {
                // One tuple per port and iteration, string first.
                operator.inputPort1.process("testString " + i);
                operator.inputPort2.process(Integer.valueOf(i));
                tupleCount++;
            }
            operator.endWindow();
            operator.teardown();

            // The test relies on this fixed delay for the messages to reach the listener.
            Thread.sleep(500L);
            Assert.assertEquals("Number of emitted tuples", 2 * maxTuple, listener.receivedData.size());
            logger.debug("Number of emitted tuples: {}", listener.receivedData.size());
            Assert.assertEquals("First tuple", "testString 1", listener.receivedData.get(1));
        } finally {
            // Always release the listener connection, even when an assertion fails.
            listener.closeConnection();
        }
    }
}
