package io.scalecube.transport;

import io.scalecube.testlib.BaseTest;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/scalecube/transport/TransportTest.class */
public class TransportTest extends BaseTest {
    /** Logger for progress output emitted by the tests. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportTest.class);
    /** Transport playing the client role in the current test; destroyed in {@link #tearDown()}. */
    private Transport client;
    /** Transport playing the server role in the current test; destroyed in {@link #tearDown()}. */
    private Transport server;

    /**
     * Destroys the transports held in the {@code client} and {@code server} fields after each test.
     *
     * <p>The server is destroyed in a {@code finally} block so that it is still released when
     * destroying the client throws.
     *
     * @throws Exception if destroying a transport fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            TransportTestUtils.destroyTransport(client);
        } finally {
            TransportTestUtils.destroyTransport(server);
        }
    }

    /**
     * Expects {@link IllegalArgumentException} when a transport is bound with a configuration that
     * sets both a listen interface ({@code "eth0"}) and a listen address ({@code "10.10.10.10"}).
     */
    @Test(expected = IllegalArgumentException.class)
    public void testInvalidListenConfig() {
        Transport transport = null;
        try {
            TransportConfig config = TransportConfig.builder()
                .listenInterface("eth0")
                .listenAddress("10.10.10.10")
                .build();
            transport = Transport.bindAwait(config);
        } finally {
            // Runs exactly once on both the success and the failure path.
            TransportTestUtils.destroyTransport(transport);
        }
    }

    /**
     * Expects {@link IllegalArgumentException} when a transport is bound with the listen interface
     * name {@code "yadayada"}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testInvalidListenInterface() {
        Transport transport = null;
        try {
            TransportConfig config = TransportConfig.builder()
                .listenInterface("yadayada")
                .build();
            transport = Transport.bindAwait(config);
        } finally {
            // Runs exactly once on both the success and the failure path.
            TransportTestUtils.destroyTransport(transport);
        }
    }

    /**
     * Expects {@link IllegalArgumentException} when a transport is bound with the listen address
     * {@code "0.0.0.0"}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testInvalidListenAddress() {
        Transport transport = null;
        try {
            TransportConfig config = TransportConfig.builder()
                .listenAddress("0.0.0.0")
                .build();
            transport = Transport.bindAwait(config);
        } finally {
            // Runs exactly once on both the success and the failure path.
            TransportTestUtils.destroyTransport(transport);
        }
    }

    /**
     * Binds 30 transports concurrently from a pool of 4 threads, all using the same configuration
     * (start port 6000, port auto-increment enabled, port count 30), and expects every bind to
     * complete successfully within the timeout.
     *
     * <p>Every transport that was bound is destroyed afterwards, whether or not the test passed.
     *
     * @throws Exception if a bind fails, does not finish in time, or cleanup fails
     */
    @Test
    public void testPortAutoIncrementRaceConditions() throws Exception {
        int transportCount = 30;
        TransportConfig config = TransportConfig.builder()
            .port(6000)
            .portAutoIncrement(true)
            .portCount(transportCount)
            .build();

        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Transport>> bindings = new ArrayList<>(transportCount);
        try {
            for (int i = 0; i < transportCount; i++) {
                // Each task performs a real bind so that the binds race for the same port range.
                bindings.add(CompletableFuture.supplyAsync(() -> Transport.bindAwait(config), executor));
            }
        } finally {
            // Already submitted tasks still run to completion after shutdown().
            executor.shutdown();
        }
        executor.awaitTermination(60L, TimeUnit.SECONDS);

        try {
            CompletableFuture.allOf(bindings.toArray(new CompletableFuture[0])).get(60L, TimeUnit.SECONDS);
        } finally {
            for (CompletableFuture<Transport> binding : bindings) {
                // Skip failed binds: calling get() on them would throw and mask the real failure.
                if (binding.isDone() && !binding.isCompletedExceptionally()) {
                    TransportTestUtils.destroyTransport(binding.get());
                }
            }
        }
    }

