package io.atomix.cluster.protocol;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.TestBootstrapService;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.NodeDiscoveryService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.TestMessagingServiceFactory;
import io.atomix.cluster.messaging.impl.TestUnicastServiceFactory;
import io.atomix.cluster.protocol.GroupMembershipEvent;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@AutoCloseResources
/* loaded from: input_file:io/atomix/cluster/protocol/SwimProtocolTest.class */
public class SwimProtocolTest extends ConcurrentTestCase {
    // Timing values applied to every SwimMembershipProtocolConfig built in startProtocol(Member, UnaryOperator).
    private static final Duration GOSSIP_INTERVAL = Duration.ofMillis(25);
    private static final Duration PROBE_INTERVAL = Duration.ofMillis(100);
    private static final Duration PROBE_TIMEOUT = Duration.ofMillis(200);
    // Despite the name, this value is passed to setFailureTimeout(...).
    private static final Duration FAILURE_INTERVAL = Duration.ofMillis(1000);
    private static final Duration SYNC_INTERVAL = Duration.ofMillis(1000);
    // The three cluster members; re-created before each test in reset().
    private Member member1;
    private Member member2;
    private Member member3;
    // All three members; cleanup() iterates this collection to stop any started protocol.
    private Collection<Member> members;
    // Handed to the BootstrapDiscoveryProvider of every started protocol.
    private Collection<Node> nodes;
    // version1 is assigned to member1..member3 in reset(); version2 is used by shouldRemoveOldMemberVersions.
    private final Version version1 = Version.from("1.0.0");
    private final Version version2 = Version.from("2.0.0");
    // Protocols keyed by member id; filled by startProtocol and emptied by stopProtocol.
    private final Map<MemberId, SwimMembershipProtocol> protocols = Maps.newConcurrentMap();
    // Factories for the per-member messaging/unicast services; also used to partition and heal members.
    private TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory();
    private TestUnicastServiceFactory unicastServiceFactory = new TestUnicastServiceFactory();
    // Event listeners keyed by member id; one is registered per protocol in startProtocol.
    private Map<MemberId, TestGroupMembershipEventListener> listeners = Maps.newConcurrentMap();

    // Passed to the constructor of every SwimMembershipProtocol created by this test.
    @AutoCloseResources.AutoCloseResource
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/protocol/SwimProtocolTest$TestGroupMembershipEventListener.class */
    /**
     * Membership listener that buffers every event it receives so that tests can consume the
     * events one at a time, in arrival order.
     */
    public static final class TestGroupMembershipEventListener implements GroupMembershipEventListener {
        /** Received events in arrival order; bounded to 100 entries. */
        private final BlockingDeque<GroupMembershipEvent> queue = new LinkedBlockingDeque<>(100);

        private TestGroupMembershipEventListener() {
        }

        /**
         * Appends the event to the buffer.
         *
         * @param groupMembershipEvent the event to record
         * @throws IllegalStateException if the buffer already holds 100 events
         */
        public void event(GroupMembershipEvent groupMembershipEvent) {
            this.queue.add(groupMembershipEvent);
        }

        /**
         * Removes and returns the oldest buffered event, waiting up to 10 seconds for one to arrive.
         *
         * @return the next event, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if interrupted while waiting
         */
        GroupMembershipEvent nextEvent() throws InterruptedException {
            return this.queue.poll(10L, TimeUnit.SECONDS);
        }

        /** Discards all buffered events. */
        public void clear() {
            this.queue.clear();
        }
    }

    /**
     * Builds a SWIM member with the given id, address and version, empty properties and the
     * current wall-clock time as its timestamp.
     */
    private Member member(String str, String str2, int i, Version version) {
        final MemberId memberId = MemberId.from(str);
        final Address address = new Address(str2, i);
        return new SwimMembershipProtocol.SwimMember(
                memberId,
                address,
                (String) null,
                (String) null,
                (String) null,
                new Properties(),
                version,
                System.currentTimeMillis());
    }

