package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.jms.JMSOutputOperatorTest;
import com.datatorrent.lib.util.ActiveMQMultiTypeMessageListener;
import java.io.File;
import javax.jms.JMSException;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

/* loaded from: input_file:com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.class */
public class JMSTransactionableStoreTestBase extends JMSTestBase {
    public static final String SUBJECT = "TEST.FOO";
    public static final String CLIENT_ID = "Client1";
    public static final String APP_ID = "appId";
    public static final int OPERATOR_ID = 1;
    public static final int OPERATOR_2_ID = 2;
    public static Class<? extends JMSBaseTransactionableStore> storeClass;
    public static Context.OperatorContext testOperatorContext;
    public static Context.OperatorContext testOperator2Context;

    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        protected void starting(Description description) {
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(DAG.APPLICATION_ID, "appId");
            JMSTransactionableStoreTestBase.testOperatorContext = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
            JMSTransactionableStoreTestBase.testOperator2Context = OperatorContextTestHelper.mockOperatorContext(2, defaultAttributeMap);
            FileUtils.deleteQuietly(new File("recovery"));
        }

        protected void finished(Description description) {
            FileUtils.deleteQuietly(new File("recovery"));
        }
    }

    public JMSTransactionableStoreTestBase(Class<? extends JMSBaseTransactionableStore> cls) {
        storeClass = cls;
    }

    private JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator(Context.OperatorContext operatorContext, String str) {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator jMSStringSinglePortOutputOperator = new JMSOutputOperatorTest.JMSStringSinglePortOutputOperator();
        try {
            JMSTransactionableStore jMSTransactionableStore = (JMSBaseTransactionableStore) storeClass.newInstance();
            if (JMSTransactionableStore.class.equals(storeClass) && str != null) {
                jMSTransactionableStore.setMetaQueueName(str);
            }
            jMSStringSinglePortOutputOperator.getConnectionFactoryProperties().put("userName", "");
            jMSStringSinglePortOutputOperator.getConnectionFactoryProperties().put("password", "");
            jMSStringSinglePortOutputOperator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
            jMSStringSinglePortOutputOperator.setAckMode("CLIENT_ACKNOWLEDGE");
            jMSStringSinglePortOutputOperator.setClientId("Client1");
            jMSStringSinglePortOutputOperator.setSubject(SUBJECT);
            jMSStringSinglePortOutputOperator.setMessageSize(255);
            jMSStringSinglePortOutputOperator.setBatch(1);
            jMSStringSinglePortOutputOperator.setTopic(false);
            jMSStringSinglePortOutputOperator.setDurable(false);
            jMSStringSinglePortOutputOperator.setStore(jMSTransactionableStore);
            jMSStringSinglePortOutputOperator.setVerbose(true);
            jMSStringSinglePortOutputOperator.setup(operatorContext);
            return jMSStringSinglePortOutputOperator;
        } catch (IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void connectedTest() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        JMSBaseTransactionableStore store = createOperator.getStore();
        Assert.assertTrue("Should be connected.", store.isConnected());
        createOperator.teardown();
        Assert.assertFalse("Should not be connected.", store.isConnected());
    }

    @Test
    public void transactionTest() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        JMSBaseTransactionableStore store = createOperator.getStore();
        Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
        store.beginTransaction();
        Assert.assertTrue("Should be in transaction.", store.isInTransaction());
        store.commitTransaction();
        Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
        createOperator.teardown();
    }

    @Test
    public void storeRetreiveTransactionTest() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        JMSBaseTransactionableStore store = createOperator.getStore();
        createOperator.beginWindow(0L);
        createOperator.endWindow();
        Assert.assertEquals(0L, store.getCommittedWindowId("appId", 1));
        createOperator.teardown();
    }

    @Test
    public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTest() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        createOperator.beginWindow(0L);
        createOperator.endWindow();
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator2 = createOperator(testOperator2Context, null);
        createOperator2.beginWindow(1L);
        createOperator2.endWindow();
        Assert.assertEquals(0L, createOperator.getStore().getCommittedWindowId("appId", 1));
        Assert.assertEquals(1L, createOperator2.getStore().getCommittedWindowId("appId", 2));
        createOperator.teardown();
        createOperator2.teardown();
    }

    @Test
    public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTestWithCustomMetaQueueName() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, "metaQ1");
        createOperator.beginWindow(0L);
        createOperator.endWindow();
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator2 = createOperator(testOperator2Context, "metaQ2");
        createOperator2.beginWindow(1L);
        createOperator2.endWindow();
        Assert.assertEquals(0L, createOperator.getStore().getCommittedWindowId("appId", 1));
        Assert.assertEquals(1L, createOperator2.getStore().getCommittedWindowId("appId", 2));
        createOperator.teardown();
        createOperator2.teardown();
    }

    @Test
    public void multiWindowTransactionTest() {
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        JMSBaseTransactionableStore store = createOperator.getStore();
        Assert.assertEquals(-1L, store.getCommittedWindowId("appId", 1));
        createOperator.beginWindow(0L);
        createOperator.endWindow();
        Assert.assertEquals(0L, store.getCommittedWindowId("appId", 1));
        createOperator.beginWindow(1L);
        createOperator.endWindow();
        Assert.assertEquals(1L, store.getCommittedWindowId("appId", 1));
        createOperator.beginWindow(2L);
        createOperator.endWindow();
        Assert.assertEquals(2L, store.getCommittedWindowId("appId", 1));
        createOperator.beginWindow(3L);
        createOperator.endWindow();
        Assert.assertEquals(3L, store.getCommittedWindowId("appId", 1));
        createOperator.beginWindow(4L);
        createOperator.endWindow();
        createOperator.teardown();
    }

    @Test
    public void commitTest() throws JMSException, InterruptedException {
        ActiveMQMultiTypeMessageListener activeMQMultiTypeMessageListener = new ActiveMQMultiTypeMessageListener();
        activeMQMultiTypeMessageListener.setSubject(SUBJECT);
        activeMQMultiTypeMessageListener.setupConnection();
        activeMQMultiTypeMessageListener.run();
        JMSOutputOperatorTest.JMSStringSinglePortOutputOperator createOperator = createOperator(testOperatorContext, null);
        JMSBaseTransactionableStore store = createOperator.getStore();
        store.beginTransaction();
        createOperator.inputPort.put("a");
        Thread.sleep(500L);
        Assert.assertEquals(0L, activeMQMultiTypeMessageListener.receivedData.size());
        store.commitTransaction();
        Thread.sleep(500L);
        Assert.assertEquals(1L, activeMQMultiTypeMessageListener.receivedData.size());
        createOperator.teardown();
        activeMQMultiTypeMessageListener.closeConnection();
    }
}
