package org.apache.ignite.network.scalecube;

import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageFactory;
import org.apache.ignite.network.TestMessageSerializationFactory;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.class */
class ITScaleCubeNetworkMessagingTest {
    private Cluster testCluster;

    /* renamed from: org.apache.ignite.network.scalecube.ITScaleCubeNetworkMessagingTest$1Data, reason: invalid class name */
    /* loaded from: input_file:org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest$1Data.class */
    class C1Data {
        private final NetworkMessage message;
        private final ClusterNode sender;
        private final String correlationId;

        private C1Data(NetworkMessage networkMessage, ClusterNode clusterNode, String str) {
            this.message = networkMessage;
            this.sender = clusterNode;
            this.correlationId = str;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest$Cluster.class */
    public static final class Cluster {
        private final ClusterServiceFactory networkFactory = new TestScaleCubeClusterServiceFactory();
        private final MessageSerializationRegistry serializationRegistry = new MessageSerializationRegistry().registerFactory(1, new ScaleCubeMessageSerializationFactory()).registerFactory(3, new TestMessageSerializationFactory());
        final List<ClusterService> members;
        private final CountDownLatch startupLatch;

        Cluster(int i) {
            this.startupLatch = new CountDownLatch(i - 1);
            int i2 = 3344;
            List list = (List) IntStream.range(0, i).mapToObj(i3 -> {
                return String.format("localhost:%d", Integer.valueOf(i2 + i3));
            }).collect(Collectors.toUnmodifiableList());
            this.members = (List) IntStream.range(0, i).mapToObj(i4 -> {
                return startNode("Node #" + i4, i2 + i4, list, i4 == 0);
            }).collect(Collectors.toUnmodifiableList());
        }

        private ClusterService startNode(String str, int i, List<String> list, boolean z) {
            ClusterService createClusterService = this.networkFactory.createClusterService(new ClusterLocalConfiguration(str, i, list, this.serializationRegistry));
            if (z) {
                createClusterService.topologyService().addEventHandler(new TopologyEventHandler() { // from class: org.apache.ignite.network.scalecube.ITScaleCubeNetworkMessagingTest.Cluster.1
                    public void onAppeared(ClusterNode clusterNode) {
                        Cluster.this.startupLatch.countDown();
                    }

                    public void onDisappeared(ClusterNode clusterNode) {
                    }
                });
            }
            return createClusterService;
        }

        void startAwait() throws InterruptedException {
            this.members.forEach((v0) -> {
                v0.start();
            });
            if (!this.startupLatch.await(3L, TimeUnit.SECONDS)) {
                throw new AssertionError();
            }
        }

        void shutdown() {
            this.members.forEach((v0) -> {
                v0.shutdown();
            });
        }
    }

    ITScaleCubeNetworkMessagingTest() {
    }

    @AfterEach
    public void afterEach() {
        this.testCluster.shutdown();
    }

    @Test
    public void messageWasSentToAllMembersSuccessfully() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.testCluster = new Cluster(3);
        for (ClusterService clusterService : this.testCluster.members) {
            clusterService.messagingService().addMessageHandler((networkMessage, clusterNode, str) -> {
                concurrentHashMap.put(clusterService.localConfiguration().getName(), networkMessage);
                countDownLatch.countDown();
            });
        }
        this.testCluster.startAwait();
        TestMessage build = TestMessageFactory.testMessage().msg("Message from Alice").build();
        ClusterService clusterService2 = this.testCluster.members.get(0);
        Iterator it = clusterService2.topologyService().allMembers().iterator();
        while (it.hasNext()) {
            clusterService2.messagingService().weakSend((ClusterNode) it.next(), build);
        }
        Assertions.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        Stream<R> map = this.testCluster.members.stream().map(clusterService3 -> {
            return clusterService3.localConfiguration().getName();
        });
        Objects.requireNonNull(concurrentHashMap);
        map.map((v1) -> {
            return r1.get(v1);
        }).forEach(networkMessage2 -> {
            MatcherAssert.assertThat(networkMessage2, Matchers.is(build));
        });
    }