    /**
     * Re-creates the per-test fixtures: fresh messaging/unicast factories, three members on
     * localhost ports 5001-5003 (all at {@code version1}), the bootstrap node list and an empty
     * listener map.
     */
    @Before
    public void reset() {
        this.messagingServiceFactory = new TestMessagingServiceFactory();
        this.unicastServiceFactory = new TestUnicastServiceFactory();
        this.member1 = member("1", "localhost", 5001, this.version1);
        this.member2 = member("2", "localhost", 5002, this.version1);
        this.member3 = member("3", "localhost", 5003, this.version1);
        this.members = Arrays.asList(this.member1, this.member2, this.member3);
        // Generics are invariant: a Collection<Member> cannot be assigned to a Collection<Node>.
        // Build the node list directly so its element type is inferred as Node.
        this.nodes = Arrays.asList(this.member1, this.member2, this.member3);
        this.listeners = Maps.newConcurrentMap();
    }

    /** Stops the protocol of every member that is still running after a test. */
    @After
    public void cleanup() {
        for (Member running : this.members) {
            stopProtocol(running);
        }
    }

    /** A single started member must report itself as added and be the only known member. */
    @Test
    public void shouldReceiveMemberAddedOnSingleNode() throws Exception {
        final Member only = this.member1;

        startProtocol(only);

        checkEvent(only, GroupMembershipEvent.Type.MEMBER_ADDED, only);
        checkMembers(only, only);
    }

    /** With two started members, each must first see itself added and then the other one. */
    @Test
    public void shouldReceiveMultipleEventsOnTwoNodeCluster() throws Exception {
        final GroupMembershipEvent.Type added = GroupMembershipEvent.Type.MEMBER_ADDED;

        startProtocol(member1);
        startProtocol(member2);

        // View of member2.
        checkEvent(member2, added, member2);
        checkEvent(member2, added, member1);
        checkMembers(member2, member1, member2);

        // View of member1.
        checkEvent(member1, added, member1);
        checkEvent(member1, added, member2);
        checkMembers(member1, member1, member2);
    }

    /**
     * With three started members, each must first see itself added, then two further additions
     * (in any order), and finally know all three members.
     */
    @Test
    public void shouldReceiveMultipleEventsOnThreeNodeCluster() throws Exception {
        final GroupMembershipEvent.Type added = GroupMembershipEvent.Type.MEMBER_ADDED;

        startProtocol(member1);
        startProtocol(member2);
        startProtocol(member3);

        // Observers are verified in the order member2, member1, member3.
        for (Member observer : Arrays.asList(member2, member1, member3)) {
            checkEvent(observer, added, observer);
            checkEvent(observer, added);
            checkEvent(observer, added);
            checkMembers(observer, member1, member2, member3);
        }
    }

    /**
     * After member3 is partitioned, member1 and member2 must each report a reachability change
     * for member3 followed by its removal.
     */
    @Test
    public void shouldRemoveNodeOnPartition() throws Exception {
        startProtocol(member1);
        startProtocol(member2);
        startProtocol(member3);

        // Wait until every member knows the full cluster, then drop the start-up events.
        for (Member observer : Arrays.asList(member3, member2, member1)) {
            awaitMembers(observer, member1, member2, member3);
        }
        clearEvents(member1, member2, member3);

        partition(member3);

        final GroupMembershipEvent.Type[] expectedSequence = {
            GroupMembershipEvent.Type.REACHABILITY_CHANGED,
            GroupMembershipEvent.Type.MEMBER_REMOVED
        };
        for (GroupMembershipEvent.Type expected : expectedSequence) {
            checkEvent(member1, expected, member3);
            checkEvent(member2, expected, member3);
        }
    }