    /**
     * Binds two transports on port 6000 with port auto-increment disabled and expects the attempt
     * to fail with a {@link BindException} or with a message containing
     * {@code "Address already in use"}.
     *
     * @throws Exception if cleanup of the transports fails
     */
    @Test
    public void testBindExceptionWithoutPortAutoIncrement() throws Exception {
        TransportConfig config = TransportConfig.builder()
            .port(6000)
            .portAutoIncrement(false)
            .portCount(100)
            .build();
        Transport first = null;
        Transport second = null;
        try {
            Throwable bindFailure = null;
            try {
                first = Transport.bindAwait(config);
                second = Transport.bindAwait(config);
            } catch (Throwable th) {
                bindFailure = th;
            }
            // Asserted outside the catch so the "nothing was thrown" failure keeps its own message.
            Assert.assertNotNull("Didn't get expected bind exception", bindFailure);
            String message = bindFailure.getMessage();
            Assert.assertTrue(
                "Unexpected failure: " + bindFailure,
                bindFailure instanceof BindException
                    || (message != null && message.contains("Address already in use")));
        } finally {
            TransportTestUtils.destroyTransport(first);
            TransportTestUtils.destroyTransport(second);
        }
    }

    /**
     * Binds two transports with the same configuration (start port 6000, port auto-increment
     * enabled, port count 100) and expects both binds to succeed.
     *
     * @throws Exception if binding or cleanup fails
     */
    @Test
    public void testNoBindExceptionWithPortAutoIncrement() throws Exception {
        TransportConfig config = TransportConfig.builder()
            .port(6000)
            .portAutoIncrement(true)
            .portCount(100)
            .build();
        Transport first = null;
        Transport second = null;
        try {
            first = Transport.bindAwait(config);
            second = Transport.bindAwait(config);
        } finally {
            // Runs exactly once on both the success and the failure path.
            TransportTestUtils.destroyTransport(first);
            TransportTestUtils.destroyTransport(second);
        }
    }

