package com.datatorrent.lib.io.jms;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.jms.JMSBase;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.io.File;
import javax.jms.ConnectionFactory;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.class */
public class SQSStringInputOperatorTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final transient Logger LOG = LoggerFactory.getLogger(SQSStringInputOperatorTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/jms/SQSStringInputOperatorTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        String baseDir;
        JMSStringInputOperator operator;
        CollectorTestSink<Object> sink;
        Context.OperatorContext context;
        SQSTestBase testBase;

        protected void starting(Description description) {
            final String methodName = description.getMethodName();
            final String className = description.getClassName();
            this.testBase = new SQSTestBase();
            if (this.testBase.validateTestCreds()) {
                this.testBase.generateCurrentQueueName(methodName);
                try {
                    this.testBase.beforTest();
                    this.baseDir = "target/" + className + "/" + methodName;
                    Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
                    defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
                    defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.baseDir);
                    this.context = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
                    this.operator = new JMSStringInputOperator();
                    this.operator.setConnectionFactoryBuilder(new JMSBase.ConnectionFactoryBuilder() { // from class: com.datatorrent.lib.io.jms.SQSStringInputOperatorTest.TestMeta.1
                        public ConnectionFactory buildConnectionFactory() {
                            return SQSConnectionFactory.builder().withRegion(Region.getRegion(Regions.US_EAST_1)).withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(TestMeta.this.testBase.getDevCredsFilePath())).build();
                        }

                        public String toString() {
                            return className + "/" + methodName + "/ConnectionFactoryBuilder";
                        }
                    });
                    this.operator.setSubject(this.testBase.getCurrentQueueName());
                    this.operator.setAckMode("AUTO_ACKNOWLEDGE");
                    this.operator.setTransacted(false);
                    this.sink = new CollectorTestSink<>();
                    this.operator.output.setSink(this.sink);
                    this.operator.setup(this.context);
                    this.operator.activate(this.context);
                } catch (AssumptionViolatedException e) {
                    throw e;
                } catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
            }
        }

        protected void finished(Description description) {
            if (this.operator == null) {
                Assert.assertFalse(this.testBase.validateTestCreds());
                return;
            }
            this.operator.deactivate();
            this.operator.teardown();
            try {
                FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
                this.testBase.afterTest();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    public void testStringMsgInput() throws Exception {
        this.testMeta.testBase.validateAssumption();
        this.testMeta.testBase.produceMsg("testStringMsgInput", 10, false);
        Thread.sleep(1000L);
        this.testMeta.operator.emitTuples();
        Assert.assertEquals("num of messages", 10L, this.testMeta.sink.collectedTuples.size());
    }

    @Test
    public void testRecoveryAndIdempotency() throws Exception {
        this.testMeta.testBase.validateAssumption();
        this.testMeta.testBase.produceUniqueMsgs("testRecoveryAndIdempotency", 25, false);
        Thread.sleep(3000L);
        this.testMeta.operator.beginWindow(1L);
        this.testMeta.operator.emitTuples();
        this.testMeta.operator.endWindow();
        Assert.assertEquals("num of messages in window 1 pre-failure", 25L, this.testMeta.sink.collectedTuples.size());
        String str = (String) this.testMeta.sink.collectedTuples.get(4);
        String str2 = (String) this.testMeta.sink.collectedTuples.get(17);
        this.testMeta.sink.collectedTuples.clear();
        this.testMeta.operator.setup(this.testMeta.context);
        this.testMeta.operator.activate(this.testMeta.context);
        Assert.assertEquals("largest recovery window", 1L, this.testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
        this.testMeta.operator.beginWindow(1L);
        this.testMeta.operator.endWindow();
        Assert.assertEquals("num of messages in window 1", 25L, this.testMeta.sink.collectedTuples.size());
        Assert.assertEquals(str, this.testMeta.sink.collectedTuples.get(4));
        Assert.assertEquals(str2, this.testMeta.sink.collectedTuples.get(17));
        this.testMeta.sink.collectedTuples.clear();
    }

    @Test
    public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception {
        this.testMeta.testBase.validateAssumption();
        this.testMeta.sink = new CollectorTestSink<Object>() { // from class: com.datatorrent.lib.io.jms.SQSStringInputOperatorTest.1
            public void put(Object obj) {
                if ((obj instanceof String) && ((String) obj).startsWith("4:")) {
                    throw new RuntimeException("fail 4th message");
                }
                synchronized (this.collectedTuples) {
                    this.collectedTuples.add(obj);
                    this.collectedTuples.notifyAll();
                }
            }
        };
        this.testMeta.operator.output.setSink(this.testMeta.sink);
        this.testMeta.testBase.produceUniqueMsgs("testFailureAfterPersistenceAndBeforeRecovery", 10, false);
        Thread.sleep(1000L);
        this.testMeta.operator.beginWindow(1L);
        try {
            this.testMeta.operator.emitTuples();
        } catch (Throwable th) {
            LOG.debug("emit exception");
        }
        Assert.assertTrue("num of messages before endWindow 1", this.testMeta.sink.collectedTuples.size() < 9);
        this.testMeta.operator.endWindow();
        Assert.assertEquals("num of messages after endWindow 1", 9L, this.testMeta.sink.collectedTuples.size());
        this.testMeta.operator.setup(this.testMeta.context);
        this.testMeta.operator.activate(this.testMeta.context);
        Assert.assertEquals("window 1 should exist", 1L, this.testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
    }
}
