/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.TestBootstrapService;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.ManagedNodeDiscoveryService;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.DefaultClusterEventService;
import io.atomix.cluster.messaging.impl.TestMessagingServiceFactory;
import io.atomix.cluster.messaging.impl.TestUnicastServiceFactory;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.utils.Managed;
import io.atomix.utils.Version;
import io.atomix.utils.event.EventListener;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

@AutoCloseResources
public class DefaultClusterEventServiceTest {
    private static final Serializer SERIALIZER = Serializer.using((Namespace)Namespaces.BASIC);
    private final TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory();
    private final TestUnicastServiceFactory unicastServiceFactory = new TestUnicastServiceFactory();
    private final Map<Integer, Managed> managedMemberShipServices = new HashMap<Integer, Managed>();
    private final Map<Integer, Managed> managedEventService = new HashMap<Integer, Managed>();
    private CountDownLatch membersDiscovered;
    @AutoCloseResources.AutoCloseResource
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    private Member buildNode(int memberId) {
        return Member.builder((String)String.valueOf(memberId)).withHost("localhost").withPort(memberId).build();
    }

    private Collection<Node> buildBootstrapNodes(int nodes) {
        return IntStream.range(1, nodes + 1).mapToObj(id -> Node.builder().withId(String.valueOf(id)).withAddress(Address.from((String)"localhost", (int)id)).build()).collect(Collectors.toList());
    }

    private ClusterEventService buildServices(int memberId, Collection<Node> bootstrapLocations) {
        Member localMember = this.buildNode(memberId);
        MessagingService messagingService = (MessagingService)this.messagingServiceFactory.newMessagingService(localMember.address()).start().join();
        TestBootstrapService bootstrapService1 = new TestBootstrapService(messagingService, (UnicastService)this.unicastServiceFactory.newUnicastService(localMember.address()).start().join());
        DefaultClusterMembershipService managedClusterMembershipService = new DefaultClusterMembershipService(localMember, Version.from((String)"1.0.0"), (ManagedNodeDiscoveryService)new DefaultNodeDiscoveryService((BootstrapService)bootstrapService1, (Node)localMember, (NodeDiscoveryProvider)new BootstrapDiscoveryProvider(bootstrapLocations)), (BootstrapService)bootstrapService1, SwimMembershipProtocol.builder((MeterRegistry)this.meterRegistry).build());
        this.managedMemberShipServices.put(memberId, (Managed)managedClusterMembershipService);
        managedClusterMembershipService.addListener((EventListener)((ClusterMembershipEventListener)event -> this.membersDiscovered.countDown()));
        ClusterMembershipService clusterMembershipService = (ClusterMembershipService)managedClusterMembershipService.start().join();
        DefaultClusterEventService clusterEventingService1 = new DefaultClusterEventService(clusterMembershipService, messagingService);
        this.managedEventService.put(memberId, (Managed)clusterEventingService1);
        return (ClusterEventService)clusterEventingService1.start().join();
    }

    @After
    public void tearDown() {
        CompletableFuture.allOf((CompletableFuture[])this.managedMemberShipServices.values().stream().map(Managed::stop).toArray(CompletableFuture[]::new)).join();
        CompletableFuture.allOf((CompletableFuture[])this.managedEventService.values().stream().map(Managed::stop).toArray(CompletableFuture[]::new)).join();
    }

