package com.datatorrent.contrib.mqtt;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.lib.util.KeyValPair;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttInputOperatorTest.class */
public class MqttInputOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(MqttInputOperatorTest.class);
    private static HashMap<String, Object> resultMap = new HashMap<>();
    private static int resultCount = 0;
    private static boolean activated = false;

    /* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttInputOperatorTest$CollectorModule.class */
    public static class CollectorModule<T> extends BaseOperator {
        public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { // from class: com.datatorrent.contrib.mqtt.MqttInputOperatorTest.CollectorModule.1
            public void process(T t) {
                KeyValPair keyValPair = (KeyValPair) t;
                MqttInputOperatorTest.resultMap.put(keyValPair.getKey(), keyValPair.getValue());
                MqttInputOperatorTest.access$208();
            }
        };
    }

    /* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttInputOperatorTest$TestMqttInputOperator.class */
    public static class TestMqttInputOperator extends AbstractSinglePortMqttInputOperator<KeyValPair<String, String>> {
        /* renamed from: getTuple, reason: merged with bridge method [inline-methods] */
        public KeyValPair<String, String> m55getTuple(Message message) {
            return new KeyValPair<>(message.getTopic(), new String(message.getPayload()));
        }

        public void generateData() throws Exception {
            HashMap hashMap = new HashMap();
            hashMap.put("a", 10);
            hashMap.put("b", 200);
            hashMap.put("c", Integer.valueOf(AerospikeTestUtils.PORT));
            System.out.println("Data generator map:" + hashMap.toString());
            for (Map.Entry entry : hashMap.entrySet()) {
                this.connection.publish((String) entry.getKey(), ((Integer) entry.getValue()).toString().getBytes(), QoS.AT_MOST_ONCE, false);
            }
        }

        public void activate(Context.OperatorContext operatorContext) {
            super.activate(operatorContext);
            boolean unused = MqttInputOperatorTest.activated = true;
        }
    }

    @Test
    public void testInputOperator() throws InterruptedException, Exception {
        MqttClientConfig mqttClientConfig = new MqttClientConfig();
        mqttClientConfig.setHost("localhost");
        mqttClientConfig.setPort(1883);
        mqttClientConfig.setCleanSession(true);
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        TestMqttInputOperator addOperator = dag.addOperator("input", TestMqttInputOperator.class);
        CollectorModule addOperator2 = dag.addOperator("collector", new CollectorModule());
        addOperator.addSubscribeTopic("a", QoS.AT_MOST_ONCE);
        addOperator.addSubscribeTopic("b", QoS.AT_MOST_ONCE);
        addOperator.addSubscribeTopic("c", QoS.AT_MOST_ONCE);
        addOperator.setMqttClientConfig(mqttClientConfig);
        addOperator.setup(null);
        dag.addStream("stream", addOperator.outputPort, addOperator2.inputPort);
        LocalMode.Controller controller = newInstance.getController();
        controller.runAsync();
        long currentTimeMillis = System.currentTimeMillis() + 3000;
        while (!activated) {
            Assert.assertTrue("Activation timeout", currentTimeMillis > System.currentTimeMillis());
            Thread.sleep(1000L);
        }
        addOperator.activate((Context.OperatorContext) null);
        addOperator.generateData();
        long currentTimeMillis2 = System.currentTimeMillis() + 5000;
        while (resultCount == 0) {
            try {
                Thread.sleep(10L);
                Assert.assertTrue("timeout without getting any data", System.currentTimeMillis() < currentTimeMillis2);
            } catch (InterruptedException e) {
            }
        }
        controller.shutdown();
        Assert.assertEquals("Number of emitted tuples", 3L, resultMap.size());
        Assert.assertEquals("value of a is ", "10", resultMap.get("a"));
        Assert.assertEquals("value of b is ", "200", resultMap.get("b"));
        Assert.assertEquals("value of c is ", "3000", resultMap.get("c"));
        System.out.println("resultCount:" + resultCount);
    }

    static /* synthetic */ int access$208() {
        int i = resultCount;
        resultCount = i + 1;
        return i;
    }
}
