package com.datatorrent.contrib.rabbitmq;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.helper.CollectorModule;
import com.datatorrent.contrib.helper.MessageQueueTestHelper;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.netlet.util.DTThrowable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.class */
public class RabbitMQInputOperatorTest {
    private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest$RabbitMQMessageGenerator.class */
    public final class RabbitMQMessageGenerator {
        ConnectionFactory connFactory;
        QueueingConsumer consumer;
        Connection connection;
        Channel channel;
        final String exchange = "testEx";
        public String queueName;

        private RabbitMQMessageGenerator() {
            this.connFactory = new ConnectionFactory();
            this.consumer = null;
            this.connection = null;
            this.channel = null;
            this.exchange = "testEx";
            this.queueName = "testQ";
        }

        public void setup() throws IOException {
            this.connFactory.setHost("localhost");
            this.connection = this.connFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare("testEx", "fanout");
        }

        public void setQueueName(String str) {
            this.queueName = str;
        }

        public void process(Object obj) throws IOException {
            this.channel.basicPublish("testEx", "", (AMQP.BasicProperties) null, obj.toString().getBytes());
        }

        public void teardown() throws IOException {
            this.channel.close();
            this.connection.close();
        }

        public void generateMessages(int i) throws InterruptedException, IOException {
            for (int i2 = 0; i2 < i; i2++) {
                ArrayList<HashMap<String, Integer>> messages = MessageQueueTestHelper.getMessages();
                for (int i3 = 0; i3 < messages.size(); i3++) {
                    process(messages.get(i3));
                }
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest$TestStringRabbitMQInputOperator.class */
    public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String> {
        /* renamed from: getTuple, reason: merged with bridge method [inline-methods] */
        public String m72getTuple(byte[] bArr) {
            return new String(bArr);
        }

        public void replayTuples(long j) {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    @Test
    public void testDag() throws Exception {
        runTest(3);
        logger.debug("end of test");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v21, types: [com.datatorrent.contrib.rabbitmq.RabbitMQInputOperatorTest$1] */
    public void runTest(final int i) throws IOException {
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        RabbitMQInputOperator addOperator = dag.addOperator("Consumer", RabbitMQInputOperator.class);
        addOperator.setWindowDataManager(new FSWindowDataManager());
        final CollectorModule addOperator2 = dag.addOperator("Collector", new CollectorModule());
        addOperator.setHost("localhost");
        addOperator.setExchange("testEx");
        addOperator.setExchangeType("fanout");
        final RabbitMQMessageGenerator rabbitMQMessageGenerator = new RabbitMQMessageGenerator();
        rabbitMQMessageGenerator.setup();
        dag.addStream("Stream", addOperator.outputPort, addOperator2.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        final LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        new Thread("LocalClusterController") { // from class: com.datatorrent.contrib.rabbitmq.RabbitMQInputOperatorTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                long currentTimeMillis = System.currentTimeMillis();
                while (true) {
                    try {
                        try {
                            DefaultInputPort defaultInputPort = addOperator2.inputPort;
                            if (CollectorModule.CollectorInputPort.collections.containsKey("collector") || System.currentTimeMillis() - currentTimeMillis >= 100000) {
                                break;
                            } else {
                                Thread.sleep(500L);
                            }
                        } catch (IOException e) {
                            RabbitMQInputOperatorTest.logger.error(e.getMessage(), e);
                            DTThrowable.rethrow(e);
                            controller.shutdown();
                            return;
                        } catch (InterruptedException e2) {
                            DTThrowable.rethrow(e2);
                            controller.shutdown();
                            return;
                        }
                    } catch (Throwable th) {
                        controller.shutdown();
                        throw th;
                    }
                }
                rabbitMQMessageGenerator.generateMessages(i);
                long currentTimeMillis2 = System.currentTimeMillis();
                while (System.currentTimeMillis() - currentTimeMillis2 < 100000) {
                    DefaultInputPort defaultInputPort2 = addOperator2.inputPort;
                    if (CollectorModule.CollectorInputPort.collections.get("collector").size() >= i * 3) {
                        break;
                    } else {
                        Thread.sleep(10L);
                    }
                }
                controller.shutdown();
            }
        }.start();
        controller.run();
        Logger logger2 = logger;
        DefaultInputPort defaultInputPort = addOperator2.inputPort;
        Integer valueOf = Integer.valueOf(CollectorModule.CollectorInputPort.collections.size());
        DefaultInputPort defaultInputPort2 = addOperator2.inputPort;
        logger2.debug("collection size: {} {}", valueOf, CollectorModule.CollectorInputPort.collections);
        DefaultInputPort defaultInputPort3 = addOperator2.inputPort;
        MessageQueueTestHelper.validateResults(i, CollectorModule.CollectorInputPort.collections);
    }

    @Test
    public void testRecoveryAndIdempotency() throws Exception {
        RabbitMQInputOperator rabbitMQInputOperator = new RabbitMQInputOperator();
        rabbitMQInputOperator.setWindowDataManager(new FSWindowDataManager());
        rabbitMQInputOperator.setHost("localhost");
        rabbitMQInputOperator.setExchange("testEx");
        rabbitMQInputOperator.setExchangeType("fanout");
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        rabbitMQInputOperator.outputPort.setSink(collectorTestSink);
        Context.OperatorContext mockOperatorContext = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
        rabbitMQInputOperator.setup(mockOperatorContext);
        rabbitMQInputOperator.activate(mockOperatorContext);
        RabbitMQMessageGenerator rabbitMQMessageGenerator = new RabbitMQMessageGenerator();
        rabbitMQMessageGenerator.setup();
        rabbitMQMessageGenerator.generateMessages(5);
        Thread.sleep(10000L);
        rabbitMQInputOperator.beginWindow(1L);
        rabbitMQInputOperator.emitTuples();
        rabbitMQInputOperator.endWindow();
        rabbitMQInputOperator.deactivate();
        Assert.assertEquals("num of messages in window 1", 15L, collectorTestSink.collectedTuples.size());
        collectorTestSink.collectedTuples.clear();
        rabbitMQInputOperator.setup(mockOperatorContext);
        rabbitMQInputOperator.activate(mockOperatorContext);
        Assert.assertEquals("largest recovery window", 1L, rabbitMQInputOperator.getWindowDataManager().getLargestCompletedWindow());
        rabbitMQInputOperator.beginWindow(1L);
        rabbitMQInputOperator.endWindow();
        Assert.assertEquals("num of messages in window 1", 15L, collectorTestSink.collectedTuples.size());
        collectorTestSink.collectedTuples.clear();
        rabbitMQInputOperator.deactivate();
        rabbitMQInputOperator.teardown();
        rabbitMQInputOperator.getWindowDataManager().committed(1L);
        rabbitMQMessageGenerator.teardown();
    }
}