    @Test
    public void testShutdown() throws Exception {
        testShutdown0(false);
    }

    @Test
    public void testForcefulShutdown() throws Exception {
        testShutdown0(true);
    }

    @Test
    public void testSendMessageToSelf() throws Exception {
        this.testCluster = new Cluster(1);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterNode localMember = clusterService.topologyService().localMember();
        CompletableFuture completableFuture = new CompletableFuture();
        clusterService.messagingService().addMessageHandler((networkMessage, clusterNode, str) -> {
            completableFuture.complete(new C1Data(networkMessage, clusterNode, str));
        });
        TestMessage build = TestMessageFactory.testMessage().msg("request").build();
        clusterService.messagingService().send(localMember, build, "foobar");
        C1Data c1Data = (C1Data) completableFuture.get(3L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(c1Data.message, Matchers.is(build));
        MatcherAssert.assertThat(c1Data.sender, Matchers.is(localMember));
        MatcherAssert.assertThat(c1Data.correlationId, Matchers.is("foobar"));
    }

    @Test
    public void testInvokeMessageToSelf() throws Exception {
        this.testCluster = new Cluster(1);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterNode localMember = clusterService.topologyService().localMember();
        TestMessage build = TestMessageFactory.testMessage().msg("request").build();
        TestMessage build2 = TestMessageFactory.testMessage().msg("response").build();
        clusterService.messagingService().addMessageHandler((networkMessage, clusterNode, str) -> {
            if (networkMessage.equals(build)) {
                clusterService.messagingService().send(localMember, build2, str);
            }
        });
        CompletableFuture invoke = clusterService.messagingService().invoke(localMember, build, 1000L);
        Class<TestMessage> cls = TestMessage.class;
        Objects.requireNonNull(TestMessage.class);
        MatcherAssert.assertThat((TestMessage) invoke.thenApply((v1) -> {
            return r1.cast(v1);
        }).get(3L, TimeUnit.SECONDS), Matchers.is(build2));
    }

    private void testShutdown0(boolean z) throws Exception {
        this.testCluster = new Cluster(2);
        this.testCluster.startAwait();
        ClusterService clusterService = this.testCluster.members.get(0);
        ClusterService clusterService2 = this.testCluster.members.get(1);
        final String name = clusterService.localConfiguration().getName();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        clusterService2.topologyService().addEventHandler(new TopologyEventHandler() { // from class: org.apache.ignite.network.scalecube.ITScaleCubeNetworkMessagingTest.1
            public void onAppeared(ClusterNode clusterNode) {
            }

            public void onDisappeared(ClusterNode clusterNode) {
                if (name.equals(clusterNode.name())) {
                    countDownLatch.countDown();
                }
            }
        });
        if (z) {
            stopForcefully(clusterService);
        } else {
            clusterService.shutdown();
        }
        Assertions.assertTrue(countDownLatch.await(z ? 10L : 3L, TimeUnit.SECONDS));
        Assertions.assertEquals(1, clusterService2.topologyService().allMembers().size());
    }

    private static void stopForcefully(ClusterService clusterService) throws Exception {
        Field declaredField = clusterService.getClass().getDeclaredField("val$cluster");
        declaredField.setAccessible(true);
        ClusterImpl clusterImpl = (ClusterImpl) declaredField.get(clusterService);
        Field declaredField2 = clusterImpl.getClass().getDeclaredField("transport");
        declaredField2.setAccessible(true);
        Transport transport = (Transport) declaredField2.get(clusterImpl);
        Method declaredMethod = transport.getClass().getDeclaredMethod("stop", new Class[0]);
        declaredMethod.setAccessible(true);
        ((Mono) declaredMethod.invoke(transport, new Object[0])).block();
    }
}
