package io.atomix.cluster.messaging.impl;

import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.TestBootstrapService;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.utils.Managed;
import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link DefaultClusterEventService}.
 *
 * <p>Each test builds one or more cluster members through {@link #buildServices(int, Collection)},
 * which wires a messaging service, a unicast service, a membership service and the event service
 * under test. All services created this way are tracked and stopped in {@link #tearDown()}.
 */
@AutoCloseResources
public class DefaultClusterEventServiceTest {
    private static final Serializer SERIALIZER = Serializer.using(Namespaces.BASIC);

    /**
     * Counted down by the membership listener registered in {@link #buildServices(int,
     * Collection)}. Every test must assign it before building any service, because the listener
     * dereferences this field whenever a membership event is delivered.
     */
    private CountDownLatch membersDiscovered;

    private final TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory();
    private final TestUnicastServiceFactory unicastServiceFactory = new TestUnicastServiceFactory();

    /** Membership services keyed by member id; a rebuilt member replaces its previous entry. */
    private final Map<Integer, Managed<?>> managedMemberShipServices = new HashMap<>();

    /** Event services keyed by member id; a rebuilt member replaces its previous entry. */
    private final Map<Integer, Managed<?>> managedEventService = new HashMap<>();

    @AutoCloseResources.AutoCloseResource
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    /**
     * Builds the local member descriptor for the given id. The id doubles as the port number on
     * {@code localhost}.
     */
    private Member buildNode(final int memberId) {
        return Member.builder(String.valueOf(memberId))
                .withHost("localhost")
                .withPort(memberId)
                .build();
    }

    /**
     * Builds the bootstrap node list for ids {@code 1..count}, using the same id-as-port scheme as
     * {@link #buildNode(int)}.
     */
    private Collection<Node> buildBootstrapNodes(final int count) {
        return IntStream.rangeClosed(1, count)
                .mapToObj(id -> Node.builder()
                        .withId(String.valueOf(id))
                        .withAddress(Address.from("localhost", id))
                        .build())
                .collect(Collectors.toList());
    }

    /**
     * Starts the full service stack for one member and returns its started event service.
     *
     * <p>The membership and event services are recorded in the tracking maps so that {@link
     * #tearDown()} can stop them. The membership listener is registered before the membership
     * service is started.
     *
     * @param memberId id (and port) of the member to build
     * @param bootstrapNodes nodes handed to the bootstrap discovery provider
     * @return the started event service of the member
     */
    private ClusterEventService buildServices(final int memberId, final Collection<Node> bootstrapNodes) {
        final Member localMember = buildNode(memberId);
        final MessagingService messagingService =
                messagingServiceFactory.newMessagingService(localMember.address()).start().join();
        final UnicastService unicastService =
                unicastServiceFactory.newUnicastService(localMember.address()).start().join();
        final TestBootstrapService bootstrapService =
                new TestBootstrapService(messagingService, unicastService);

        final DefaultClusterMembershipService membershipService =
                new DefaultClusterMembershipService(
                        localMember,
                        Version.from("1.0.0"),
                        new DefaultNodeDiscoveryService(
                                bootstrapService, localMember, new BootstrapDiscoveryProvider(bootstrapNodes)),
                        bootstrapService,
                        SwimMembershipProtocol.builder(meterRegistry).build());
        managedMemberShipServices.put(memberId, membershipService);
        membershipService.addListener(event -> membersDiscovered.countDown());

        final DefaultClusterEventService eventService =
                new DefaultClusterEventService(membershipService.start().join(), messagingService);
        managedEventService.put(memberId, eventService);
        return eventService.start().join();
    }

    /**
     * Stops all tracked membership services, then all tracked event services. The event services
     * are stopped even if stopping the membership services fails.
     */
    @After
    public void tearDown() {
        try {
            stopAll(managedMemberShipServices.values());
        } finally {
            stopAll(managedEventService.values());
        }
    }

    /** Triggers {@code stop()} on every given service and waits for all of them to complete. */
    private static void stopAll(final Collection<Managed<?>> services) {
        CompletableFuture.allOf(
                        services.stream().map(Managed::stop).toArray(CompletableFuture[]::new))
                .join();
    }

    @Test
    public void shouldBroadcast() throws InterruptedException {
        membersDiscovered = new CountDownLatch(6);
        final Collection<Node> bootstrapNodes = buildBootstrapNodes(3);
        final ClusterEventService eventService1 = buildServices(1, bootstrapNodes);
        final ClusterEventService eventService2 = buildServices(2, bootstrapNodes);
        final ClusterEventService eventService3 = buildServices(3, bootstrapNodes);
        membersDiscovered.await();

        final Set<Integer> events = new CopyOnWriteArraySet<>();
        final CountDownLatch latch = new CountDownLatch(3);

        eventService1.<String>subscribe("test-topic", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService2.<String>subscribe("test-topic", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService3.<String>subscribe("test-topic", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(3);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();

        // Broadcast only once the sender sees the remote subscriptions.
        TestUtil.waitUntil(() -> eventService3
                .getSubscribers("test-topic")
                .containsAll(Set.of(MemberId.from("1"), MemberId.from("2"))));

        eventService3.broadcast("test-topic", "Hello world!", SERIALIZER::encode);

        Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals(3, events.size());
    }

    @Test
    public void shouldSubscribeMultipleTopics() throws InterruptedException {
        membersDiscovered = new CountDownLatch(4);
        final Collection<Node> bootstrapNodes = buildBootstrapNodes(2);
        final ClusterEventService eventService1 = buildServices(1, bootstrapNodes);
        final ClusterEventService eventService2 = buildServices(2, bootstrapNodes);
        membersDiscovered.await();

        final Set<Integer> events = new CopyOnWriteArraySet<>();
        final CountDownLatch latch = new CountDownLatch(2);

        eventService1.<String>subscribe("test-topic1", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService1.<String>subscribe("test-topic2", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();

        // Broadcast only once the sender sees member 1 subscribed to both topics.
        TestUtil.waitUntil(
                () -> eventService2.getSubscribers("test-topic1").contains(MemberId.from("1")));
        TestUtil.waitUntil(
                () -> eventService2.getSubscribers("test-topic2").contains(MemberId.from("1")));

        eventService2.broadcast("test-topic1", "Hello world!", SERIALIZER::encode);
        eventService2.broadcast("test-topic2", "Hello world!", SERIALIZER::encode);

        Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals(2, events.size());
    }

    @Test
    public void shouldBroadcastToMultipleLocalSubscriptionsForSameTopic() throws InterruptedException {
        // Single member: no discovery to wait for, but the listener still needs a latch.
        membersDiscovered = new CountDownLatch(0);
        final ClusterEventService eventService = buildServices(1, buildBootstrapNodes(1));

        final Set<Integer> events = new CopyOnWriteArraySet<>();
        final CountDownLatch latch = new CountDownLatch(2);

        eventService.<String>subscribe("test-topic1", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        eventService.<String>subscribe("test-topic1", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();

        eventService.broadcast("test-topic1", "Hello world!", SERIALIZER::encode);

        Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals(2, events.size());
        Assertions.assertThat(events).containsExactlyInAnyOrder(1, 2);
    }

    @Test
    public void shouldNotCloseOtherSubscriptions() throws InterruptedException {
        // Single member: no discovery to wait for, but the listener still needs a latch.
        membersDiscovered = new CountDownLatch(0);
        final ClusterEventService eventService = buildServices(1, buildBootstrapNodes(1));

        final Set<Integer> events = new CopyOnWriteArraySet<>();
        final CountDownLatch latch = new CountDownLatch(1);

        final Subscription firstSubscription = eventService
                .<String>subscribe("test-topic1", SERIALIZER::decode, message -> {
                }, MoreExecutors.directExecutor())
                .join();
        eventService.<String>subscribe("test-topic1", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(2);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();

        // Closing the first subscription must leave the second one on the same topic active.
        firstSubscription.close().join();

        eventService.broadcast("test-topic1", "Hello world!", SERIALIZER::encode);

        Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals(1, events.size());
    }

    @Test
    public void shouldBroadcastAfterRestart() throws InterruptedException {
        membersDiscovered = new CountDownLatch(4);
        final Collection<Node> bootstrapNodes = buildBootstrapNodes(2);
        final ClusterEventService eventService1 = buildServices(1, bootstrapNodes);
        final ClusterEventService eventService2 = buildServices(2, bootstrapNodes);
        membersDiscovered.await();

        final Set<Integer> events = new CopyOnWriteArraySet<>();
        final CountDownLatch latch = new CountDownLatch(1);

        eventService1.<String>subscribe("test-topic", SERIALIZER::decode, message -> {
            Assert.assertEquals("Hello world!", message);
            events.add(1);
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();
        TestUtil.waitUntil(
                () -> eventService2.getSubscribers("test-topic").contains(MemberId.from("1")));

        // Restart member 2; buildServices replaces the stopped instances in the tracking maps.
        managedMemberShipServices.get(2).stop().join();
        managedEventService.get(2).stop().join();
        final ClusterEventService restartedEventService2 = buildServices(2, bootstrapNodes);

        TestUtil.waitUntil(() ->
                restartedEventService2.getSubscribers("test-topic").contains(MemberId.from("1")));

        restartedEventService2.broadcast("test-topic", "Hello world!", SERIALIZER::encode);

        Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals(1, events.size());
    }

    @Test
    public void shouldLogHandlerFailuresWithoutCrashing() throws InterruptedException {
        // The membership listener dereferences this latch, so it must not be left null.
        membersDiscovered = new CountDownLatch(0);
        final ClusterEventService eventService = buildServices(1, buildBootstrapNodes(1));

        final AtomicInteger invocations = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<String> lastMessage = new AtomicReference<>("");

        eventService.<String>subscribe("test", SERIALIZER::decode, message -> {
            lastMessage.set(message);
            // Fail on the first message only; later messages must still reach the handler.
            if (invocations.getAndIncrement() == 0) {
                throw new RuntimeException("e");
            }
            latch.countDown();
        }, MoreExecutors.directExecutor()).join();

        eventService.broadcast("test", "foo");
        eventService.broadcast("test", "bar");

        Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        Assert.assertEquals("bar", lastMessage.get());
    }
}
