package com.datatorrent.contrib.mqtt;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttOutputOperatorTest.class */
public class MqttOutputOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(MqttOutputOperatorTest.class);

    /**
     * Topic-to-payload pairs the test publishes. Populated by {@code testDag}; read by
     * {@code SourceModule.activate} (to build tuples) and by {@code GetDataThread} (to pick the
     * topics to subscribe to).
     */
    private static final Map<String, String> sendingData = new HashMap<>();

    /**
     * Topic-to-payload pairs read back from the broker. Written by {@code GetDataThread} and
     * asserted on by {@code testDag}, i.e. touched from two different threads.
     */
    private static final Map<String, String> receivedData = new HashMap<>();

    // Neither of the two counters below is referenced anywhere else in this class.
    static int sentTuples = 0;
    static final int totalTuples = 9;

    /* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttOutputOperatorTest$SourceModule.class */
    /**
     * Input operator that feeds the test DAG. On activation it queues one single-entry map per
     * {@code sendingData} entry; every {@link #emitTuples()} call then forwards whatever is queued
     * at that moment to {@link #outPort}.
     */
    public static class SourceModule extends BaseOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
        public final transient DefaultOutputPort<Map<String, String>> outPort = new DefaultOutputPort<>();
        // Static: the buffer is shared by all SourceModule instances and replaced on every setup().
        static transient ArrayBlockingQueue<Map<String, String>> holdingBuffer;

        /**
         * Allocates a fresh, empty holding buffer.
         *
         * @param operatorContext not used
         */
        public void setup(Context.OperatorContext operatorContext) {
            holdingBuffer = new ArrayBlockingQueue<>(1048576);
        }

        /**
         * Emits a single tuple on {@link #outPort}.
         *
         * @param map the tuple to emit
         */
        public void emitTuple(Map<String, String> map) {
            this.outPort.emit(map);
        }

        /**
         * Emits as many tuples as were queued when the call started. The count is captured once
         * up front, so elements added to the buffer during the loop are left for a later call.
         */
        public void emitTuples() {
            for (int remaining = holdingBuffer.size(); remaining > 0; remaining--) {
                emitTuple(holdingBuffer.poll());
            }
        }

        /**
         * Queues one single-entry map for each entry of {@code MqttOutputOperatorTest.sendingData}.
         *
         * @param operatorContext not used
         */
        public void activate(Context.OperatorContext operatorContext) {
            for (Map.Entry<String, String> entry : MqttOutputOperatorTest.sendingData.entrySet()) {
                Map<String, String> tuple = new HashMap<>();
                tuple.put(entry.getKey(), entry.getValue());
                holdingBuffer.add(tuple);
            }
        }

        /** No-op. */
        public void deactivate() {
        }

        /**
         * No-op.
         *
         * @param j not used
         */
        public void replayTuples(long j) {
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttOutputOperatorTest$TestMqttOutputOperator.class */
    /**
     * Output operator under test: publishes every entry of an incoming map as one MQTT message,
     * using the entry key as topic and the entry value as payload.
     */
    private static final class TestMqttOutputOperator extends AbstractSinglePortMqttOutputOperator<Map<String, String>> {

        /* loaded from: input_file:com/datatorrent/contrib/mqtt/MqttOutputOperatorTest$TestMqttOutputOperator$GetDataThread.class */
        /**
         * Background reader that subscribes to every topic named in {@code sendingData} and copies
         * incoming messages into {@code receivedData} until it holds as many entries as
         * {@code sendingData}. It uses the enclosing operator's {@code connection}, so that
         * connection must be usable before the thread is started.
         */
        public class GetDataThread extends Thread {
            public GetDataThread() {
            }

            /**
             * Subscribes, then blocks on {@code receive()} until enough distinct topics were seen.
             *
             * @throws RuntimeException wrapping any exception raised while subscribing or receiving
             */
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Topic[] topics = new Topic[MqttOutputOperatorTest.sendingData.size()];
                    int next = 0;
                    for (String topicName : MqttOutputOperatorTest.sendingData.keySet()) {
                        topics[next++] = new Topic(topicName, QoS.AT_MOST_ONCE);
                    }
                    TestMqttOutputOperator.this.connection.subscribe(topics);
                    // The map is keyed by topic, so repeated messages on one topic do not advance the loop.
                    while (MqttOutputOperatorTest.receivedData.size() < MqttOutputOperatorTest.sendingData.size()) {
                        Message message = TestMqttOutputOperator.this.connection.receive();
                        // Decode with the same explicit charset processTuple() encodes with.
                        MqttOutputOperatorTest.receivedData.put(
                                message.getTopic(), new String(message.getPayload(), StandardCharsets.UTF_8));
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        private TestMqttOutputOperator() {
        }

        /**
         * Publishes each entry of the tuple with QoS {@code AT_LEAST_ONCE}.
         *
         * @param map topic-to-payload pairs to publish
         * @throws RuntimeException wrapping any exception raised by {@code publish}
         */
        public void processTuple(Map<String, String> map) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                try {
                    // Explicit UTF-8 keeps the payload bytes independent of the platform default charset.
                    byte[] payload = entry.getValue().getBytes(StandardCharsets.UTF_8);
                    this.connection.publish(entry.getKey(), payload, QoS.AT_LEAST_ONCE, false);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * End-to-end round trip: a {@code SourceModule} feeds three topic/payload pairs into a
     * {@code TestMqttOutputOperator}, a {@code GetDataThread} reads them back, and the received
     * values are compared with what was sent.
     *
     * <p>The client is configured for {@code localhost:1883}, so the test expects an MQTT broker
     * to be reachable there.
     *
     * @throws Exception if building or running the DAG fails
     */
    @Test
    public void testDag() throws Exception {
        MqttClientConfig clientConfig = new MqttClientConfig();
        clientConfig.setHost("localhost");
        clientConfig.setPort(1883);
        clientConfig.setCleanSession(true);

        sendingData.put("testa", "2");
        sendingData.put("testb", "20");
        sendingData.put("testc", "1000");

        LocalMode localMode = LocalMode.newInstance();
        DAG dag = localMode.getDAG();
        SourceModule source = dag.addOperator("source", SourceModule.class);
        TestMqttOutputOperator producer = dag.addOperator("producer", new TestMqttOutputOperator());
        producer.setMqttClientConfig(clientConfig);
        dag.addStream("Stream", source.outPort, producer.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

        // GetDataThread is a non-static inner class, so it has to be created from the operator instance.
        TestMqttOutputOperator.GetDataThread getDataThread = producer.new GetDataThread();
        // NOTE(review): setup() is invoked by hand before the reader starts, presumably so that the
        // operator's connection exists when the thread subscribes - confirm against the base class.
        producer.setup(null);
        getDataThread.start();

        LocalMode.Controller controller = localMode.getController();
        controller.runAsync();
        try {
            // Fixed grace period for the messages to travel through the broker and back.
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        controller.shutdown();

        Assert.assertEquals("emitted value for testNum was ", 3L, receivedData.size());
        // Direct lookups also fail when an expected topic is missing, not only when its value is wrong.
        Assert.assertEquals("emitted value for 'testa' was ", "2", receivedData.get("testa"));
        Assert.assertEquals("emitted value for 'testb' was ", "20", receivedData.get("testb"));
        Assert.assertEquals("emitted value for 'testc' was ", "1000", receivedData.get("testc"));
        logger.debug("end of test");
    }
}
