package org.apache.ignite.network.scalecube;

import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageTypes;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.class */
class ItScaleCubeNetworkMessagingTest {
    private Cluster testCluster;
    private final TestMessagesFactory messageFactory = new TestMessagesFactory();

    /* renamed from: org.apache.ignite.network.scalecube.ItScaleCubeNetworkMessagingTest$1Data, reason: invalid class name */
    /* loaded from: input_file:org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest$1Data.class */
    class C1Data {
        private final TestMessage message;
        private final NetworkAddress sender;
        private final Long correlationId;

        private C1Data(TestMessage testMessage, NetworkAddress networkAddress, Long l) {
            this.message = testMessage;
            this.sender = networkAddress;
            this.correlationId = l;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest$Cluster.class */
    public static final class Cluster {
        final List<ClusterService> members;
        private final CountDownLatch startupLatch;
        private final NodeFinder nodeFinder;

        Cluster(int i, TestInfo testInfo) {
            this.startupLatch = new CountDownLatch(i - 1);
            List<NetworkAddress> findLocalAddresses = ClusterServiceTestUtils.findLocalAddresses(3344, 3344 + i);
            this.nodeFinder = new StaticNodeFinder(findLocalAddresses);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            this.members = (List) findLocalAddresses.stream().map(networkAddress -> {
                return startNode(testInfo, networkAddress, atomicBoolean.getAndSet(false));
            }).collect(Collectors.toUnmodifiableList());
        }

        private ClusterService startNode(TestInfo testInfo, NetworkAddress networkAddress, boolean z) {
            ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, networkAddress.port(), this.nodeFinder);
            if (z) {
                clusterService.topologyService().addEventHandler(new TopologyEventHandler() { // from class: org.apache.ignite.network.scalecube.ItScaleCubeNetworkMessagingTest.Cluster.1
                    public void onAppeared(ClusterNode clusterNode) {
                        Cluster.this.startupLatch.countDown();
                    }
                });
            }
            return clusterService;
        }

        void startAwait() throws InterruptedException {
            this.members.forEach((v0) -> {
                v0.start();
            });
            if (!this.startupLatch.await(3L, TimeUnit.SECONDS)) {
                throw new AssertionError();
            }
        }

        void shutdown() {
            this.members.forEach((v0) -> {
                v0.stop();
            });
        }
    }

    ItScaleCubeNetworkMessagingTest() {
    }

    @AfterEach
    public void tearDown() {
        this.testCluster.shutdown();
    }

    @Test
    public void messageWasSentToAllMembersSuccessfully(TestInfo testInfo) throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.testCluster = new Cluster(3, testInfo);
        for (ClusterService clusterService : this.testCluster.members) {
            clusterService.messagingService().addMessageHandler(TestMessageTypes.class, (networkMessage, networkAddress, l) -> {
                concurrentHashMap.put(clusterService.localConfiguration().getName(), (TestMessage) networkMessage);
                countDownLatch.countDown();
            });
        }
        this.testCluster.startAwait();
        TestMessage build = this.messageFactory.testMessage().msg("Message from Alice").build();
        ClusterService clusterService2 = this.testCluster.members.get(0);
        Iterator it = clusterService2.topologyService().allMembers().iterator();
        while (it.hasNext()) {
            clusterService2.messagingService().weakSend((ClusterNode) it.next(), build);
        }
        Assertions.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        Stream<R> map = this.testCluster.members.stream().map(clusterService3 -> {
            return clusterService3.localConfiguration().getName();
        });
        Objects.requireNonNull(concurrentHashMap);
        map.map((v1) -> {
            return r1.get(v1);
        }).forEach(testMessage -> {
            MatcherAssert.assertThat(testMessage.msg(), Matchers.is(build.msg()));
        });
    }

    @Test
    public void testShutdown(TestInfo testInfo) throws Exception {
        testShutdown0(testInfo, false);
    }

    @Test
    public void testForcefulShutdown(TestInfo testInfo) throws Exception {
        testShutdown0(testInfo, true);
    }

