package com.datatorrent.lib.io;

import com.datatorrent.lib.testbench.CollectorTestSink;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/SocketInputOperatorTest.class */
/**
 * Tests a socket input operator against a small in-process TCP server.
 *
 * <p>The server streams the lines of a test resource file to the connecting client and records
 * everything it sends in {@link #strBuffer}. Each test then checks that the tuples emitted by the
 * operator are consecutive slices of that recorded text.
 */
public class SocketInputOperatorTest {
    /** Path of the file whose lines the test server sends. */
    private static final String TEST_DATA_PATH = "src/test/resources/SocketInputOperatorTest.txt";
    private static final Logger LOG = LoggerFactory.getLogger(SocketInputOperatorTest.class);

    /** Time given to the server thread to bind, and to the operator to read, between steps. */
    private static final long PAUSE_MILLIS = 1000L;

    /**
     * Concatenation of every line the server has sent (line terminators are not included).
     * Appended to by the server thread and read by the test thread.
     */
    private final StringBuffer strBuffer = new StringBuffer();

    /**
     * Minimal TCP server: accepts connections on the given port and writes the lines of the test
     * data file (without line terminators) to the client, recording each line in
     * {@code strBuffer}. It runs until its thread is interrupted.
     */
    public class Server implements Runnable {
        private final int serverPort;

        Server(int i) {
            this.serverPort = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                         new FileInputStream(TEST_DATA_PATH), StandardCharsets.UTF_8));
                 ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
                serverChannel.socket().bind(new InetSocketAddress(this.serverPort));
                while (true) {
                    try (SocketChannel client = serverChannel.accept()) {
                        // The reader stays open across connections: the file content is sent
                        // once, and any later connection simply finds the reader at end of file.
                        String line;
                        while ((line = reader.readLine()) != null) {
                            SocketInputOperatorTest.this.strBuffer.append(line);
                            ByteBuffer payload = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
                            while (payload.hasRemaining()) {
                                client.write(payload);
                            }
                        }
                    }
                }
            } catch (IOException e) {
                // Interrupting the thread while it is blocked on the channel is how the tests
                // stop the server; that sets the interrupt flag, so only log other failures.
                if (!Thread.currentThread().isInterrupted()) {
                    LOG.error("Test server on port " + this.serverPort + " failed", e);
                }
            }
        }
    }

    /**
     * Operator under test: emits the remaining content of each received buffer as one string.
     */
    public class TestSocketInputOperator extends AbstractSocketInputOperator<String> {
        public TestSocketInputOperator() {
        }

        /**
         * Copies the remaining bytes of {@code byteBuffer} (via a duplicate, so the caller's
         * position is left untouched) and emits them decoded as UTF-8.
         *
         * @param byteBuffer buffer holding the bytes to emit
         */
        public void processBytes(ByteBuffer byteBuffer) {
            byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.duplicate().get(bytes);
            this.outputPort.emit(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /**
     * Checks that the first two tuples emitted by the operator are, in order, consecutive
     * slices of the text sent by the server.
     *
     * @throws Exception if the operator fails or the test thread is interrupted
     */
    @Test
    public void Test() throws Exception {
        Thread serverThread = startServer(7898);
        try {
            TestSocketInputOperator operator = newOperator(7898);
            CollectorTestSink sink = new CollectorTestSink();
            operator.outputPort.setSink(sink);
            operator.setup(null);
            operator.activate(null);
            operator.beginWindow(0L);
            Thread.sleep(PAUSE_MILLIS);
            operator.emitTuples();
            Thread.sleep(PAUSE_MILLIS);
            operator.emitTuples();
            operator.endWindow();
            operator.deactivate();
            operator.teardown();

            int tupleCount = sink.collectedTuples.size();
            Assert.assertTrue("expected at least two emitted tuples but got " + tupleCount, tupleCount >= 2);
            String first = (String) sink.collectedTuples.get(0);
            String second = (String) sink.collectedTuples.get(1);
            int firstEnd = first.length();
            Assert.assertEquals(this.strBuffer.substring(0, firstEnd), first);
            Assert.assertEquals(this.strBuffer.substring(firstEnd, firstEnd + second.length()), second);
        } finally {
            stopServer(serverThread);
        }
    }

    /**
     * Same scenario with the operator's byte buffer size set to 10: after ten calls to
     * {@code emitTuples()} exactly ten tuples are expected, each a consecutive slice of the text
     * sent by the server.
     *
     * @throws Exception if the operator fails or the test thread is interrupted
     */
    @Test
    public void TestWithSmallerBufferSize() throws Exception {
        Thread serverThread = startServer(7899);
        try {
            TestSocketInputOperator operator = newOperator(7899);
            operator.setByteBufferSize(10);
            CollectorTestSink sink = new CollectorTestSink();
            operator.outputPort.setSink(sink);
            operator.setup(null);
            operator.activate(null);
            operator.beginWindow(0L);
            Thread.sleep(PAUSE_MILLIS);
            for (int round = 0; round < 10; round++) {
                operator.emitTuples();
                Thread.sleep(PAUSE_MILLIS);
            }
            operator.endWindow();
            operator.deactivate();
            operator.teardown();

            Assert.assertEquals(10L, sink.collectedTuples.size());
            int start = 0;
            for (int index = 0; index < 10; index++) {
                String tuple = (String) sink.collectedTuples.get(index);
                int end = start + tuple.length();
                Assert.assertEquals(this.strBuffer.substring(start, end), tuple);
                start = end;
            }
        } finally {
            stopServer(serverThread);
        }
    }

    /** Starts a {@link Server} on {@code port} in a new thread and waits briefly for it to bind. */
    private Thread startServer(int port) throws InterruptedException {
        Thread serverThread = new Thread(new Server(port));
        serverThread.start();
        Thread.sleep(PAUSE_MILLIS);
        return serverThread;
    }

    /** Interrupts the server thread and waits for it to exit. */
    private void stopServer(Thread serverThread) throws InterruptedException {
        serverThread.interrupt();
        serverThread.join();
    }

    /** Creates an operator pointed at {@code localhost:port} with a 10 ms scan interval. */
    private TestSocketInputOperator newOperator(int port) {
        TestSocketInputOperator operator = new TestSocketInputOperator();
        operator.setHostname("localhost");
        operator.setPort(port);
        operator.setScanIntervalInMilliSeconds(10);
        return operator;
    }
}
