/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.protocol;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.TestBootstrapService;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.discovery.NodeDiscoveryService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.TestMessagingServiceFactory;
import io.atomix.cluster.messaging.impl.TestUnicastServiceFactory;
import io.atomix.cluster.protocol.GroupMembershipEvent;
import io.atomix.cluster.protocol.GroupMembershipEventListener;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.cluster.protocol.SwimMembershipProtocolConfig;
import io.atomix.utils.Version;
import io.atomix.utils.event.EventListener;
import io.atomix.utils.net.Address;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SwimProtocolTest
extends ConcurrentTestCase {
    private static final Duration GOSSIP_INTERVAL = Duration.ofMillis(25L);
    private static final Duration PROBE_INTERVAL = Duration.ofMillis(100L);
    private static final Duration PROBE_TIMEOUT = Duration.ofMillis(200L);
    private static final Duration FAILURE_INTERVAL = Duration.ofMillis(1000L);
    private static final Duration SYNC_INTERVAL = Duration.ofMillis(1000L);
    private final Version version1 = Version.from((String)"1.0.0");
    private final Version version2 = Version.from((String)"2.0.0");
    private final Map<MemberId, SwimMembershipProtocol> protocols = Maps.newConcurrentMap();
    private TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory();
    private TestUnicastServiceFactory unicastServiceFactory = new TestUnicastServiceFactory();
    private Member member1;
    private Member member2;
    private Member member3;
    private Collection<Member> members;
    private Collection<Node> nodes;
    private Map<MemberId, TestGroupMembershipEventListener> listeners = Maps.newConcurrentMap();

    private Member member(String id, String host, int port, Version version) {
        return new SwimMembershipProtocol.SwimMember(MemberId.from((String)id), new Address(host, port), null, null, null, new Properties(), version, System.currentTimeMillis());
    }

    @Before
    public void reset() {
        this.messagingServiceFactory = new TestMessagingServiceFactory();
        this.unicastServiceFactory = new TestUnicastServiceFactory();
        this.member1 = this.member("1", "localhost", 5001, this.version1);
        this.member2 = this.member("2", "localhost", 5002, this.version1);
        this.member3 = this.member("3", "localhost", 5003, this.version1);
        this.members = Arrays.asList(this.member1, this.member2, this.member3);
        this.nodes = this.members;
        this.listeners = Maps.newConcurrentMap();
    }

    @After
    public void cleanup() {
        this.members.forEach(this::stopProtocol);
    }

    @Test
    public void shouldReceiveMemberAddedOnSingleNode() throws Exception {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member1);
        this.checkMembers(this.member1, this.member1);
    }

    @Test
    public void shouldReceiveMultipleEventsOnTwoNodeCluster() throws Exception {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member2);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member1);
        this.checkMembers(this.member2, this.member1, this.member2);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member1);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member2);
        this.checkMembers(this.member1, this.member1, this.member2);
    }

    @Test
    public void shouldReceiveMultipleEventsOnThreeNodeCluster() throws Exception {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        this.startProtocol(this.member3, this.member3.id().toString());
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member2);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkMembers(this.member2, this.member1, this.member2, this.member3);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member1);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkMembers(this.member1, this.member1, this.member2, this.member3);
        this.checkEvent(this.member3, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        this.checkEvent(this.member3, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkEvent(this.member3, GroupMembershipEvent.Type.MEMBER_ADDED);
        this.checkMembers(this.member3, this.member1, this.member2, this.member3);
    }

    @Test
    public void shouldRemoveNodeOnPartition() throws Exception {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        this.startProtocol(this.member3, this.member3.id().toString());
        this.awaitMembers(this.member3, this.member1, this.member2, this.member3);
        this.awaitMembers(this.member2, this.member1, this.member2, this.member3);
        this.awaitMembers(this.member1, this.member1, this.member2, this.member3);
        this.clearEvents(this.member1, this.member2, this.member3);
        this.partition(this.member3);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member3);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member3);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_REMOVED, this.member3);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_REMOVED, this.member3);
    }

    @Test
    public void testSwimProtocol() throws Exception {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        this.startProtocol(this.member3, this.member3.id().toString());
        this.awaitMembers(this.member3, this.member1, this.member2, this.member3);
        this.awaitMembers(this.member2, this.member1, this.member2, this.member3);
        this.awaitMembers(this.member1, this.member1, this.member2, this.member3);
        this.clearEvents(this.member1, this.member2, this.member3);
        this.partition(this.member3);
        this.awaitMembers(this.member2, this.member1, this.member2);
        this.awaitMembers(this.member1, this.member1, this.member2);
        this.clearEvents(this.member1, this.member2);
        this.checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member2));
        this.checkMembers(this.member3, this.member3);
        this.heal(this.member3);
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        this.checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2));
        this.partition(this.member1, this.member2);
        this.heal(this.member1, this.member2);
        this.member1.properties().put("foo", "bar");
        this.checkEvent(this.member1, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
        this.checkEvent(this.member3, GroupMembershipEvent.Type.METADATA_CHANGED, this.member1);
    }

    @Test
    public void shouldRemoveOldMemberVersions() throws InterruptedException {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        this.awaitMembers(this.member2, this.member1, this.member2);
        this.awaitMembers(this.member1, this.member1, this.member2);
        this.clearEvents(this.member1, this.member2);
        this.stopProtocol(this.member2);
        Member member = this.member((String)((Object)this.member2.id().id()), this.member2.address().host(), this.member2.address().port(), this.version2);
        this.startProtocol(member, "member2-" + (String)((Object)this.member2.id().id()));
        Awaitility.await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_REMOVED, this.member2));
        this.checkEvent(this.member1, GroupMembershipEvent.Type.MEMBER_ADDED, member);
    }

    @Test
    public void shouldSynchronizePeriodically() throws InterruptedException {
        this.startProtocol(this.member1, this.member1.id().toString());
        this.startProtocol(this.member2, this.member2.id().toString());
        SwimMembershipProtocol protocol3 = this.startProtocol(this.member3, this.member3.id().toString());
        this.checkEvents(this.member1, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member3));
        this.checkEvents(this.member2, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member3));
        this.checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member3));
        this.partition(this.member3);
        this.checkEvents(this.member1, new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member3), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member3));
        this.checkEvents(this.member2, new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member3), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member3));
        this.checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member1), new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, this.member2), new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, this.member2));
        this.member1.properties().put("newProperty", (Object)1);
        this.checkEvents(this.member1, new GroupMembershipEvent(GroupMembershipEvent.Type.METADATA_CHANGED, this.member1));
        this.checkEvents(this.member2, new GroupMembershipEvent(GroupMembershipEvent.Type.METADATA_CHANGED, this.member1));
        Thread.sleep(GOSSIP_INTERVAL.toMillis());
        this.heal(this.member2, this.member3);
        this.checkEvent(this.member2, GroupMembershipEvent.Type.MEMBER_ADDED, this.member3);
        this.checkEvents(this.member3, new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.member2));
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> this.hasNewProperty(protocol3));
    }

    private boolean hasNewProperty(SwimMembershipProtocol protocol3) {
        Member memberOne = protocol3.getMember(this.member1.id());
        if (memberOne != null) {
            Object newProperty = memberOne.properties().get("newProperty");
            return newProperty != null && Integer.parseInt(newProperty.toString()) == 1;
        }
        return false;
    }

    private SwimMembershipProtocol startProtocol(Member member, String actorSchedulerName) {
        return this.startProtocol(member, UnaryOperator.identity(), actorSchedulerName);
    }

    private SwimMembershipProtocol startProtocol(Member member, UnaryOperator<SwimMembershipProtocolConfig> configurator, String actorSchedulerName) {
        SwimMembershipProtocol protocol = new SwimMembershipProtocol((SwimMembershipProtocolConfig)configurator.apply(new SwimMembershipProtocolConfig().setGossipInterval(GOSSIP_INTERVAL).setProbeInterval(PROBE_INTERVAL).setProbeTimeout(PROBE_TIMEOUT).setFailureTimeout(FAILURE_INTERVAL).setSyncInterval(SYNC_INTERVAL)), "testingActorSchedulerName");
        TestGroupMembershipEventListener listener = new TestGroupMembershipEventListener();
        this.listeners.put(member.id(), listener);
        protocol.addListener((EventListener)listener);
        TestBootstrapService bootstrap = new TestBootstrapService((MessagingService)this.messagingServiceFactory.newMessagingService(member.address()).start().join(), (UnicastService)this.unicastServiceFactory.newUnicastService(member.address()).start().join());
        BootstrapDiscoveryProvider provider = new BootstrapDiscoveryProvider(this.nodes);
        provider.join((BootstrapService)bootstrap, (Node)member).join();
        NodeDiscoveryService discovery = (NodeDiscoveryService)new DefaultNodeDiscoveryService((BootstrapService)bootstrap, (Node)member, (NodeDiscoveryProvider)provider).start().join();
        protocol.join((BootstrapService)bootstrap, discovery, member).join();
        this.protocols.put(member.id(), protocol);
        return protocol;
    }

    private void stopProtocol(Member member) {
        SwimMembershipProtocol protocol = this.protocols.remove(member.id());
        if (protocol != null) {
            protocol.leave(member).join();
        }
    }

    private void partition(Member member) {
        this.unicastServiceFactory.partition(member.address());
        this.messagingServiceFactory.partition(member.address());
    }

    private void partition(Member member1, Member member2) {
        this.unicastServiceFactory.partition(member1.address(), member2.address());
        this.messagingServiceFactory.partition(member1.address(), member2.address());
    }

    private void heal(Member member) {
        this.unicastServiceFactory.heal(member.address());
        this.messagingServiceFactory.heal(member.address());
    }

    private void heal(Member member1, Member member2) {
        this.unicastServiceFactory.heal(member1.address(), member2.address());
        this.messagingServiceFactory.heal(member1.address(), member2.address());
    }

    private void checkMembers(Member member, Member ... members) {
        SwimMembershipProtocol protocol = this.protocols.get(member.id());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])members), (Object)protocol.getMembers());
    }

    private void awaitMembers(Member member, Member ... members) {
        SwimMembershipProtocol protocol = this.protocols.get(member.id());
        HashSet expectedMembers = Sets.newHashSet((Object[])members);
        Awaitility.await().atMost(Duration.ofSeconds(5L)).until(() -> expectedMembers.equals(protocol.getMembers()));
    }

    private void clearEvents(Member ... members) {
        for (Member member : members) {
            this.listeners.get(member.id()).clear();
        }
    }

    private void checkEvents(Member member, GroupMembershipEvent ... types) throws InterruptedException {
        HashMultiset events = HashMultiset.create(Arrays.asList(types));
        for (int i = 0; i < types.length; ++i) {
            GroupMembershipEvent event = this.nextEvent(member);
            if (!events.remove((Object)event)) {
                throw new AssertionError((Object)("Unexpected event " + String.valueOf(event)));
            }
        }
    }

    private void checkEvent(Member member, GroupMembershipEvent.Type type) throws InterruptedException {
        this.checkEvent(member, type, null);
    }

    private void checkEvent(Member member, GroupMembershipEvent.Type type, Member value) throws InterruptedException {
        GroupMembershipEvent event = this.nextEvent(member);
        Assertions.assertThat((Object)event).isNotNull();
        Assertions.assertThat((Comparable)((GroupMembershipEvent.Type)event.type())).isEqualTo((Object)type);
        if (value != null) {
            Assertions.assertThat((Object)event.member()).isEqualTo((Object)value);
        }
    }

    private GroupMembershipEvent nextEvent(Member member) throws InterruptedException {
        TestGroupMembershipEventListener listener = this.listeners.get(member.id());
        return listener != null ? listener.nextEvent() : null;
    }

    private static final class TestGroupMembershipEventListener
    implements GroupMembershipEventListener {
        private final BlockingDeque<GroupMembershipEvent> queue = new LinkedBlockingDeque<GroupMembershipEvent>(100);

        private TestGroupMembershipEventListener() {
        }

        public void event(GroupMembershipEvent event) {
            this.queue.add(event);
        }

        GroupMembershipEvent nextEvent() throws InterruptedException {
            return this.queue.poll(10L, TimeUnit.SECONDS);
        }

        public void clear() {
            this.queue.clear();
        }
    }
}