    @Test
    public void testSendMessageToSelf(TestInfo testInfo) throws Exception {
        this.testCluster = new Cluster(1, testInfo);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterNode localMember = clusterService.topologyService().localMember();
        CompletableFuture completableFuture = new CompletableFuture();
        clusterService.messagingService().addMessageHandler(TestMessageTypes.class, (networkMessage, networkAddress, l) -> {
            completableFuture.complete(new C1Data((TestMessage) networkMessage, networkAddress, l));
        });
        TestMessage build = this.messageFactory.testMessage().msg("request").build();
        clusterService.messagingService().send(localMember, build);
        C1Data c1Data = (C1Data) completableFuture.get(3L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(c1Data.message.msg(), Matchers.is(build.msg()));
        MatcherAssert.assertThat(c1Data.sender.consistentId(), Matchers.is(localMember.name()));
        Assertions.assertNull(c1Data.correlationId);
    }

    @Test
    public void testInvokeMessageToSelf(TestInfo testInfo) throws Exception {
        this.testCluster = new Cluster(1, testInfo);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterNode localMember = clusterService.topologyService().localMember();
        TestMessage build = this.messageFactory.testMessage().msg("request").build();
        TestMessage build2 = this.messageFactory.testMessage().msg("response").build();
        clusterService.messagingService().addMessageHandler(TestMessageTypes.class, (networkMessage, networkAddress, l) -> {
            if (networkMessage.equals(build)) {
                clusterService.messagingService().respond(localMember, build2, l.longValue());
            }
        });
        CompletableFuture invoke = clusterService.messagingService().invoke(localMember, build, 1000L);
        Class<TestMessage> cls = TestMessage.class;
        Objects.requireNonNull(TestMessage.class);
        MatcherAssert.assertThat(((TestMessage) invoke.thenApply((v1) -> {
            return r1.cast(v1);
        }).get(3L, TimeUnit.SECONDS)).msg(), Matchers.is(build2.msg()));
    }

    @Test
    public void testInvokeDuringStop(TestInfo testInfo) throws InterruptedException {
        this.testCluster = new Cluster(2, testInfo);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterService clusterService2 = this.testCluster.members.get(1);
        CompletableFuture invoke = clusterService.messagingService().invoke(clusterService2.topologyService().localMember(), this.messageFactory.testMessage().build(), 1000L);
        CompletableFuture invoke2 = clusterService.messagingService().invoke(clusterService2.topologyService().localMember(), this.messageFactory.testMessage().build(), 1000L);
        clusterService.stop();
        MatcherAssert.assertThat(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            invoke.get(1L, TimeUnit.SECONDS);
        })).getCause(), Matchers.instanceOf(NodeStoppingException.class));
        MatcherAssert.assertThat(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            invoke2.get(1L, TimeUnit.SECONDS);
        })).getCause(), Matchers.instanceOf(NodeStoppingException.class));
    }

    @Test
    public void testMessageGroupsHandlers(TestInfo testInfo) throws Exception {
        this.testCluster = new Cluster(2, testInfo);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterService clusterService2 = this.testCluster.members.get(1);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        clusterService.messagingService().addMessageHandler(TestMessageTypes.class, (networkMessage, networkAddress, l) -> {
            Assertions.assertTrue(completableFuture.complete(networkMessage));
        });
        clusterService.messagingService().addMessageHandler(TestMessageTypes.class, (networkMessage2, networkAddress2, l2) -> {
            Assertions.assertTrue(completableFuture2.complete(networkMessage2));
        });
        clusterService.messagingService().addMessageHandler(NetworkMessageTypes.class, (networkMessage3, networkAddress3, l3) -> {
            if (networkMessage3 instanceof FieldDescriptorMessage) {
                Assertions.assertTrue(completableFuture3.complete(networkMessage3));
            }
        });
        TestMessage build = this.messageFactory.testMessage().msg("foo").build();
        FieldDescriptorMessage build2 = new NetworkMessagesFactory().fieldDescriptorMessage().build();
        clusterService2.messagingService().send(clusterService.topologyService().localMember(), build).get(1L, TimeUnit.SECONDS);
        clusterService2.messagingService().send(clusterService.topologyService().localMember(), build2).get(1L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(completableFuture, CompletableFutureMatcher.willBe(Matchers.equalTo(build)));
        MatcherAssert.assertThat(completableFuture2, CompletableFutureMatcher.willBe(Matchers.equalTo(build)));
        MatcherAssert.assertThat(completableFuture3, CompletableFutureMatcher.willBe(Matchers.equalTo(build2)));
    }

    private void testShutdown0(TestInfo testInfo, boolean z) throws Exception {
        this.testCluster = new Cluster(2, testInfo);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterService clusterService2 = this.testCluster.members.get(1);
        final String name = clusterService.localConfiguration().getName();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        clusterService2.topologyService().addEventHandler(new TopologyEventHandler() { // from class: org.apache.ignite.network.scalecube.ItScaleCubeNetworkMessagingTest.1
            public void onDisappeared(ClusterNode clusterNode) {
                if (name.equals(clusterNode.name())) {
                    countDownLatch.countDown();
                }
            }
        });
        if (z) {
            stopForcefully(clusterService);
        } else {
            clusterService.stop();
        }
        Assertions.assertTrue(countDownLatch.await(z ? 10L : 3L, TimeUnit.SECONDS));
        Assertions.assertEquals(1, clusterService2.topologyService().allMembers().size());
    }

    private static void stopForcefully(ClusterService clusterService) throws Exception {
        Field declaredField = clusterService.getClass().getDeclaredField("val$clusterSvc");
        declaredField.setAccessible(true);
        ClusterService clusterService2 = (ClusterService) declaredField.get(clusterService);
        Field declaredField2 = clusterService2.getClass().getDeclaredField("cluster");
        declaredField2.setAccessible(true);
        ClusterImpl clusterImpl = (ClusterImpl) declaredField2.get(clusterService2);
        Field declaredField3 = clusterImpl.getClass().getDeclaredField("transport");
        declaredField3.setAccessible(true);
        Transport transport = (Transport) declaredField3.get(clusterImpl);
        Method declaredMethod = transport.getClass().getDeclaredMethod("stop", new Class[0]);
        declaredMethod.setAccessible(true);
        ((Mono) declaredMethod.invoke(transport, new Object[0])).block();
    }
}
