package io.scalecube.transport;

import io.scalecube.ThreadFactoryBuilder;
import io.scalecube.Throwables;
import io.scalecube.testlib.BaseTest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

/* loaded from: input_file:io/scalecube/transport/TransportSendOrderTest.class */
/**
 * Checks that messages sent from a client transport to a server transport are observed by the
 * server's listener in the same order in which they were sent, both for a single sending thread
 * and for several concurrent sending threads (where order is checked per sender).
 *
 * <p>The transports stored in {@link #client} and {@link #server} are destroyed in
 * {@link #tearDown()}, so a failing test does not leave them behind.
 */
public class TransportSendOrderTest extends BaseTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportSendOrderTest.class);

    /** Number of messages each sender emits per iteration. */
    private static final int MESSAGES_PER_SENDER = 1000;

    /** Number of timed iterations in the single-thread tests; iteration 0 is an extra warm-up. */
    private static final int TIMED_ITERATIONS = 10;

    /** Number of iterations of the multi-thread test. */
    private static final int MULTI_THREAD_ITERATIONS = 10;

    /** Number of concurrent senders in the multi-thread test. */
    private static final int SENDER_THREADS = 4;

    /** How long to wait for all messages of one iteration to be received. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 20L;

    private Transport client;
    private Transport server;

    /** Destroys whatever transports the test stored in the {@code client} and {@code server} fields. */
    @After
    public void tearDown() throws Exception {
        TransportTestUtils.destroyTransport(this.client);
        TransportTestUtils.destroyTransport(this.server);
    }

    /**
     * Sends messages from a single thread without completion promises and asserts that the server
     * receives them in send order. Iteration 0 is a warm-up and is excluded from the time stats.
     */
    @Test
    public void testSendOrderSingleThreadWithoutPromises() throws Exception {
        this.server = TransportTestUtils.createTransport();
        long[] iterationTimes = new long[TIMED_ITERATIONS];
        for (int iteration = 0; iteration <= TIMED_ITERATIONS; iteration++) {
            LOGGER.info("####### {} : iteration = {}", this.testName.getMethodName(), iteration);
            this.client = TransportTestUtils.createTransport();
            List<Message> received = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(MESSAGES_PER_SENDER);
            Disposable subscription = this.server.listen().subscribe(message -> {
                received.add(message);
                latch.countDown();
            });
            try {
                long startTime = System.currentTimeMillis();
                for (int j = 0; j < MESSAGES_PER_SENDER; j++) {
                    this.client.send(this.server.address(), Message.fromQualifier("q" + j));
                }
                boolean allReceived = latch.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                long iterationTime = System.currentTimeMillis() - startTime;
                if (iteration > 0) {
                    iterationTimes[iteration - 1] = iterationTime;
                }
                // Fail before touching the list: after a timeout the listener may still be appending.
                Assert.assertTrue("Timed out waiting for " + MESSAGES_PER_SENDER + " messages", allReceived);
                assertSendOrder(MESSAGES_PER_SENDER, received);
                LOGGER.info("Iteration time: {} ms", iterationTime);
            } finally {
                subscription.dispose();
                TransportTestUtils.destroyTransport(this.client);
            }
        }
        LOGGER.info("Iteration time stats (ms): {}", LongStream.of(iterationTimes).summaryStatistics());
    }

    /**
     * Sends messages from a single thread, each with a completion promise, asserts that the server
     * receives them in send order, and logs per-message send-time statistics. Iteration 0 is a
     * warm-up and is excluded from the aggregated stats.
     */
    @Test
    public void testSendOrderSingleThread() throws Exception {
        this.server = TransportTestUtils.createTransport();
        long[] iterationTimes = new long[TIMED_ITERATIONS];
        List<Long> totalSentTimes = new ArrayList<>(MESSAGES_PER_SENDER * TIMED_ITERATIONS);
        for (int iteration = 0; iteration <= TIMED_ITERATIONS; iteration++) {
            LOGGER.info("####### {} : iteration = {}", this.testName.getMethodName(), iteration);
            List<Long> iterSentTimes = new ArrayList<>(MESSAGES_PER_SENDER);
            this.client = TransportTestUtils.createTransport();
            List<Message> received = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(MESSAGES_PER_SENDER);
            Disposable subscription = this.server.listen().subscribe(message -> {
                received.add(message);
                latch.countDown();
            });
            try {
                long startTime = System.currentTimeMillis();
                for (int j = 0; j < MESSAGES_PER_SENDER; j++) {
                    CompletableFuture<Void> sendPromise = new CompletableFuture<>();
                    long sentAt = System.currentTimeMillis();
                    this.client.send(this.server.address(), Message.fromQualifier("q" + j), sendPromise);
                    sendPromise.whenComplete((ignored, error) -> {
                        long sendTime = System.currentTimeMillis() - sentAt;
                        if (error == null) {
                            iterSentTimes.add(sendTime);
                        } else {
                            LOGGER.error("Failed to send message in {} ms", sendTime, error);
                        }
                    });
                }
                boolean allReceived = latch.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                long iterationTime = System.currentTimeMillis() - startTime;
                if (iteration > 0) {
                    iterationTimes[iteration - 1] = iterationTime;
                }
                // Fail before touching the list: after a timeout the listener may still be appending.
                Assert.assertTrue("Timed out waiting for " + MESSAGES_PER_SENDER + " messages", allReceived);
                assertSendOrder(MESSAGES_PER_SENDER, received);

                // Short pause kept from the original test before the send-time list is read.
                Thread.sleep(10L);

                LongSummaryStatistics iterStats =
                        iterSentTimes.stream().mapToLong(Long::longValue).summaryStatistics();
                if (iteration == 0) {
                    LOGGER.info("Warm up iteration time: {} ms", iterationTime);
                    LOGGER.info("Sent time stats warm up iter (ms): {}", iterStats);
                } else {
                    totalSentTimes.addAll(iterSentTimes);
                    LongSummaryStatistics totalStats =
                            totalSentTimes.stream().mapToLong(Long::longValue).summaryStatistics();
                    LOGGER.info("Iteration time: {} ms", iterationTime);
                    LOGGER.info("Sent time stats iter  (ms): {}", iterStats);
                    LOGGER.info("Sent time stats total (ms): {}", totalStats);
                }
            } finally {
                subscription.dispose();
                TransportTestUtils.destroyTransport(this.client);
            }
        }
        LOGGER.info("Iteration time stats (ms): {}", LongStream.of(iterationTimes).summaryStatistics());
    }

    /**
     * Sends messages from several threads sharing one client transport and asserts that, for each
     * sender, the server receives that sender's messages in send order.
     */
    @Test
    public void testSendOrderMultiThread() throws Exception {
        // Stored in the fields so that tearDown() releases the transports even if the test fails.
        this.server = TransportTestUtils.createTransport();
        for (int iteration = 0; iteration < MULTI_THREAD_ITERATIONS; iteration++) {
            LOGGER.info("####### {} : iteration = {}", this.testName.getMethodName(), iteration);
            ExecutorService executor = Executors.newFixedThreadPool(
                    SENDER_THREADS, new ThreadFactoryBuilder().setDaemon(true).build());
            this.client = TransportTestUtils.createTransport();
            List<Message> received = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(SENDER_THREADS * MESSAGES_PER_SENDER);
            Disposable subscription = this.server.listen().subscribe(message -> {
                received.add(message);
                latch.countDown();
            });
            try {
                List<Future<Void>> senders = new ArrayList<>(SENDER_THREADS);
                for (int id = 0; id < SENDER_THREADS; id++) {
                    senders.add(executor.submit(
                            sender(id, this.client, this.server.address(), MESSAGES_PER_SENDER)));
                }
                boolean allReceived = latch.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                // Surface sender failures first: they explain a receive timeout better than the timeout itself.
                for (Future<Void> senderResult : senders) {
                    senderResult.get(1L, TimeUnit.SECONDS);
                }
                Assert.assertTrue(
                        "Timed out waiting for " + (SENDER_THREADS * MESSAGES_PER_SENDER) + " messages",
                        allReceived);
                for (int id = 0; id < SENDER_THREADS; id++) {
                    assertSenderOrder(id, MESSAGES_PER_SENDER, received);
                }
            } finally {
                executor.shutdownNow();
                // Without this every iteration would leave one more listener attached to the server.
                subscription.dispose();
                TransportTestUtils.destroyTransport(this.client);
            }
        }
    }

    /**
     * Asserts that exactly {@code total} messages were received and that the k-th one carries the
     * qualifier {@code "q" + k}.
     *
     * @param total expected number of messages
     * @param receivedMessages messages in the order the listener observed them
     */
    private void assertSendOrder(int total, List<Message> receivedMessages) {
        // Work on a snapshot so the checks see one consistent view of the list.
        List<Message> snapshot = new ArrayList<>(receivedMessages);
        Assert.assertEquals(total, snapshot.size());
        for (int k = 0; k < total; k++) {
            Assert.assertEquals("q" + k, snapshot.get(k).qualifier());
        }
    }

    /**
     * Builds a task that sends {@code total} messages with correlation ids {@code id/0},
     * {@code id/1}, ... and waits up to 3 seconds for each send promise before sending the next.
     *
     * @param id sender id, used as the correlation-id prefix
     * @param transport transport to send from
     * @param address destination address
     * @param total number of messages to send
     * @return the sending task; a failed or timed-out send is logged and rethrown via
     *     {@code Throwables.propagate}
     */
    private Callable<Void> sender(int id, Transport transport, Address address, int total) {
        return () -> {
            for (int j = 0; j < total; j++) {
                CompletableFuture<Void> sendPromise = new CompletableFuture<>();
                Message message = Message.withQualifier("q").correlationId(id + "/" + j).build();
                transport.send(address, message, sendPromise);
                try {
                    sendPromise.get(3L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    LOGGER.error("Failed to send message: j = {} id = {}", j, id, e);
                    Throwables.propagate(e);
                }
            }
            return null;
        };
    }

    /**
     * Asserts that the messages of sender {@code id} (those whose correlation id starts with
     * {@code id + "/"}) number exactly {@code total} and appear in send order within
     * {@code receivedMessages}.
     *
     * @param id sender id to check
     * @param total expected number of messages from that sender
     * @param receivedMessages all received messages, from all senders, in observed order
     */
    private void assertSenderOrder(int id, int total, List<Message> receivedMessages) {
        List<Message> fromSender = new ArrayList<>(total);
        for (Message message : new ArrayList<>(receivedMessages)) {
            int senderId = Integer.parseInt(message.correlationId().split("/")[0]);
            if (senderId == id) {
                fromSender.add(message);
            }
        }
        // A sender with no messages fails here with a size mismatch rather than a NullPointerException.
        Assert.assertEquals(total, fromSender.size());
        for (int j = 0; j < total; j++) {
            Assert.assertEquals(id + "/" + j, fromSender.get(j).correlationId());
        }
    }
}
