package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/lib/io/PubSubWebSocketOperatorTest.class */
/**
 * Round-trip test for {@link PubSubWebSocketOutputOperator} and
 * {@link PubSubWebSocketInputOperator} against an embedded Jetty server
 * hosting a {@link SamplePubSubWebSocketServlet}.
 */
public class PubSubWebSocketOperatorTest {
    /** Topic both operators are configured with. */
    private static final String TOPIC = "testTopic";

    /** Maximum time to wait for the input operator's subscription to reach the servlet. */
    private static final long SUBSCRIBE_TIMEOUT_MILLIS = 3000L;

    /** Maximum time to wait for the published tuples to be collected by the sink. */
    private static final int RECEIVE_TIMEOUT_MILLIS = 2000;

    /** Sleep interval between polls while waiting for tuples. */
    private static final int RECEIVE_POLL_MILLIS = 20;

    /**
     * Publishes a map and a string through the output operator and verifies that the
     * input operator emits both, in order, to its sink.
     *
     * <p>The server and both operators are released in {@code finally} blocks so that a
     * failed assertion or a timeout does not leave the server or the operators running.
     *
     * @throws Exception if the server or an operator fails, or if no subscriber
     *         connects within {@link #SUBSCRIBE_TIMEOUT_MILLIS}
     */
    @Test
    public void testPubSubWebSocket() throws Exception {
        // Port 0: the actual port is read back from the connector after start().
        Server server = new Server(0);
        SamplePubSubWebSocketServlet servlet = new SamplePubSubWebSocketServlet();
        ServletHolder holder = new ServletHolder(servlet);
        ServletContextHandler contextHandler =
                new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
        contextHandler.addServlet(holder, "/pubsub");
        contextHandler.addServlet(holder, "/*");
        server.start();
        try {
            URI uri = URI.create("ws://localhost:" + server.getConnectors()[0].getLocalPort() + "/pubsub");

            PubSubWebSocketOutputOperator outputOperator = new PubSubWebSocketOutputOperator();
            outputOperator.setUri(uri);
            outputOperator.setTopic(TOPIC);

            PubSubWebSocketInputOperator inputOperator = new PubSubWebSocketInputOperator();
            inputOperator.setUri(uri);
            inputOperator.setTopic(TOPIC);

            CollectorTestSink sink = new CollectorTestSink();
            inputOperator.outputPort.setSink(sink);

            inputOperator.setup((Context.OperatorContext) null);
            outputOperator.setup((Context.OperatorContext) null);
            inputOperator.activate((Context.OperatorContext) null);
            try {
                // Publishing before the subscription is registered would lose the messages,
                // so wait until the servlet reports a subscriber.
                long deadline = System.currentTimeMillis() + SUBSCRIBE_TIMEOUT_MILLIS;
                while (!servlet.hasSubscriber()) {
                    Thread.sleep(10L);
                    if (System.currentTimeMillis() > deadline) {
                        throw new TimeoutException("No subscribers connected after 3 seconds");
                    }
                }

                inputOperator.beginWindow(1000L);
                outputOperator.beginWindow(1000L);

                Map<String, String> data = new HashMap<String, String>();
                data.put("hello", "world");
                outputOperator.input.process(data);
                outputOperator.input.process("StringMessage");

                // Poll the input operator until both tuples arrive or the time budget is spent.
                int remainingMillis = RECEIVE_TIMEOUT_MILLIS;
                while (sink.collectedTuples.size() < 2 && remainingMillis > 0) {
                    inputOperator.emitTuples();
                    remainingMillis -= RECEIVE_POLL_MILLIS;
                    Thread.sleep(RECEIVE_POLL_MILLIS);
                }

                outputOperator.endWindow();
                inputOperator.endWindow();

                Assert.assertTrue("tuples emitted", sink.collectedTuples.size() > 1);
                Assert.assertEquals("Expects {\"hello\":\"world\"} as data", "world",
                        ((Map) sink.collectedTuples.get(0)).get("hello"));
                Assert.assertEquals("Expects \"StringMessage\" as data", "StringMessage",
                        (String) sink.collectedTuples.get(1));
            } finally {
                // Same shutdown order as the success path, but also run on failure.
                inputOperator.deactivate();
                outputOperator.teardown();
                inputOperator.teardown();
            }
        } finally {
            server.stop();
        }
    }
}
