package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.jms.JMSOutputOperatorTest;
import com.datatorrent.lib.util.ActiveMQMultiTypeMessageListener;
import java.io.File;
import javax.jms.JMSException;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

/* loaded from: input_file:com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.class */
public class JMSTransactionableStoreTestBase extends JMSTestBase {
    public static final String SUBJECT = "TEST.FOO";
    public static final String CLIENT_ID = "Client1";
    public static final String APP_ID = "appId";
    public static final int OPERATOR_ID = 1;
    public static JMSBaseTransactionableStore store;
    public static JMSOutputOperatorTest.JMSStringSinglePortOutputOperator outputOperator;
    public static Class<? extends JMSBaseTransactionableStore> storeClass;
    public static OperatorContextTestHelper.TestIdOperatorContext testOperatorContext;

    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        protected void starting(Description description) {
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(DAG.APPLICATION_ID, "appId");
            JMSTransactionableStoreTestBase.testOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
            FileUtils.deleteQuietly(new File("recovery"));
        }

        protected void finished(Description description) {
            FileUtils.deleteQuietly(new File("recovery"));
        }
    }

    public JMSTransactionableStoreTestBase(Class<? extends JMSBaseTransactionableStore> cls) {
        storeClass = cls;
    }

    private void createOperator() {
        outputOperator = new JMSOutputOperatorTest.JMSStringSinglePortOutputOperator();
        try {
            store = storeClass.newInstance();
            outputOperator.getConnectionFactoryProperties().put("userName", "");
            outputOperator.getConnectionFactoryProperties().put("password", "");
            outputOperator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
            outputOperator.setAckMode("CLIENT_ACKNOWLEDGE");
            outputOperator.setClientId("Client1");
            outputOperator.setSubject(SUBJECT);
            outputOperator.setMessageSize(255);
            outputOperator.setBatch(1);
            outputOperator.setTopic(false);
            outputOperator.setDurable(false);
            outputOperator.setStore(store);
            outputOperator.setVerbose(true);
            outputOperator.setup(testOperatorContext);
        } catch (IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        }
    }

    private void deleteOperator() {
        outputOperator.teardown();
    }

    @Test
    public void connectedTest() {
        createOperator();
        Assert.assertTrue("Should be connected.", store.isConnected());
        deleteOperator();
        Assert.assertFalse("Should not be connected.", store.isConnected());
    }

    @Test
    public void transactionTest() {
        createOperator();
        Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
        store.beginTransaction();
        Assert.assertTrue("Should be in transaction.", store.isInTransaction());
        store.commitTransaction();
        Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
        deleteOperator();
    }

    @Test
    public void storeRetreiveTransactionTest() {
        createOperator();
        outputOperator.beginWindow(0L);
        outputOperator.endWindow();
        Assert.assertEquals(0L, store.getCommittedWindowId("appId", 1));
        deleteOperator();
    }

    @Test
    public void multiWindowTransactionTest() {
        createOperator();
        Assert.assertEquals(-1L, store.getCommittedWindowId("appId", 1));
        outputOperator.beginWindow(0L);
        outputOperator.endWindow();
        Assert.assertEquals(0L, store.getCommittedWindowId("appId", 1));
        outputOperator.beginWindow(1L);
        outputOperator.endWindow();
        Assert.assertEquals(1L, store.getCommittedWindowId("appId", 1));
        outputOperator.beginWindow(2L);
        outputOperator.endWindow();
        Assert.assertEquals(2L, store.getCommittedWindowId("appId", 1));
        outputOperator.beginWindow(3L);
        outputOperator.endWindow();
        Assert.assertEquals(3L, store.getCommittedWindowId("appId", 1));
        outputOperator.beginWindow(4L);
        outputOperator.endWindow();
        deleteOperator();
    }

    @Test
    public void commitTest() throws JMSException, InterruptedException {
        ActiveMQMultiTypeMessageListener activeMQMultiTypeMessageListener = new ActiveMQMultiTypeMessageListener();
        activeMQMultiTypeMessageListener.setSubject(SUBJECT);
        activeMQMultiTypeMessageListener.setupConnection();
        activeMQMultiTypeMessageListener.run();
        createOperator();
        store.beginTransaction();
        outputOperator.inputPort.put("a");
        Thread.sleep(500L);
        Assert.assertEquals(0L, activeMQMultiTypeMessageListener.receivedData.size());
        store.commitTransaction();
        Thread.sleep(500L);
        Assert.assertEquals(1L, activeMQMultiTypeMessageListener.receivedData.size());
        deleteOperator();
        activeMQMultiTypeMessageListener.closeConnection();
    }
}
