package com.datatorrent.contrib.zmq;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.helper.CollectorModule;
import com.datatorrent.contrib.helper.MessageQueueTestHelper;
import com.datatorrent.netlet.util.DTThrowable;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Functional test for {@link ZeroMQInputOperator}.
 *
 * <p>The test wires a {@link ZeroMQInputOperator} to a {@link CollectorModule} in a local-mode
 * DAG, publishes messages through a {@link ZeroMQMessageGenerator} on a background thread, and
 * validates what the collector recorded once the local cluster has been shut down.
 */
public class ZeroMQInputOperatorTest {
    protected static Logger logger = LoggerFactory.getLogger(ZeroMQInputOperatorTest.class);

    /** Key under which the collector's tuples are looked up in the shared collections map. */
    private static final String COLLECTOR_KEY = "collector";

    /** Upper bound, in milliseconds, for each of the two waiting phases. */
    private static final long WAIT_TIMEOUT_MILLIS = 10000L;

    /** Poll interval while waiting for the collector key to appear. */
    private static final long KEY_POLL_MILLIS = 500L;

    /** Pause between the collector key appearing and the start of tuple counting. */
    private static final long SETTLE_MILLIS = 1000L;

    /** Poll interval while waiting for the expected number of tuples. */
    private static final long TUPLE_POLL_MILLIS = 10L;

    /** Pause between shutting down the controller and tearing down the message generator. */
    private static final long SHUTDOWN_GRACE_MILLIS = 1000L;

    @Test
    public void testDag() throws InterruptedException, Exception {
        testHelper(3);
    }

    /**
     * Runs the input operator in a local cluster and validates the collected results.
     *
     * <p>Two helper threads are started: one asks the generator to publish messages, the other
     * waits for the collector to fill up (or for a timeout) and then shuts the local cluster
     * down, which makes the blocking {@code controller.run()} call on the calling thread return.
     *
     * @param i message count handed to {@link ZeroMQMessageGenerator#generateMessages} and to
     *          {@link MessageQueueTestHelper#validateResults}; the controller thread waits for
     *          {@code i * 3} collected tuples (presumably three tuples per generated message;
     *          confirm against the generator)
     */
    public void testHelper(final int i) {
        LocalMode localMode = LocalMode.newInstance();
        DAG dag = localMode.getDAG();

        final ZeroMQMessageGenerator publisher = new ZeroMQMessageGenerator();
        publisher.setup();

        ZeroMQInputOperator input = dag.addOperator("Generator", ZeroMQInputOperator.class);
        final CollectorModule collector = dag.addOperator("Collector", new CollectorModule());
        input.setFilter("");
        input.setUrl("tcp://localhost:5556");
        input.setSyncUrl("tcp://localhost:5557");
        dag.addStream("Stream", input.outputPort, collector.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

        // Publish on a separate thread: the calling thread is about to block in controller.run().
        new Thread() {
            @Override
            public void run() {
                try {
                    publisher.generateMessages(i);
                } catch (InterruptedException e) {
                    logger.debug("Message generation interrupted", e);
                    // Keep the interrupt visible to anything that inspects this thread afterwards.
                    Thread.currentThread().interrupt();
                }
            }
        }.start();

        final LocalMode.Controller controller = localMode.getController();
        controller.setHeartbeatMonitoringEnabled(false);

        new Thread("LocalClusterController") {
            @Override
            public void run() {
                try {
                    awaitCollectedTuples(i * 3);
                } catch (InterruptedException e) {
                    DTThrowable.rethrow(e);
                } finally {
                    // Always stop the cluster, otherwise controller.run() below never returns.
                    logger.debug("Shutting down..");
                    controller.shutdown();
                    try {
                        Thread.sleep(SHUTDOWN_GRACE_MILLIS);
                    } catch (InterruptedException e) {
                        DTThrowable.rethrow(e);
                    } finally {
                        publisher.teardown();
                    }
                }
            }
        }.start();

        controller.run();

        MessageQueueTestHelper.validateResults(i, CollectorModule.CollectorInputPort.collections);
        logger.debug("end of test");
    }

    /**
     * Blocks until the collector has recorded at least {@code expected} tuples or the timeouts
     * elapse.
     *
     * <p>Phase one polls for the collector key to show up in the shared collections map. If it
     * never appears within the timeout the method returns immediately instead of dereferencing
     * a missing entry, leaving it to result validation to report the failure. Phase two polls
     * the size of the collected list.
     *
     * @param expected number of tuples to wait for
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private static void awaitCollectedTuples(int expected) throws InterruptedException {
        long phaseStart = System.currentTimeMillis();
        while (!CollectorModule.CollectorInputPort.collections.containsKey(COLLECTOR_KEY)) {
            if (System.currentTimeMillis() - phaseStart >= WAIT_TIMEOUT_MILLIS) {
                return;
            }
            Thread.sleep(KEY_POLL_MILLIS);
        }

        Thread.sleep(SETTLE_MILLIS);

        phaseStart = System.currentTimeMillis();
        while (System.currentTimeMillis() - phaseStart < WAIT_TIMEOUT_MILLIS
                && CollectorModule.CollectorInputPort.collections.get(COLLECTOR_KEY).size() < expected) {
            Thread.sleep(TUPLE_POLL_MILLIS);
        }
    }
}