    /**
     * End-to-end scenario: forms a three-member cluster, partitions and heals member3, briefly
     * partitions member1 from member2, and finally checks that a property change on member1 is
     * reported to all three members as a metadata change.
     */
    @Test
    public void testSwimProtocol() throws Exception {
        // Start all three members and wait until each of them knows the full cluster.
        startProtocol(this.member1);
        startProtocol(this.member2);
        startProtocol(this.member3);
        awaitMembers(this.member3, this.member1, this.member2, this.member3);
        awaitMembers(this.member2, this.member1, this.member2, this.member3);
        awaitMembers(this.member1, this.member1, this.member2, this.member3);
        // Drop the start-up events so only events caused by the steps below are checked.
        clearEvents(this.member1, this.member2, this.member3);
        // Partition member3: member1 and member2 must end up knowing only each other.
        partition(this.member3);
        awaitMembers(this.member2, this.member1, this.member2);
        awaitMembers(this.member1, this.member1, this.member2);
        clearEvents(this.member1, this.member2);
        // member3 must see both other members become unreachable and removed (order not enforced).
        checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member2));
        checkMembers(this.member3, this.member3);
        // Heal member3: every side must report the re-added member(s).
        heal(this.member3);
        checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2));
        // Partition member1 from member2 and heal again immediately; no events are checked for this step.
        partition(this.member1, this.member2);
        heal(this.member1, this.member2);
        // Changing a property on member1 must surface as METADATA_CHANGED on all three members.
        this.member1.properties().put("foo", "bar");
        checkEvent(this.member1, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
        checkEvent(this.member2, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
        checkEvent(this.member3, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
    }

    /**
     * Restarting member2 with the same id and address but {@code version2} must make member1
     * report the old member2 as removed and the new one as added.
     */
    @Test
    public void shouldRemoveOldMemberVersions() throws InterruptedException {
        startProtocol(member1);
        startProtocol(member2);
        awaitMembers(member2, member1, member2);
        awaitMembers(member1, member1, member2);
        clearEvents(member1, member2);

        // Replace member2 by a member with identical id/host/port but a newer version.
        stopProtocol(member2);
        final Member upgraded = member(
                (String) member2.id().id(),
                member2.address().host(),
                member2.address().port(),
                version2);
        startProtocol(upgraded);

        Awaitility.await()
                .atMost(Duration.ofSeconds(2L))
                .untilAsserted(() -> checkEvent(member1, GroupMembershipEvent.Type.MEMBER_REMOVED, member2));
        checkEvent(member1, GroupMembershipEvent.Type.MEMBER_ADDED, upgraded);
    }

    /**
     * A property set on member1 while member3 is partitioned must eventually become visible on
     * member3 once the link between member2 and member3 is healed.
     */
    @Test
    public void shouldSynchronizePeriodically() throws InterruptedException {
        final GroupMembershipEvent.Type added = GroupMembershipEvent.Type.MEMBER_ADDED;
        final GroupMembershipEvent.Type removed = GroupMembershipEvent.Type.MEMBER_REMOVED;
        final GroupMembershipEvent.Type reachability = GroupMembershipEvent.Type.REACHABILITY_CHANGED;
        final GroupMembershipEvent.Type metadata = GroupMembershipEvent.Type.METADATA_CHANGED;

        startProtocol(member1);
        startProtocol(member2);
        final SwimMembershipProtocol protocol3 = startProtocol(member3);

        // Every member must have seen all three members being added.
        for (Member observer : Arrays.asList(member1, member2, member3)) {
            checkEvents(
                    observer,
                    new GroupMembershipEvent(added, member1),
                    new GroupMembershipEvent(added, member2),
                    new GroupMembershipEvent(added, member3));
        }

        // Partition member3; both sides must drop the members they can no longer reach.
        partition(member3);
        for (Member observer : Arrays.asList(member1, member2)) {
            checkEvents(
                    observer,
                    new GroupMembershipEvent(reachability, member3),
                    new GroupMembershipEvent(removed, member3));
        }
        checkEvents(
                member3,
                new GroupMembershipEvent(reachability, member1),
                new GroupMembershipEvent(removed, member1),
                new GroupMembershipEvent(reachability, member2),
                new GroupMembershipEvent(removed, member2));

        // Change member1's metadata while member3 is still partitioned.
        member1.properties().put("newProperty", 1);
        for (Member observer : Arrays.asList(member1, member2)) {
            checkEvents(observer, new GroupMembershipEvent(metadata, member1));
        }
        Thread.sleep(GOSSIP_INTERVAL.toMillis());

        // Heal only the member2 <-> member3 link and wait for the property to reach member3.
        heal(member2, member3);
        checkEvent(member2, added, member3);
        checkEvents(member3, new GroupMembershipEvent(added, member2));
        Awaitility.await()
                .atMost(Duration.ofSeconds(5L))
                .until(() -> hasNewProperty(protocol3));
    }

    /**
     * Tells whether the given protocol knows member1 with the property {@code newProperty}
     * set to a value that parses to the integer 1.
     */
    private boolean hasNewProperty(SwimMembershipProtocol swimMembershipProtocol) {
        final Member known = swimMembershipProtocol.getMember(this.member1.id());
        if (known == null) {
            return false;
        }
        final Object value = known.properties().get("newProperty");
        if (value == null) {
            return false;
        }
        return Integer.parseInt(value.toString()) == 1;
    }

    /** Starts a protocol for the member using the default test configuration unchanged. */
    private SwimMembershipProtocol startProtocol(Member member) {
        final UnaryOperator<SwimMembershipProtocolConfig> unchanged = UnaryOperator.identity();
        return startProtocol(member, unchanged);
    }

    /**
     * Creates, wires and joins a SWIM protocol instance for the given member.
     *
     * <p>The base configuration uses the timing constants of this test; {@code unaryOperator}
     * may adjust it before the protocol is created. A fresh event listener is registered under
     * the member's id, and the joined protocol is stored in {@code protocols}.
     *
     * @return the joined protocol
     */
    private SwimMembershipProtocol startProtocol(Member member, UnaryOperator<SwimMembershipProtocolConfig> unaryOperator) {
        final SwimMembershipProtocolConfig baseConfig = new SwimMembershipProtocolConfig()
                .setGossipInterval(GOSSIP_INTERVAL)
                .setProbeInterval(PROBE_INTERVAL)
                .setProbeTimeout(PROBE_TIMEOUT)
                .setFailureTimeout(FAILURE_INTERVAL)
                .setSyncInterval(SYNC_INTERVAL);
        final SwimMembershipProtocolConfig config = (SwimMembershipProtocolConfig) unaryOperator.apply(baseConfig);
        final SwimMembershipProtocol protocol = new SwimMembershipProtocol(config, this.meterRegistry);

        // Register the listener before joining so that no event is missed.
        final TestGroupMembershipEventListener listener = new TestGroupMembershipEventListener();
        this.listeners.put(member.id(), listener);
        protocol.addListener(listener);

        final MessagingService messaging =
                (MessagingService) this.messagingServiceFactory.newMessagingService(member.address()).start().join();
        final UnicastService unicast =
                (UnicastService) this.unicastServiceFactory.newUnicastService(member.address()).start().join();
        final TestBootstrapService bootstrap = new TestBootstrapService(messaging, unicast);

        final BootstrapDiscoveryProvider provider = new BootstrapDiscoveryProvider(this.nodes);
        provider.join(bootstrap, member).join();

        final NodeDiscoveryService discovery =
                (NodeDiscoveryService) new DefaultNodeDiscoveryService(bootstrap, member, provider).start().join();
        protocol.join(bootstrap, discovery, member).join();

        this.protocols.put(member.id(), protocol);
        return protocol;
    }

    /** Unregisters the member's protocol, if one is running, and waits for it to leave. */
    private void stopProtocol(Member member) {
        final SwimMembershipProtocol running = this.protocols.remove(member.id());
        if (running == null) {
            return;
        }
        running.leave(member).join();
    }

    /** Partitions the member's address in both the unicast and the messaging test factories. */
    private void partition(Member member) {
        final Address address = member.address();
        this.unicastServiceFactory.partition(address);
        this.messagingServiceFactory.partition(address);
    }

    /** Partitions the two members' addresses from each other in both test factories. */
    private void partition(Member member, Member member2) {
        final Address first = member.address();
        final Address second = member2.address();
        this.unicastServiceFactory.partition(first, second);
        this.messagingServiceFactory.partition(first, second);
    }

    /** Heals the member's address in both the unicast and the messaging test factories. */
    private void heal(Member member) {
        final Address address = member.address();
        this.unicastServiceFactory.heal(address);
        this.messagingServiceFactory.heal(address);
    }

    /** Heals the link between the two members' addresses in both test factories. */
    private void heal(Member member, Member member2) {
        final Address first = member.address();
        final Address second = member2.address();
        this.unicastServiceFactory.heal(first, second);
        this.messagingServiceFactory.heal(first, second);
    }

    /** Asserts that the protocol of {@code member} currently knows exactly the given members. */
    private void checkMembers(Member member, Member... memberArr) {
        final HashSet<Member> expected = Sets.newHashSet(memberArr);
        final SwimMembershipProtocol protocol = this.protocols.get(member.id());
        Assert.assertEquals(expected, protocol.getMembers());
    }

    /**
     * Waits up to 5 seconds until the protocol of {@code member} knows exactly the given members.
     */
    private void awaitMembers(Member member, Member... memberArr) {
        final SwimMembershipProtocol protocol = this.protocols.get(member.id());
        final HashSet<Member> expected = Sets.newHashSet(memberArr);
        Awaitility.await()
                .atMost(Duration.ofSeconds(5L))
                .until(() -> expected.equals(protocol.getMembers()));
    }

    /** Discards all events buffered so far by the listeners of the given members. */
    private void clearEvents(Member... memberArr) {
        Arrays.stream(memberArr)
                .map(Member::id)
                .map(this.listeners::get)
                .forEach(TestGroupMembershipEventListener::clear);
    }

    /**
     * Consumes as many events from the member's listener as were passed in and asserts that,
     * taken together, they match the expected events regardless of order.
     *
     * @param member the member whose listener is read
     * @param groupMembershipEventArr the expected events; duplicates are counted
     * @throws AssertionError if no event is received when one is expected, or if a received event
     *     is not among the events still outstanding
     * @throws InterruptedException if interrupted while waiting for an event
     */
    private void checkEvents(Member member, GroupMembershipEvent... groupMembershipEventArr) throws InterruptedException {
        final HashMultiset<GroupMembershipEvent> outstanding = HashMultiset.create(Arrays.asList(groupMembershipEventArr));
        for (int i = 0; i < groupMembershipEventArr.length; i++) {
            final GroupMembershipEvent received = nextEvent(member);
            if (received == null) {
                // nextEvent yields null on timeout or when no listener is registered; report that
                // explicitly instead of the misleading "Unexpected event null".
                throw new AssertionError("No event received (timed out or no listener registered); still expecting " + outstanding);
            }
            if (!outstanding.remove(received)) {
                throw new AssertionError("Unexpected event " + received);
            }
        }
    }

    /** Asserts that the next event seen by {@code member} has the given type, for any subject. */
    private void checkEvent(Member member, GroupMembershipEvent.Type type) throws InterruptedException {
        final Member anySubject = null;
        checkEvent(member, type, anySubject);
    }

    /**
     * Asserts that the next event seen by {@code member} exists and has the given type; when
     * {@code member2} is non-null the event must also concern that member.
     */
    private void checkEvent(Member member, GroupMembershipEvent.Type type, Member member2) throws InterruptedException {
        final GroupMembershipEvent received = nextEvent(member);
        Assertions.assertThat(received).isNotNull();
        Assertions.assertThat(received.type()).isEqualTo(type);
        if (member2 == null) {
            // No specific subject requested.
            return;
        }
        Assertions.assertThat(received.member()).isEqualTo(member2);
    }

    /**
     * Returns the next event buffered by the member's listener, or {@code null} when no listener
     * is registered for the member or the listener itself yields no event.
     */
    private GroupMembershipEvent nextEvent(Member member) throws InterruptedException {
        final TestGroupMembershipEventListener listener = this.listeners.get(member.id());
        return listener == null ? null : listener.nextEvent();
    }
}