    /**
     * Expects a transport to bind successfully when its start port is the local port of a client
     * socket whose peer has already closed the connection.
     *
     * <p>A server socket on port 6000 accepts every connection and closes it immediately, while
     * the client socket stays open. The transport is then bound starting at the client socket's
     * local port with port auto-increment enabled.
     *
     * @throws Exception if the sockets cannot be set up, or binding or cleanup fails
     */
    @Test
    public void testNoBindExceptionWithPortAutoIncrementWithHalfClosedSocket() throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(6000)) {
            Thread acceptor = new Thread(() -> {
                // Stop once the server socket is closed instead of spinning on failing accept() calls.
                while (!serverSocket.isClosed()) {
                    try {
                        serverSocket.accept().close();
                    } catch (Exception ignored) {
                        // Best effort: a failed accept/close is irrelevant to the test.
                    }
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            try (Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort())) {
                Transport transport = null;
                try {
                    TransportConfig config = TransportConfig.builder()
                        .port(socket.getLocalPort())
                        .portAutoIncrement(true)
                        .portCount(100)
                        .build();
                    transport = Transport.bindAwait(config);
                } finally {
                    TransportTestUtils.destroyTransport(transport);
                }
            }
        }
    }

    /**
     * Expects a transport to bind successfully when the listen address is {@code "127.0.0.1"}.
     */
    @Test
    public void testValidListenAddress() {
        Transport transport = null;
        try {
            TransportConfig config = TransportConfig.builder()
                .listenAddress("127.0.0.1")
                .build();
            transport = Transport.bindAwait(config);
        } finally {
            // Runs exactly once on both the success and the failure path.
            TransportTestUtils.destroyTransport(transport);
        }
    }

    /**
     * Sends a message to {@code "wronghost:49255"} and expects the send promise to fail with an
     * {@link UnknownHostException} as its cause.
     *
     * @throws Exception if the promise does not complete within 5 seconds or the wait is interrupted
     */
    @Test
    public void testUnresolvedHostConnection() throws Exception {
        client = TransportTestUtils.createTransport();

        CompletableFuture sendPromise = new CompletableFuture();
        Address unresolvable = Address.from("wronghost:49255");
        client.send(unresolvable, Message.fromData("q"), sendPromise);

        try {
            sendPromise.get(5L, TimeUnit.SECONDS);
            // Reaching this line means the send unexpectedly succeeded.
            Assert.fail();
        } catch (ExecutionException sendFailure) {
            Throwable rootCause = sendFailure.getCause();
            Assert.assertNotNull(rootCause);
            Assert.assertEquals(
                "Unexpected exception class", UnknownHostException.class, rootCause.getClass());
        }
    }

    /**
     * Repeats ten times: create a fresh client, send two messages to {@code "localhost:49255"},
     * expect each send promise to fail with an {@link IOException} cause, then destroy the client.
     *
     * @throws Exception if a promise does not complete within 3 seconds or the wait is interrupted
     */
    @Test
    public void testInteractWithNoConnection() throws Exception {
        Address unreachable = Address.from("localhost:49255");
        for (int iteration = 0; iteration < 10; iteration++) {
            LOGGER.info("####### {} : iteration = {}", testName.getMethodName(), iteration);

            client = TransportTestUtils.createTransport();
            for (int attempt = 0; attempt < 2; attempt++) {
                assertSendFailsWithIoException(unreachable);
            }
            TransportTestUtils.destroyTransport(client);
        }
    }

    /** Sends {@code "q"} from the client to {@code destination} and asserts the send fails with an IOException. */
    private void assertSendFailsWithIoException(Address destination) throws Exception {
        CompletableFuture sendPromise = new CompletableFuture();
        client.send(destination, Message.fromData("q"), sendPromise);
        try {
            sendPromise.get(3L, TimeUnit.SECONDS);
            // Reaching this line means the send unexpectedly succeeded.
            Assert.fail();
        } catch (ExecutionException sendFailure) {
            Throwable rootCause = sendFailure.getCause();
            Assert.assertNotNull(rootCause);
            Assert.assertTrue(
                "Unexpected exception type (expects IOException)", rootCause instanceof IOException);
        }
    }

    /**
     * Sends {@code "hello server"} from the client to the server and expects the server's
     * {@code "hi client"} reply to arrive at the client within 3 seconds.
     *
     * <p>The server subscriber also asserts that the sender of the incoming message is the
     * client's address.
     *
     * @throws Exception if no reply arrives in time or the wait is interrupted
     */
    @Test
    public void testPingPongClientTFListenAndServerTFListen() throws Exception {
        client = TransportTestUtils.createTransport();
        server = TransportTestUtils.createTransport();

        server.listen().subscribe(request -> {
            Address sender = request.sender();
            Assert.assertEquals("Expected clientAddress", client.address(), sender);
            TransportTestUtils.send(server, sender, Message.fromQualifier("hi client"));
        });

        // Completed with the first message the client receives.
        CompletableFuture<Message> response = new CompletableFuture<>();
        client.listen().subscribe(response::complete);

        TransportTestUtils.send(client, server.address(), Message.fromQualifier("hello server"));

        Message reply = response.get(3L, TimeUnit.SECONDS);
        Assert.assertNotNull("No response from serverAddress", reply);
        Assert.assertEquals("hi client", reply.qualifier());
    }

    /**
     * Configures the client's network emulator link to the server with the settings
     * {@code (50, 0)}, sends 1000 messages, waits one second and expects the server to have
     * received fewer than 550 of them.
     *
     * <p>NOTE(review): the bound below treats the first setting as a loss percentage and adds a
     * 5% tolerance; the meaning of the second setting ({@code 0}) is not visible here — confirm
     * against the network emulator API.
     *
     * @throws InterruptedException if the wait for delivery is interrupted
     */
    @Test
    public void testNetworkSettings() throws InterruptedException {
        int total = 1000;
        int lostPercent = 50;

        client = TransportTestUtils.createTransport();
        server = TransportTestUtils.createTransport();
        client.networkEmulator().setLinkSettings(server.address(), lostPercent, 0);

        List<Message> received = new ArrayList<>();
        server.listen().subscribe(received::add);

        for (int i = 0; i < total; i++) {
            client.send(server.address(), Message.fromData("q" + i));
        }
        // Give the messages time to be delivered before counting them.
        Thread.sleep(1000L);

        int expectedMax = total / 100 * lostPercent + total / 100 * 5;
        int size = received.size();
        Assert.assertTrue("expectedMax=" + expectedMax + ", actual size=" + size, size < expectedMax);
    }

    /**
     * Sends {@code "q1"} and {@code "q2"} from the client to the server, which answers each
     * buffered pair of requests with {@code "echo/..."} messages, and expects the client to
     * receive two replies within one second.
     *
     * @throws Exception if the replies do not arrive in time or the wait is interrupted
     */
    @Test
    public void testPingPongOnSingleChannel() throws Exception {
        server = TransportTestUtils.createTransport();
        client = TransportTestUtils.createTransport();

        // The server handles requests in batches of two and echoes each one back to its sender.
        server.listen().buffer(2).subscribe(requests -> {
            for (Message request : requests) {
                server.send(request.sender(), Message.fromData("echo/" + request.qualifier()));
            }
        });

        // Completed with the first batch of two messages the client receives.
        CompletableFuture<List<Message>> echoes = new CompletableFuture<>();
        client.listen().buffer(2).subscribe(echoes::complete);

        client.send(server.address(), Message.fromData("q1"));
        client.send(server.address(), Message.fromData("q2"));

        List<Message> replies = echoes.get(1L, TimeUnit.SECONDS);
        Assert.assertNotNull(replies);
        Assert.assertEquals(2, replies.size());
    }

    /**
     * Sends {@code "q1"} and {@code "q2"} from the client to the server, which answers each
     * buffered pair of requests with {@code "echo/..."} messages, and expects the client to
     * receive two replies within one second.
     *
     * @throws Exception if the replies do not arrive in time or the wait is interrupted
     */
    @Test
    public void testPingPongOnSeparateChannel() throws Exception {
        server = TransportTestUtils.createTransport();
        client = TransportTestUtils.createTransport();

        // The server handles requests in batches of two and echoes each one back to its sender.
        server.listen().buffer(2).subscribe(requests -> {
            for (Message request : requests) {
                server.send(request.sender(), Message.fromData("echo/" + request.qualifier()));
            }
        });

        // Completed with the first batch of two messages the client receives.
        CompletableFuture<List<Message>> echoes = new CompletableFuture<>();
        client.listen().buffer(2).subscribe(echoes::complete);

        client.send(server.address(), Message.fromData("q1"));
        client.send(server.address(), Message.fromData("q2"));

        List<Message> replies = echoes.get(1L, TimeUnit.SECONDS);
        Assert.assertNotNull(replies);
        Assert.assertEquals(2, replies.size());
    }

    /**
     * Expects the server's listen stream to deliver a message sent by the client and to signal
     * completion to its subscriber after the server transport has been stopped.
     *
     * @throws Exception if a step does not finish in time or the wait is interrupted
     */
    @Test
    public void testCompleteObserver() throws Exception {
        server = TransportTestUtils.createTransport();
        client = TransportTestUtils.createTransport();

        CompletableFuture<Boolean> streamCompleted = new CompletableFuture<>();
        CompletableFuture<Message> firstMessage = new CompletableFuture<>();
        server.listen().subscribe(
            firstMessage::complete,
            error -> {
                // Errors are deliberately ignored; only the completion signal is asserted.
            },
            () -> streamCompleted.complete(true));

        CompletableFuture sendPromise = new CompletableFuture();
        client.send(server.address(), Message.fromData("q"), sendPromise);
        sendPromise.get(1L, TimeUnit.SECONDS);
        Assert.assertNotNull(firstMessage.get(1L, TimeUnit.SECONDS));

        CompletableFuture stopPromise = new CompletableFuture();
        server.stop(stopPromise);
        stopPromise.get();

        Assert.assertTrue(streamCompleted.get(1L, TimeUnit.SECONDS));
    }

    /**
     * Checks that the client gets no reply after the server's subscriber has thrown.
     *
     * <p>The server subscriber throws for payloads starting with {@code "throw"} and echoes
     * payloads starting with {@code "q"}. The client first sends {@code "throw"} and then
     * {@code "q"}; in both cases no reply is expected within one second.
     *
     * @throws Exception if waiting for a reply fails for a reason other than a timeout
     */
    @Test
    public void testObserverThrowsException() throws Exception {
        server = TransportTestUtils.createTransport();
        client = TransportTestUtils.createTransport();

        server.listen().subscribe(request -> {
            String data = (String) request.data();
            if (data.startsWith("throw")) {
                throw new RuntimeException("" + request);
            }
            if (data.startsWith("q")) {
                server.send(request.sender(), Message.fromData("echo/" + request.qualifier()));
            }
        }, Throwable::printStackTrace);

        // "throw" makes the server subscriber fail, so no reply may arrive.
        CompletableFuture<Message> replyToThrow = new CompletableFuture<>();
        client.listen().subscribe(replyToThrow::complete);
        client.send(server.address(), Message.fromData("throw"));
        Message firstReply = null;
        try {
            firstReply = replyToThrow.get(1L, TimeUnit.SECONDS);
        } catch (TimeoutException expected) {
            // The timeout is the expected outcome: nothing was echoed back.
        }
        Assert.assertNull(firstReply);

        // A regular message sent afterwards must not be answered either.
        CompletableFuture<Message> replyToQuery = new CompletableFuture<>();
        client.listen().subscribe(replyToQuery::complete);
        client.send(server.address(), Message.fromData("q"));
        Message secondReply = null;
        try {
            secondReply = replyToQuery.get(1L, TimeUnit.SECONDS);
        } catch (TimeoutException expected) {
            // The timeout is the expected outcome: nothing was echoed back.
        }
        Assert.assertNull(secondReply);
    }

    /**
     * Checks that blocking the server's address in the client's network emulator stops the
     * round trip.
     *
     * <p>The server echoes every message back to its sender. The client sends
     * {@code "q/unblocked"}, then blocks the server's address and sends {@code "q/blocked"}.
     * Only the first message is expected to come back.
     *
     * @throws Exception if sending fails or a wait is interrupted
     */
    @Test
    public void testBlockAndUnblockTraffic() throws Exception {
        client = TransportTestUtils.createTransport();
        server = TransportTestUtils.createTransport();

        server.listen().subscribe(request -> {
            server.send(request.sender(), request);
        });

        List<Message> received = new ArrayList<>();
        client.listen().subscribe(received::add);

        TransportTestUtils.send(client, server.address(), Message.fromQualifier("q/unblocked"));
        // Let the echo arrive before traffic gets blocked.
        Thread.sleep(1000L);

        client.networkEmulator().block(new Address[] {server.address()});
        TransportTestUtils.send(client, server.address(), Message.fromQualifier("q/blocked"));
        // Give a (wrongly delivered) echo the chance to show up before asserting.
        Thread.sleep(1000L);

        Assert.assertEquals(1, received.size());
        Assert.assertEquals("q/unblocked", received.get(0).qualifier());
    }
}
