package org.apache.ignite.spi.communication.tcp;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.loadtests.communication.GridIoManagerBenchmark0;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.jsr166.ConcurrentLinkedDeque8;

/* loaded from: input_file:org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.class */
public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstractTest<TcpCommunicationSpi> {
    public static final int IDLE_CONN_TIMEOUT = 2000;
    public static final int THREAD_CNT = 20;
    private AtomicLong msgId;
    private final boolean useShmem;
    private static final Collection<IgniteTestResources> spiRsrcs;
    private static final Map<UUID, CommunicationSpi<Message>> spis;
    private static final Map<UUID, MessageListener> lsnrs;
    private static final List<ClusterNode> nodes;
    private static GridTimeoutProcessor timeoutProcessor;
    private static boolean reject;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest$MessageListener.class */
    private static class MessageListener implements CommunicationListener<Message> {
        private final UUID locNodeId;
        private ConcurrentLinkedDeque8<GridTestMessage> rcvdMsgs = new ConcurrentLinkedDeque8<>();
        private AtomicInteger rmtMsgCnt = new AtomicInteger();
        static final /* synthetic */ boolean $assertionsDisabled;

        MessageListener(UUID uuid) {
            if (!$assertionsDisabled && uuid == null) {
                throw new AssertionError();
            }
            this.locNodeId = uuid;
        }

        public void onMessage(UUID uuid, Message message, IgniteRunnable igniteRunnable) {
            igniteRunnable.run();
            if (!(message instanceof GridTestMessage)) {
                TestCase.fail();
                return;
            }
            GridTestMessage gridTestMessage = (GridTestMessage) message;
            if (!gridTestMessage.getSourceNodeId().equals(uuid)) {
                TestCase.fail("Listener nodeId is not equal to message nodeId.");
            }
            if (!GridTcpCommunicationSpiMultithreadedSelfTest.reject) {
                this.rcvdMsgs.offer(gridTestMessage);
            }
            if (this.locNodeId.equals(uuid)) {
                return;
            }
            this.rmtMsgCnt.incrementAndGet();
        }

        public void onDisconnected(UUID uuid) {
        }

        public ConcurrentLinkedDeque8<GridTestMessage> receivedMsgs() {
            return this.rcvdMsgs;
        }

        public int remoteMessageCount() {
            return this.rmtMsgCnt.get();
        }

        public String toString() {
            return "MessageListener [nodeId=" + this.locNodeId + ", rcvd=" + this.rcvdMsgs.sizex() + ']';
        }

        static {
            $assertionsDisabled = !GridTcpCommunicationSpiMultithreadedSelfTest.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GridTcpCommunicationSpiMultithreadedSelfTest(boolean z) {
        super(false);
        this.msgId = new AtomicLong();
        this.useShmem = z;
    }

    public GridTcpCommunicationSpiMultithreadedSelfTest() {
        this(false);
    }

    public void testSendToRandomNodesMultithreaded() throws Exception {
        info(">>> Starting send to random nodes multithreaded test. <<<");
        reject = false;
        assertEquals("Invalid listener count", getSpiCount(), lsnrs.size());
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        long currentTimeMillis = System.currentTimeMillis();
        multithreadedAsync(new Runnable() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.2
            private Random rnd = new Random();

            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 5000; i++) {
                    try {
                        ClusterNode randomNode = GridTcpCommunicationSpiMultithreadedSelfTest.this.randomNode(this.rnd);
                        ClusterNode randomNode2 = GridTcpCommunicationSpiMultithreadedSelfTest.this.randomNode(this.rnd);
                        Message gridTestMessage = new GridTestMessage(randomNode.id(), GridTcpCommunicationSpiMultithreadedSelfTest.this.msgId.getAndIncrement(), 0L);
                        ((CommunicationSpi) GridTcpCommunicationSpiMultithreadedSelfTest.spis.get(randomNode.id())).sendMessage(randomNode2, gridTestMessage);
                        ConcurrentLinkedDeque8 concurrentLinkedDeque8 = (ConcurrentLinkedDeque8) concurrentHashMap.get(randomNode2.id());
                        if (concurrentLinkedDeque8 == null) {
                            ConcurrentMap concurrentMap = concurrentHashMap;
                            UUID id = randomNode2.id();
                            ConcurrentLinkedDeque8 concurrentLinkedDeque82 = new ConcurrentLinkedDeque8();
                            concurrentLinkedDeque8 = concurrentLinkedDeque82;
                            ConcurrentLinkedDeque8 concurrentLinkedDeque83 = (ConcurrentLinkedDeque8) concurrentMap.putIfAbsent(id, concurrentLinkedDeque82);
                            if (concurrentLinkedDeque83 != null) {
                                concurrentLinkedDeque8 = concurrentLinkedDeque83;
                            }
                        }
                        concurrentLinkedDeque8.offer(gridTestMessage);
                    } catch (IgniteException e) {
                        GridTcpCommunicationSpiMultithreadedSelfTest.this.log().error("Unable to send message.", e);
                        TestCase.fail("Unable to send message: " + e.getMessage());
                        return;
                    }
                }
            }
        }, getSpiCount() * 3, "message-sender").get();
        info(">>> Sent all messages in " + (System.currentTimeMillis() - currentTimeMillis) + " milliseconds");
        assertEquals("Invalid count of messages was sent", 5000 * getSpiCount() * 3, this.msgId.get());
        U.sleep(4000L);
        for (Map.Entry entry : concurrentHashMap.entrySet()) {
            UUID uuid = (UUID) entry.getKey();
            ConcurrentLinkedDeque8 concurrentLinkedDeque8 = (ConcurrentLinkedDeque8) entry.getValue();
            MessageListener messageListener = lsnrs.get(uuid);
            ConcurrentLinkedDeque8<GridTestMessage> receivedMsgs = messageListener.receivedMsgs();
            info(">>> Node " + uuid + " received " + messageListener.remoteMessageCount() + " remote messages of " + receivedMsgs.sizex() + " total");
            for (int i = 0; i < 3 && concurrentLinkedDeque8.sizex() != receivedMsgs.sizex(); i++) {
                info("Check failed for node [node=" + uuid + ", sent=" + concurrentLinkedDeque8.sizex() + ", rcvd=" + receivedMsgs.sizex() + ']');
                U.sleep(2000L);
            }
            assertEquals("Sent and received messages count mismatch.", concurrentLinkedDeque8.sizex(), receivedMsgs.sizex());
            assertTrue("Listener did not receive some messages: " + messageListener, receivedMsgs.containsAll(concurrentLinkedDeque8));
            assertTrue("Listener received extra messages: " + messageListener, concurrentLinkedDeque8.containsAll(receivedMsgs));
        }
    }

    public void testFlowSend() throws Exception {
        final ClusterNode randomNode;
        reject = true;
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(20);
        Random random = new Random();
        final ClusterNode randomNode2 = randomNode(random);
        do {
            randomNode = randomNode(random);
        } while (randomNode.id().equals(randomNode2.id()));
        final AtomicInteger atomicInteger = new AtomicInteger();
        IgniteInternalFuture<?> multithreadedAsync = multithreadedAsync(new Runnable() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    int andIncrement = atomicInteger.getAndIncrement();
                    for (int i = 0; i < 1000; i++) {
                        if (andIncrement == 0) {
                            if (i % 50 == 0) {
                                GridTcpCommunicationSpiMultithreadedSelfTest.this.info(">>> Running iteration " + i);
                            }
                        }
                        try {
                            Iterator it = GridTcpCommunicationSpiMultithreadedSelfTest.nodes.iterator();
                            while (it.hasNext()) {
                                ((CommunicationSpi) GridTcpCommunicationSpiMultithreadedSelfTest.spis.get(randomNode2.id())).sendMessage((ClusterNode) it.next(), new GridTestMessage(randomNode2.id(), GridTcpCommunicationSpiMultithreadedSelfTest.this.msgId.getAndIncrement(), 0L));
                            }
                        } catch (IgniteException e) {
                            GridTcpCommunicationSpiMultithreadedSelfTest.this.log.warning(">>> Oops, unable to send message (safe to ignore).", e);
                        }
                        cyclicBarrier.await();
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e3) {
                    GridTcpCommunicationSpiMultithreadedSelfTest.this.info("Wait on barrier failed: " + e3);
                    Thread.currentThread().interrupt();
                }
            }
        }, 20, "message-sender");
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        IgniteInternalFuture<?> multithreadedAsync2 = multithreadedAsync(new Runnable() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.4
            @Override // java.lang.Runnable
            public void run() {
                while (atomicBoolean.get() && !Thread.currentThread().isInterrupted()) {
                    try {
                        U.sleep(75L);
                        ((TcpCommunicationSpi) GridTcpCommunicationSpiMultithreadedSelfTest.spis.get(randomNode2.id())).onNodeLeft(randomNode.id());
                    } catch (IgniteInterruptedCheckedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, 1);
        multithreadedAsync.get();
        atomicBoolean.set(false);
        multithreadedAsync2.get();
        Iterator<CommunicationSpi<Message>> it = spis.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Collection) GridTestUtils.getFieldValue((GridNioServer) U.field(it.next(), "nioSrvr"), "sessions")).iterator();
            while (it2.hasNext()) {
                final GridNioRecoveryDescriptor outRecoveryDescriptor = ((GridNioSession) it2.next()).outRecoveryDescriptor();
                if (outRecoveryDescriptor != null) {
                    GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.5
                        public boolean apply() {
                            return outRecoveryDescriptor.messagesRequests().isEmpty();
                        }
                    }, 10000L);
                    assertEquals("Unexpected messages: " + outRecoveryDescriptor.messagesRequests(), 0, outRecoveryDescriptor.messagesRequests().size());
                }
            }
        }
    }

    public void testPassThroughPerformance() throws Exception {
        reject = true;
        info(">>> Starting pass through performance test. <<<");
        assertEquals("Invalid listener count", getSpiCount(), lsnrs.size());
        final AtomicInteger atomicInteger = new AtomicInteger();
        long currentTimeMillis = System.currentTimeMillis();
        multithreadedAsync(new Runnable() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.6
            /* JADX WARN: Type inference failed for: r0v16, types: [org.apache.ignite.spi.communication.GridTestMessage, java.io.Serializable] */
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ClusterNode clusterNode = (ClusterNode) GridTcpCommunicationSpiMultithreadedSelfTest.nodes.get(0);
                    ClusterNode clusterNode2 = (ClusterNode) GridTcpCommunicationSpiMultithreadedSelfTest.nodes.get(1);
                    CommunicationSpi communicationSpi = (CommunicationSpi) GridTcpCommunicationSpiMultithreadedSelfTest.spis.get(clusterNode.id());
                    while (atomicInteger.getAndIncrement() < 5000) {
                        ?? gridTestMessage = new GridTestMessage(clusterNode.id(), GridTcpCommunicationSpiMultithreadedSelfTest.this.msgId.getAndIncrement(), 0L);
                        gridTestMessage.payload(new byte[GridIoManagerBenchmark0.CONCUR_MSGS]);
                        communicationSpi.sendMessage(clusterNode2, (Serializable) gridTestMessage);
                    }
                } catch (IgniteException e) {
                    TestCase.fail("Unable to send message: " + e.getMessage());
                }
            }
        }, 5, "message-sender").get();
        info(">>> Sent all messages in " + (System.currentTimeMillis() - currentTimeMillis) + " milliseconds");
        assertEquals("Invalid count of messages was sent", 5000L, this.msgId.get());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ClusterNode randomNode(Random random) {
        return nodes.get(random.nextInt(nodes.size()));
    }

    private CommunicationSpi<Message> newCommunicationSpi() {
        TcpCommunicationSpi tcpCommunicationSpi = new TcpCommunicationSpi();
        if (!this.useShmem) {
            tcpCommunicationSpi.setSharedMemoryPort(-1);
        }
        tcpCommunicationSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
        tcpCommunicationSpi.setIdleConnectionTimeout(2000L);
        return tcpCommunicationSpi;
    }

    private int getSpiCount() {
        return 3;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void beforeTestsStarted() throws Exception {
        spis.clear();
        nodes.clear();
        spiRsrcs.clear();
        lsnrs.clear();
        HashMap hashMap = new HashMap();
        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(this.log));
        timeoutProcessor.start(true);
        timeoutProcessor.onKernalStart(true);
        for (int i = 0; i < getSpiCount(); i++) {
            CommunicationSpi<Message> newCommunicationSpi = newCommunicationSpi();
            GridTestUtils.setFieldValue(newCommunicationSpi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
            IgniteTestResources igniteTestResources = new IgniteTestResources();
            GridTestNode gridTestNode = new GridTestNode(igniteTestResources.getNodeId());
            gridTestNode.order(i);
            GridSpiTestContext initSpiContext = initSpiContext();
            initSpiContext.timeoutProcessor(timeoutProcessor);
            initSpiContext.setLocalNode(gridTestNode);
            info(">>> Initialized context: nodeId=" + initSpiContext.localNode().id());
            spiRsrcs.add(igniteTestResources);
            igniteTestResources.inject(newCommunicationSpi);
            MessageListener messageListener = new MessageListener(igniteTestResources.getNodeId());
            newCommunicationSpi.setListener(messageListener);
            lsnrs.put(igniteTestResources.getNodeId(), messageListener);
            info("Lsnrs: " + lsnrs);
            gridTestNode.setAttributes(newCommunicationSpi.getNodeAttributes());
            gridTestNode.setAttribute("org.apache.ignite.macs", F.concat(U.allLocalMACs(), ", "));
            nodes.add(gridTestNode);
            newCommunicationSpi.spiStart(getTestIgniteInstanceName() + (i + 1));
            spis.put(igniteTestResources.getNodeId(), newCommunicationSpi);
            newCommunicationSpi.onContextInitialized(initSpiContext);
            hashMap.put(gridTestNode, initSpiContext);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            for (ClusterNode clusterNode : nodes) {
                if (!clusterNode.equals(entry.getKey())) {
                    ((GridSpiTestContext) entry.getValue()).remoteNodes().add(clusterNode);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        super.afterTest();
        for (MessageListener messageListener : lsnrs.values()) {
            messageListener.rcvdMsgs.clear();
            messageListener.rmtMsgCnt.set(0);
        }
        Iterator<CommunicationSpi<Message>> it = spis.values().iterator();
        while (it.hasNext()) {
            final ConcurrentMap concurrentMap = (ConcurrentMap) U.field(it.next(), "clients");
            if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new PA() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.7
                public boolean apply() {
                    for (GridCommunicationClient[] gridCommunicationClientArr : concurrentMap.values()) {
                        for (GridCommunicationClient gridCommunicationClient : gridCommunicationClientArr) {
                            if (gridCommunicationClient != null) {
                                return false;
                            }
                        }
                    }
                    return true;
                }
            }, getTestTimeout())) {
                throw new AssertionError("Clients: " + concurrentMap);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTestsStopped() throws Exception {
        if (timeoutProcessor != null) {
            timeoutProcessor.onKernalStop(true);
            timeoutProcessor.stop(true);
            timeoutProcessor = null;
        }
        for (CommunicationSpi<Message> communicationSpi : spis.values()) {
            communicationSpi.onContextDestroyed();
            communicationSpi.setListener((CommunicationListener) null);
            communicationSpi.spiStop();
        }
        Iterator<IgniteTestResources> it = spiRsrcs.iterator();
        while (it.hasNext()) {
            it.next().stopThreads();
        }
        lsnrs.clear();
        spiRsrcs.clear();
        spis.clear();
        nodes.clear();
    }

    static {
        $assertionsDisabled = !GridTcpCommunicationSpiMultithreadedSelfTest.class.desiredAssertionStatus();
        spiRsrcs = new ArrayList();
        spis = new ConcurrentHashMap();
        lsnrs = new HashMap();
        nodes = new ArrayList();
        GridIoMessageFactory.registerCustom((short) 200, new CO<Message>() { // from class: org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedSelfTest.1
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Message m1584apply() {
                return new GridTestMessage();
            }
        });
    }
}