    @Test
    public void shouldBroadcast() throws InterruptedException {
        this.membersDiscovered = new CountDownLatch(6);
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(3);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        ClusterEventService eventService2 = this.buildServices(2, bootstrapLocations);
        ClusterEventService eventService3 = this.buildServices(3, bootstrapLocations);
        this.membersDiscovered.await();
        CopyOnWriteArraySet events = new CopyOnWriteArraySet();
        CountDownLatch latch = new CountDownLatch(3);
        String topic = "test-topic";
        eventService1.subscribe("test-topic", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService2.subscribe("test-topic", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService3.subscribe("test-topic", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(3);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        TestUtil.waitUntil(() -> eventService3.getSubscribers("test-topic").containsAll(Set.of(MemberId.from((String)"1"), MemberId.from((String)"2"))));
        eventService3.broadcast("test-topic", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals((long)3L, (long)events.size());
    }

    @Test
    public void shouldSubscribeMultipleTopics() throws InterruptedException {
        this.membersDiscovered = new CountDownLatch(4);
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(2);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        ClusterEventService eventService2 = this.buildServices(2, bootstrapLocations);
        this.membersDiscovered.await();
        CopyOnWriteArraySet events = new CopyOnWriteArraySet();
        CountDownLatch latch = new CountDownLatch(2);
        String topic1 = "test-topic1";
        String topic2 = "test-topic2";
        eventService1.subscribe("test-topic1", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService1.subscribe("test-topic2", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        TestUtil.waitUntil(() -> eventService2.getSubscribers("test-topic1").contains(MemberId.from((String)"1")));
        TestUtil.waitUntil(() -> eventService2.getSubscribers("test-topic2").contains(MemberId.from((String)"1")));
        eventService2.broadcast("test-topic1", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        eventService2.broadcast("test-topic2", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals((long)2L, (long)events.size());
    }

    @Test
    public void shouldBroadcastToMultipleLocalSubscriptionsForSameTopic() throws InterruptedException {
        this.membersDiscovered = new CountDownLatch(0);
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(1);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        CopyOnWriteArraySet events = new CopyOnWriteArraySet();
        CountDownLatch latch = new CountDownLatch(2);
        String topic = "test-topic1";
        eventService1.subscribe("test-topic1", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService1.subscribe("test-topic1", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService1.broadcast("test-topic1", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals((long)2L, (long)events.size());
        Assertions.assertThat(events).containsExactlyInAnyOrder((Object[])new Integer[]{1, 2});
    }

    @Test
    public void shouldNotCloseOtherSubscriptions() throws InterruptedException {
        this.membersDiscovered = new CountDownLatch(0);
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(1);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        CopyOnWriteArraySet events = new CopyOnWriteArraySet();
        CountDownLatch latch = new CountDownLatch(1);
        String topic = "test-topic1";
        Subscription subscriptionToClose = (Subscription)eventService1.subscribe("test-topic1", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {}, MoreExecutors.directExecutor()).join();
        eventService1.subscribe("test-topic1", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        subscriptionToClose.close().join();
        eventService1.broadcast("test-topic1", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals((long)1L, (long)events.size());
    }

    @Test
    public void shouldBroadcastAfterRestart() throws InterruptedException {
        this.membersDiscovered = new CountDownLatch(4);
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(2);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        ClusterEventService eventService2 = this.buildServices(2, bootstrapLocations);
        this.membersDiscovered.await();
        CopyOnWriteArraySet events = new CopyOnWriteArraySet();
        CountDownLatch latch = new CountDownLatch(1);
        String topic = "test-topic";
        eventService1.subscribe("test-topic", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), message -> {
            Assert.assertEquals((Object)"Hello world!", (Object)message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        TestUtil.waitUntil(() -> eventService2.getSubscribers("test-topic").contains(MemberId.from((String)"1")));
        this.managedMemberShipServices.get(2).stop().join();
        this.managedEventService.get(2).stop().join();
        ClusterEventService eventService2Restarted = this.buildServices(2, bootstrapLocations);
        TestUtil.waitUntil(() -> eventService2Restarted.getSubscribers("test-topic").contains(MemberId.from((String)"1")));
        eventService2Restarted.broadcast("test-topic", (Object)"Hello world!", arg_0 -> ((Serializer)SERIALIZER).encode(arg_0));
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals((long)1L, (long)events.size());
    }

    @Test
    public void shouldLogHandlerFailuresWithoutCrashing() throws InterruptedException {
        Collection<Node> bootstrapLocations = this.buildBootstrapNodes(1);
        ClusterEventService eventService1 = this.buildServices(1, bootstrapLocations);
        AtomicInteger eventsCounter = new AtomicInteger(0);
        CountDownLatch awaitCompletion = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<String>("");
        eventService1.subscribe("test", arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), s -> {
            received.set((String)s);
            if (eventsCounter.getAndIncrement() == 0) {
                throw new RuntimeException("e");
            }
            awaitCompletion.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService1.broadcast("test", (Object)"foo");
        eventService1.broadcast("test", (Object)"bar");
        awaitCompletion.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)"bar", (Object)received.get());
    }
}

