package com.datatorrent.contrib.rabbitmq;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.helper.SourceModule;
import com.datatorrent.contrib.memcache.MemcachePOJOOperatorTest;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.class */
/**
 * Integration test for {@link RabbitMQOutputOperator}.
 *
 * <p>The test wires a {@link SourceModule} to a {@link RabbitMQOutputOperator} in a local-mode
 * DAG, consumes whatever the operator publishes to the {@code testEx} exchange through a
 * {@link RabbitMQMessageReceiver}, and asserts on the number and content of the received
 * messages. It connects to a RabbitMQ broker on {@code localhost}.
 */
public class RabbitMQOutputOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);

    /**
     * Test-side consumer that binds a queue to the {@code testEx} fanout exchange and records
     * every {@code {key=value}} message it receives.
     *
     * <p>{@link #dataMap} and {@link #count} are updated from the consumer callback and read by the
     * test thread. {@code count} is volatile and is always written <em>after</em> the
     * corresponding {@code dataMap} update, so a reader that observes a given count also observes
     * the map entries recorded up to that point.
     */
    public class RabbitMQMessageReceiver {
        String cTag;
        /** Last integer value received for each key. */
        public HashMap<String, Integer> dataMap = new HashMap<>();
        /** Number of messages recorded so far; only the consumer callback increments it. */
        public volatile int count = 0;
        private final String host = "localhost";
        ConnectionFactory connFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        TracingConsumer tracingConsumer = null;
        String queueName = "testQ";
        String exchange = "testEx";

        /**
         * Consumer that logs lifecycle callbacks at debug level and records delivered messages in
         * the enclosing receiver.
         */
        public class TracingConsumer extends DefaultConsumer {
            public TracingConsumer(Channel channel) {
                super(channel);
            }

            /** Logs the callback, then delegates to {@link DefaultConsumer}. */
            public void handleConsumeOk(String consumerTag) {
                RabbitMQOutputOperatorTest.logger.debug("{}.handleConsumeOk({})", this, consumerTag);
                super.handleConsumeOk(consumerTag);
            }

            /** Logs the callback, then delegates to {@link DefaultConsumer}. */
            public void handleCancelOk(String consumerTag) {
                RabbitMQOutputOperatorTest.logger.debug("{}.handleCancelOk({})", this, consumerTag);
                super.handleCancelOk(consumerTag);
            }

            /** Logs the callback, then delegates to {@link DefaultConsumer}. */
            public void handleShutdownSignal(String consumerTag, ShutdownSignalException signal) {
                // The signal is passed as a String so it is rendered into the message text;
                // SLF4J may otherwise treat a trailing Throwable argument as the exception to log.
                RabbitMQOutputOperatorTest.logger.debug("{}.handleShutdownSignal({}, {})",
                        this, consumerTag, String.valueOf(signal));
                super.handleShutdownSignal(consumerTag, signal);
            }

            /**
             * Records one delivered message.
             *
             * <p>The body is expected to look like <code>{key=value}</code> with an integer value:
             * the key is the text between the first character and the first {@code '='}, the value
             * is the text between that {@code '='} and the last character. Bodies that contain no
             * <code>{</code>, or no {@code '='} at a position that leaves room for a key and a
             * value, are ignored.
             *
             * @throws NumberFormatException if the value part is not a valid integer
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                if (message.indexOf('{') == -1) {
                    return;
                }
                int separator = message.indexOf('=');
                if (separator < 1 || separator >= message.length() - 1) {
                    // Without a usable '=' the substring calls below would throw on the consumer thread.
                    RabbitMQOutputOperatorTest.logger.debug("ignoring message without key=value payload: {}", message);
                    return;
                }
                String key = message.substring(1, separator);
                int value = Integer.parseInt(message.substring(separator + 1, message.length() - 1));
                RabbitMQMessageReceiver.this.dataMap.put(key, Integer.valueOf(value));
                // Volatile write last: publishes the map update to threads that read count.
                RabbitMQMessageReceiver.this.count++;
            }
        }

        public RabbitMQMessageReceiver() {
        }

        /**
         * Connects to the broker on {@code host}, declares the fanout exchange, declares a queue
         * with the no-argument {@code queueDeclare()} (its returned name replaces the initial
         * {@code queueName}), binds it to the exchange and starts consuming with auto-ack enabled.
         *
         * @throws IOException if any broker operation fails
         */
        public void setup() throws IOException {
            RabbitMQOutputOperatorTest.logger.debug("setting up receiver..");
            this.connFactory.setHost(this.host);
            this.connection = this.connFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.exchange, "fanout");
            this.queueName = this.channel.queueDeclare().getQueue();
            this.channel.queueBind(this.queueName, this.exchange, "");
            this.tracingConsumer = new TracingConsumer(this.channel);
            this.cTag = this.channel.basicConsume(this.queueName, true, this.tracingConsumer);
        }

        public String getQueueName() {
            return this.queueName;
        }

        /**
         * Closes the channel and then the connection. Safe to call when {@link #setup()} failed
         * part-way; the connection is closed even if closing the channel throws.
         *
         * @throws IOException if closing the channel or the connection fails
         */
        public void teardown() throws IOException {
            try {
                if (this.channel != null) {
                    this.channel.close();
                }
            } finally {
                if (this.connection != null) {
                    this.connection.close();
                }
            }
        }
    }

    /** Runs the pipeline with a test number of 3. */
    @Test
    public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception {
        runTest(3);
        logger.debug("end of test");
    }

    /**
     * Builds and runs the source-to-RabbitMQ DAG asynchronously, waits until
     * {@code testNum * 3} messages have been received (or roughly ten seconds have elapsed after
     * an initial one-second pause), and asserts on the received count and on the values for keys
     * {@code a}, {@code b} and {@code c}.
     *
     * <p>The local controller is shut down and the receiver's broker connection is closed whether
     * or not the assertions pass.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param testNum value handed to {@link SourceModule#setTestNum}; three messages are expected
     *     per unit
     * @throws IOException if the receiver cannot be set up
     */
    public void runTest(int testNum) throws IOException {
        final int expectedCount = testNum * 3;
        RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver();
        try {
            receiver.setup();

            LocalMode localMode = LocalMode.newInstance();
            DAG dag = localMode.getDAG();
            SourceModule source = dag.addOperator("source", new SourceModule());
            source.setTestNum(testNum);
            RabbitMQOutputOperator output = dag.addOperator("generator", new RabbitMQOutputOperator());
            output.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
            output.setExchange("testEx");
            dag.addStream("Stream", source.outPort, output.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

            LocalMode.Controller controller = localMode.getController();
            controller.setHeartbeatMonitoringEnabled(false);
            controller.runAsync();
            try {
                awaitMessages(receiver, expectedCount);
                Assert.assertEquals("emitted value for testNum was ", expectedCount, receiver.count);
                for (Map.Entry<String, Integer> entry : receiver.dataMap.entrySet()) {
                    String key = entry.getKey();
                    if ("a".equals(key)) {
                        Assert.assertEquals("emitted value for 'a' was ", Integer.valueOf(2), entry.getValue());
                    } else if ("b".equals(key)) {
                        Assert.assertEquals("emitted value for 'b' was ", Integer.valueOf(20), entry.getValue());
                    } else if ("c".equals(key)) {
                        // NOTE(review): this constant belongs to an unrelated Memcache test and looks
                        // like a decompiler substitution for a numeric literal - confirm the intended
                        // expected value for 'c'.
                        Assert.assertEquals("emitted value for 'c' was ", Integer.valueOf(MemcachePOJOOperatorTest.TUPLE_SIZE), entry.getValue());
                    }
                }
            } finally {
                controller.shutdown();
            }
        } finally {
            closeReceiver(receiver);
        }
    }

    /** Polls the receiver until it has recorded {@code expectedCount} messages or the wait times out. */
    private void awaitMessages(RabbitMQMessageReceiver receiver, int expectedCount) {
        try {
            Thread.sleep(1000L);
            long start = System.currentTimeMillis();
            while (receiver.count < expectedCount && System.currentTimeMillis() - start < 10000) {
                Thread.sleep(100L);
            }
        } catch (InterruptedException e) {
            // Assert.fail throws; the caller's finally blocks take care of shutdown and cleanup.
            Assert.fail("interrupted while waiting for messages: " + e.getMessage());
        }
    }

    /** Tears the receiver down, logging (not propagating) failures so they cannot mask the test result. */
    private void closeReceiver(RabbitMQMessageReceiver receiver) {
        try {
            receiver.teardown();
        } catch (IOException | ShutdownSignalException e) {
            logger.warn("failed to close RabbitMQ receiver", e);
        }
    }
}
