/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft;

import com.google.common.collect.Maps;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.metrics.RaftRoleMetrics;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.primitive.TestMember;
import io.atomix.raft.protocol.PollRequest;
import io.atomix.raft.protocol.RaftServerProtocol;
import io.atomix.raft.protocol.TestRaftProtocolFactory;
import io.atomix.raft.protocol.TestRaftServerProtocol;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.snapshot.InMemorySnapshot;
import io.atomix.raft.snapshot.TestSnapshotStore;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.InitialEntry;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.journal.CorruptedJournalException;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthReport;
import java.io.File;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that start one or more {@link RaftServer} instances wired together through a
 * {@link TestRaftProtocolFactory} and exercise bootstrap, role changes, failure notification,
 * log corruption detection and entry appends.
 *
 * <p>Extends {@link ConcurrentTestCase} so that callbacks running on other threads can signal the
 * test thread via {@code resume()} / {@code await(...)}.
 */
public class RaftTest
extends ConcurrentTestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(RaftTest.class);
    // Scratch folder rule; a fresh sub-folder is taken from it in clearTests().
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Counter used by nextNodeId() to derive member ids ("1", "2", ...).
    private volatile int nextId;
    // Members registered for the current test; reset in clearTests().
    private volatile List<RaftMember> members;
    // Every server built through createServer(...); shut down in clearTests().
    private volatile List<RaftServer> servers = new ArrayList<RaftServer>();
    // Factory for the per-member test protocols; recreated in clearTests().
    private volatile TestRaftProtocolFactory protocolFactory;
    // Single-thread context created in clearTests() and closed on the next reset.
    private volatile ThreadContext context;
    // Last position handed to LeaderRole.appendEntry by the append helpers.
    private volatile long position = 0L;
    // Root directory under which each member's storage directory is created.
    private Path directory;
    // Protocol instance of every created server, keyed by member id.
    private final Map<MemberId, TestRaftServerProtocol> serverProtocols = Maps.newConcurrentMap();

    /**
     * Resets the test fixture. Registered as both {@code @Before} and {@code @After}, so it runs
     * on either side of every test: running servers are shut down, a new storage root folder is
     * created, the previous thread context is closed, and members, id counter, server list,
     * thread context and protocol factory are re-initialised.
     *
     * @throws Exception if the temporary folder cannot be created
     */
    @Before
    @After
    public void clearTests() throws Exception {
        for (RaftServer server : this.servers) {
            try {
                if (server.isRunning()) {
                    server.shutdown().get(10L, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
                // Shutdown stays best-effort so one stuck server cannot break the reset,
                // but the failure is logged instead of being silently discarded.
                LOGGER.warn("Failed to shut down Raft server during test cleanup", e);
            }
        }
        this.directory = this.temporaryFolder.newFolder().toPath();
        if (this.context != null) {
            this.context.close();
        }
        this.members = new ArrayList<RaftMember>();
        this.nextId = 0;
        this.servers = new ArrayList<RaftServer>();
        this.context = new SingleThreadContext("raft-test-messaging-%d");
        this.protocolFactory = new TestRaftProtocolFactory();
    }

    /**
     * Registers {@code nodes} new ACTIVE members, creates a server for each of the first
     * {@code nodes} registered members, bootstraps them with the full member list and waits
     * (up to 30 seconds per node) until every bootstrap future has completed.
     *
     * @param nodes number of servers to create and bootstrap
     * @return the servers created by this call
     */
    private List<RaftServer> createServers(int nodes) throws Throwable {
        int toRegister = nodes;
        while (toRegister-- > 0) {
            this.members.add(this.nextMember(RaftMember.Type.ACTIVE));
        }

        List<RaftServer> created = new ArrayList<>();
        CountDownLatch bootstrapped = new CountDownLatch(nodes);
        for (int index = 0; index < nodes; index++) {
            RaftMember member = this.members.get(index);
            RaftServer server = this.createServer(member.memberId());
            // Re-read the member so the type check sees the same lookup as before.
            if (this.members.get(index).getType() == RaftMember.Type.ACTIVE) {
                List<MemberId> memberIds = this.members.stream()
                    .map(RaftMember::memberId)
                    .collect(Collectors.toList());
                server.bootstrap(memberIds).thenRun(() -> bootstrapped.countDown());
            }
            created.add(server);
        }

        boolean allBootstrapped = bootstrapped.await(30L * (long) nodes, TimeUnit.SECONDS);
        Assertions.assertThat(allBootstrapped).isTrue();
        return created;
    }

    /** Builds a {@link TestMember} of the given type with the next generated member id. */
    private RaftMember nextMember(RaftMember.Type type) {
        MemberId id = this.nextNodeId();
        return new TestMember(id, type);
    }

    /** Advances the id counter by one and returns the new value as a {@link MemberId}. */
    private MemberId nextNodeId() {
        int id = this.nextId + 1;
        this.nextId = id;
        return MemberId.from(Integer.toString(id));
    }

    /** Creates a server for {@code memberId} backed by the default storage for that member. */
    private RaftServer createServer(MemberId memberId) {
        Function<RaftServer.Builder, RaftServer.Builder> withDefaultStorage = builder -> {
            // Storage is created lazily, only once the builder is being configured.
            RaftStorage storage = this.createStorage(memberId);
            return builder.withStorage(storage);
        };
        return this.createServer(memberId, withDefaultStorage);
    }

    /**
     * Creates a server for {@code memberId} with a new test protocol, a mocked membership service
     * and a partition config using a 1 second election timeout and 100 ms heartbeat interval,
     * then lets {@code configurator} customise the builder. The protocol and the server are
     * recorded in {@code serverProtocols} and {@code servers}.
     *
     * @param memberId id of the member the server represents
     * @param configurator applied to the pre-populated builder before {@code build()}
     * @return the built server
     */
    private RaftServer createServer(MemberId memberId, Function<RaftServer.Builder, RaftServer.Builder> configurator) {
        TestRaftServerProtocol protocol = this.protocolFactory.newServerProtocol(memberId);
        RaftPartitionConfig partitionConfig = new RaftPartitionConfig()
            .setElectionTimeout(Duration.ofSeconds(1L))
            .setHeartbeatInterval(Duration.ofMillis(100L));
        ClusterMembershipService membershipService = Mockito.mock(ClusterMembershipService.class);

        RaftServer.Builder defaults = RaftServer.builder(memberId)
            .withMembershipService(membershipService)
            .withProtocol(protocol)
            .withPartitionConfig(partitionConfig);
        RaftServer server = (RaftServer) configurator.apply(defaults).build();

        this.serverProtocols.put(memberId, protocol);
        this.servers.add(server);
        return server;
    }

    /** Creates storage for {@code memberId} using the default builder settings unchanged. */
    private RaftStorage createStorage(MemberId memberId) {
        Function<RaftStorage.Builder, RaftStorage.Builder> noCustomisation = builder -> builder;
        return this.createStorage(memberId, noCustomisation);
    }

    /**
     * Builds storage located in a sub-directory of the test directory named after
     * {@code memberId}, with a new {@link TestSnapshotStore} and a max segment size of 10240,
     * after letting {@code configurator} adjust the builder.
     *
     * @param memberId member whose id names the storage directory
     * @param configurator applied to the pre-populated builder before {@code build()}
     * @return the built storage
     */
    private RaftStorage createStorage(MemberId memberId, Function<RaftStorage.Builder, RaftStorage.Builder> configurator) {
        File memberDirectory = new File(this.directory.toFile(), memberId.toString());
        ReceivableSnapshotStore snapshotStore =
            new TestSnapshotStore(new AtomicReference<InMemorySnapshot>());

        RaftStorage.Builder defaults = RaftStorage.builder()
            .withDirectory(memberDirectory)
            .withSnapshotStore(snapshotStore)
            .withMaxSegmentSize(10240);
        RaftStorage.Builder configured = configurator.apply(defaults);
        return configured.build();
    }

    /**
     * Appends 1000 entries on the leader of a three node cluster, promotes a follower and expects
     * that follower to report itself as leader once the promotion future completes.
     *
     * <p>Currently ignored.
     */
    @Test
    @Ignore
    public void testTransferLeadership() throws Throwable {
        List<RaftServer> servers = this.createServers(3);
        RaftServer leader = this.getLeader(servers).orElseThrow();
        this.awaitAppendEntries(leader, 1000);
        RaftServer follower = servers.stream().filter(RaftServer::isFollower).findFirst().orElseThrow();
        follower.promote().thenRun(() -> this.resume());
        // Only the promotion callback above calls resume(); the append helpers never do,
        // so exactly one resume is expected (waiting for 1001 could only time out).
        this.await(15000L, 1);
        Assertions.assertThat(follower.isLeader()).isTrue();
    }

    /**
     * Demotes the leader of a three node cluster to PASSIVE and waits (up to 15 seconds) for two
     * signals: the demote future completing without error, and a follower observing the leader's
     * member type change to PASSIVE.
     */
    @Test
    public void testDemoteLeader() throws Throwable {
        List<RaftServer> servers = this.createServers(3);
        RaftServer leader = servers.stream().filter(RaftServer::isLeader).findFirst().orElseThrow();
        MemberId leaderId = leader.cluster().getLocalMember().memberId();
        RaftServer follower = servers.stream().filter(RaftServer::isFollower).findFirst().orElseThrow();
        MemberId followerId = follower.cluster().getLocalMember().memberId();

        follower.cluster().getMember(leaderId).addTypeChangeListener(newType -> {
            // Expected value first, so a failure message reads the right way round.
            this.threadAssertEquals(RaftMember.Type.PASSIVE, newType);
            this.resume();
            LOGGER.debug("Leader {} changed to passive on {}", (Object) leaderId, (Object) followerId);
        });
        leader.cluster().getLocalMember().demote(RaftMember.Type.PASSIVE).whenComplete((ignored, error) -> {
            this.threadAssertNull(error);
            this.resume();
            LOGGER.debug("Leader {} demoted to passive", (Object) leaderId);
        });

        // One resume from the type-change listener, one from the demote completion.
        this.await(15000L, 2);
    }

    /** Appends an entry with two of three configured members running. */
    @Test
    public void testTwoOfThreeNodeSubmitCommand() throws Throwable {
        final int liveNodes = 2;
        final int clusterSize = 3;
        this.testSubmitCommand(liveNodes, clusterSize);
    }

    /**
     * Starts {@code live} of {@code total} configured members, then appends one entry on the
     * leader and waits for it to commit.
     */
    private void testSubmitCommand(int live, int total) throws Throwable {
        List<RaftServer> liveServers = this.createServers(live, total);
        RaftServer leader = this.getLeader(liveServers).orElseThrow();
        this.appendEntry(leader);
    }

    /**
     * Registers {@code total} new ACTIVE members but only creates and bootstraps servers for the
     * first {@code live} registered members, then waits (up to 30 seconds per live node) for
     * {@code live} bootstrap completions.
     *
     * @param live number of servers to actually start
     * @param total number of members to register in the cluster configuration
     * @return the servers created by this call
     */
    private List<RaftServer> createServers(int live, int total) throws Throwable {
        int toRegister = total;
        while (toRegister-- > 0) {
            this.members.add(this.nextMember(RaftMember.Type.ACTIVE));
        }

        List<RaftServer> started = new ArrayList<>();
        for (int index = 0; index < live; index++) {
            RaftMember member = this.members.get(index);
            RaftServer server = this.createServer(member.memberId());
            // Re-read the member so the type check sees the same lookup as before.
            if (this.members.get(index).getType() == RaftMember.Type.ACTIVE) {
                List<MemberId> memberIds = this.members.stream()
                    .map(RaftMember::memberId)
                    .collect(Collectors.toList());
                server.bootstrap(memberIds).thenRun(this::resume);
            }
            started.add(server);
        }

        // Each completed bootstrap resumes the waiting test thread once.
        this.await(30000L * (long) live, live);
        return started;
    }

    /** Appends an entry with three of four configured members running. */
    @Test
    public void testThreeOfFourNodeSubmitCommand() throws Throwable {
        final int liveNodes = 3;
        final int clusterSize = 4;
        this.testSubmitCommand(liveNodes, clusterSize);
    }

    /** Appends an entry with three of five configured members running. */
    @Test
    public void testThreeOfFiveNodeSubmitCommand() throws Throwable {
        final int liveNodes = 3;
        final int clusterSize = 5;
        this.testSubmitCommand(liveNodes, clusterSize);
    }

    /**
     * Registers three members and starts their servers one after another, 500 ms apart, then
     * waits up to 6 seconds for all three bootstrap futures to complete.
     */
    @Test
    public void testThreeNodesSequentiallyStart() throws Throwable {
        final int clusterSize = 3;
        for (int n = 0; n < clusterSize; n++) {
            this.members.add(this.nextMember(RaftMember.Type.ACTIVE));
        }

        for (int n = 0; n < clusterSize; n++) {
            MemberId memberId = this.members.get(n).memberId();
            RaftServer server = this.createServer(memberId);
            List<MemberId> memberIds = this.members.stream()
                .map(RaftMember::memberId)
                .collect(Collectors.toList());
            server.bootstrap(memberIds).thenRun(this::resume);
            // Stagger the starts so the nodes do not come up at the same time.
            Thread.sleep(500L);
        }

        this.await(6000L, clusterSize);
    }

    /**
     * Submits 1000 entries on the leader of a three node cluster and asserts that the heartbeat
     * miss count reported by {@link RaftRoleMetrics} for "1" did not change.
     */
    @Test
    public void testThreeNodeManyEventsDoNotMissHeartbeats() throws Throwable {
        this.createServers(3);
        RaftServer leader = this.getLeader(this.servers).orElseThrow();
        // One committed entry first, so the baseline is taken on a working cluster.
        this.appendEntry(leader);
        double startMissedHeartBeats = RaftRoleMetrics.getHeartbeatMissCount("1");

        // NOTE(review): these appends are fire-and-forget; the metric is read right after
        // submission, not after the entries are committed — confirm this is intended.
        this.appendEntries(leader, 1000);

        double missedHeartBeats = RaftRoleMetrics.getHeartbeatMissCount("1");
        // Measured delta is the actual value, 0.0 the expectation.
        Assertions.assertThat(missedHeartBeats - startMissedHeartBeats).isEqualTo(0.0);
    }

    /**
     * Forces the current leader to step down and verifies, via role change listeners on every
     * server, that a server notified as LEADER in a later term has an {@link InitialEntry} of
     * that term as the last entry in its log.
     */
    @Test
    public void testRoleChangeNotificationAfterInitialEntryOnLeader() throws Throwable {
        List<RaftServer> servers = this.createServers(3);
        RaftServer previousLeader = this.getLeader(servers).orElseThrow();
        long previousLeaderTerm = previousLeader.getTerm();
        CountDownLatch transitionCompleted = new CountDownLatch(1);

        for (RaftServer server : servers) {
            server.addRoleChangeListener((role, term) -> {
                // Notifications from the old term are not relevant.
                if (term <= previousLeaderTerm) {
                    return;
                }
                this.assertLastReadInitialEntry(role, term, server, transitionCompleted);
            });
        }

        previousLeader.stepDown();

        // NOTE(review): the wait is 1000 *seconds*; a failing run blocks for a long time.
        boolean completed = transitionCompleted.await(1000L, TimeUnit.SECONDS);
        Assertions.assertThat(completed).isTrue();
    }

    /** Returns the first server in {@code servers} whose role is LEADER, if any. */
    private Optional<RaftServer> getLeader(List<RaftServer> servers) {
        for (RaftServer candidate : servers) {
            if (candidate.getRole() == RaftServer.Role.LEADER) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Returns, in encounter order, all servers in {@code servers} whose role is FOLLOWER. */
    private List<RaftServer> getFollowers(List<RaftServer> servers) {
        List<RaftServer> followers = new ArrayList<>();
        for (RaftServer candidate : servers) {
            if (candidate.getRole() == RaftServer.Role.FOLLOWER) {
                followers.add(candidate);
            }
        }
        return followers;
    }

    /**
     * When {@code role} is LEADER, reads the entry at the last index of {@code server}'s log
     * through a committed reader, asserts that it is an {@link InitialEntry} written in
     * {@code term}, and then counts down {@code transitionCompleted}. Does nothing for any other
     * role.
     *
     * @param role role reported by the role change listener
     * @param term term reported by the role change listener
     * @param server server whose log is inspected
     * @param transitionCompleted latch released once the assertions have passed
     */
    private void assertLastReadInitialEntry(RaftServer.Role role, long term, RaftServer server, CountDownLatch transitionCompleted) {
        if (role != RaftServer.Role.LEADER) {
            return;
        }
        RaftLog raftLog = server.getContext().getLog();
        // Close the reader once done so it is not leaked on the server's log.
        try (RaftLogReader raftLogReader = raftLog.openCommittedReader()) {
            raftLogReader.seek(raftLog.getLastIndex());
            IndexedRaftLogEntry entry = (IndexedRaftLogEntry) raftLogReader.next();
            Assertions.assertThat((Object) entry.entry()).isInstanceOf(InitialEntry.class);
            Assertions.assertThat(entry.term()).isEqualTo(term);
            transitionCompleted.countDown();
        }
    }

    /**
     * Throws an exception on a single server's thread context and verifies that both registered
     * failure listeners are notified and that the server ends up in the INACTIVE role.
     */
    @Test
    public void testNotifyOnFailure() throws Throwable {
        RaftServer server = this.createServers(1).get(0);
        CountDownLatch firstNotified = new CountDownLatch(1);
        CountDownLatch secondNotified = new CountDownLatch(1);
        FailureListener firstListener = new LatchFailureListener(firstNotified);
        FailureListener secondListener = new LatchFailureListener(secondNotified);
        server.addFailureListener(firstListener);
        server.addFailureListener(secondListener);

        server.getContext().getThreadContext().execute(() -> {
            throw new RuntimeException("injected failure");
        });

        boolean firstCalled = firstNotified.await(2L, TimeUnit.SECONDS);
        Assertions.assertThat(firstCalled).isTrue();
        boolean secondCalled = secondNotified.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(secondCalled).isTrue();
        Assertions.assertThat(server.getRole()).isEqualTo(RaftServer.Role.INACTIVE);
    }

    /**
     * Overwrites a {@code .log} file in a stopped single-node server's storage directory with
     * garbage and expects recreating the server on that directory to throw
     * {@link CorruptedJournalException}.
     */
    @Test
    public void shouldDetectCorruptionOnStart() throws Throwable {
        RaftServer leader = this.createServers(1).get(0);
        File storageDirectory = leader.getContext().getStorage().directory();
        this.appendEntry(leader);

        Optional<File> logFile = Arrays.stream(storageDirectory.listFiles())
            .filter(file -> file.getName().endsWith(".log"))
            .findFirst();
        Assertions.assertThat(logFile).isPresent();
        Path logPath = logFile.get().toPath();

        // Stop the server before touching its journal file.
        leader.shutdown().join();
        Files.writeString(logPath, "i am become corrupt, destroyer of worlds");

        MemberId memberId = this.members.get(0).memberId();
        AssertionsForClassTypes.assertThatThrownBy(() -> this.recreateServer(leader, memberId))
            .isInstanceOf(CorruptedJournalException.class);
    }

    /**
     * Creates a new server for {@code memberId} whose storage points at the storage directory of
     * the given existing {@code server}.
     */
    private RaftServer recreateServer(RaftServer server, MemberId memberId) {
        File existingDirectory = server.getContext().getStorage().directory();
        RaftStorage storage =
            this.createStorage(memberId, builder -> builder.withDirectory(existingDirectory));
        return this.createServer(memberId, builder -> builder.withStorage(storage));
    }

    /**
     * Blocks messages to one follower and fails the poll requests intercepted on that follower's
     * protocol with a {@link ConnectException}, then expects at least two such poll requests
     * within four election timeouts.
     */
    @Test
    public void shouldTriggerHeartbeatTimeouts() throws Throwable {
        List<RaftServer> servers = this.createServers(3);
        Awaitility.await("A leader exists")
            .until(() -> servers.stream().anyMatch(RaftServer::isLeader));

        RaftServer follower = this.getFollowers(servers).get(0);
        MemberId followerId = follower.getContext().getCluster().getLocalMember().memberId();
        TestRaftServerProtocol followerProtocol = this.serverProtocols.get(followerId);
        LongAdder pollCount = new LongAdder();

        followerProtocol.interceptRequest(PollRequest.class, request -> {
            pollCount.increment();
            return CompletableFuture.failedFuture(new ConnectException());
        });
        this.protocolFactory.blockMessagesTo(followerId);

        long timeoutMillis = follower.getContext().getElectionTimeout().multipliedBy(4L).toMillis();
        Awaitility.await()
            .timeout(Duration.ofMillis(timeoutMillis))
            .untilAdder(pollCount, Matchers.greaterThanOrEqualTo(2L));
    }

    /**
     * Blocks messages to one follower and fails the poll requests intercepted on that follower's
     * protocol with a {@link TimeoutException}, then expects more than two poll requests within
     * 5 seconds — twice in a row, resetting the counter in between.
     */
    @Test
    public void shouldReSendPollRequestOnTimeouts() throws Throwable {
        List<RaftServer> servers = this.createServers(3);
        RaftServer follower = this.getFollowers(servers).get(0);
        MemberId followerId = follower.getContext().getCluster().getLocalMember().memberId();
        TestRaftServerProtocol followerProtocol = this.serverProtocols.get(followerId);
        LongAdder pollCount = new LongAdder();

        this.protocolFactory.blockMessagesTo(followerId);
        followerProtocol.interceptRequest(PollRequest.class, request -> {
            pollCount.increment();
            return CompletableFuture.failedFuture(new TimeoutException());
        });

        Duration limit = Duration.ofSeconds(5L);
        Awaitility.await().timeout(limit).untilAdder(pollCount, Matchers.greaterThan(2L));
        // Start counting again to prove that polling keeps going.
        pollCount.reset();
        Awaitility.await().timeout(limit).untilAdder(pollCount, Matchers.greaterThan(2L));
    }

    /**
     * Registers a role change listener on a single-node server that is already leader and
     * expects the listener to be called within 10 seconds with the server's current role and
     * term.
     */
    @Test
    public void shouldNotifyListenerWhenNoTransitionIsOngoing() throws Throwable {
        CountDownLatch listenerCalled = new CountDownLatch(1);
        AtomicReference<RaftServer.Role> observedRole = new AtomicReference<>(null);
        AtomicLong observedTerm = new AtomicLong(-1L);

        RaftServer server = this.createServers(1).get(0);
        Assertions.assertThat(server.isLeader()).isTrue();

        server.addRoleChangeListener((role, term) -> {
            observedRole.set(role);
            observedTerm.set(term);
            listenerCalled.countDown();
        });

        boolean notified = listenerCalled.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat(notified).isTrue();
        Assertions.assertThat(observedRole.get()).isEqualTo(server.getRole());
        Assertions.assertThat(observedTerm.get()).isEqualTo(server.getTerm());
    }

    /** Submits {@code count} entries of size 1024 on {@code leader} without waiting for commits. */
    private void appendEntries(RaftServer leader, int count) {
        int remaining = count;
        while (remaining-- > 0) {
            this.appendEntryAsync(leader, 1024);
        }
    }

    /**
     * Appends one entry of size 1024 on {@code leader} and waits for it to commit.
     *
     * @return the committed index
     */
    private long appendEntry(RaftServer leader) throws Exception {
        final int defaultEntrySize = 1024;
        return this.appendEntry(leader, defaultEntrySize);
    }

    /**
     * Appends one entry of {@code entrySize} on {@code leader} and waits for it to commit.
     *
     * @return the committed index
     * @throws IllegalArgumentException if the server's current role is not a {@link LeaderRole}
     */
    private long appendEntry(RaftServer leader, int entrySize) throws Exception {
        RaftRole currentRole = leader.getContext().getRaftRole();
        if (!(currentRole instanceof LeaderRole)) {
            throw new IllegalArgumentException("Expected to append entry on leader, " + leader.getContext().getName() + " was not the leader!");
        }
        TestAppendListener listener = this.appendEntry(entrySize, (LeaderRole) currentRole);
        return listener.awaitCommit();
    }

    /**
     * Submits one entry of {@code entrySize} on {@code leader} without waiting for the commit.
     *
     * @throws IllegalArgumentException if the server's current role is not a {@link LeaderRole}
     */
    private void appendEntryAsync(RaftServer leader, int entrySize) {
        RaftRole currentRole = leader.getContext().getRaftRole();
        if (!(currentRole instanceof LeaderRole)) {
            throw new IllegalArgumentException("Expected to append entry on leader, " + leader.getContext().getName() + " was not the leader!");
        }
        this.appendEntry(entrySize, (LeaderRole) currentRole);
    }

    /**
     * Hands a random payload built from {@code entrySize} random characters to
     * {@code leaderRole.appendEntry}, using the next position and that position plus 10 as the
     * two position arguments, and advances the {@code position} field past that range.
     *
     * @return the listener registered for the append
     */
    private TestAppendListener appendEntry(int entrySize, LeaderRole leaderRole) {
        TestAppendListener listener = new TestAppendListener();
        long startPosition = this.position + 1L;
        long endPosition = startPosition + 10L;
        // Publish the start first, mirroring the increment-before-append order.
        this.position = startPosition;

        ByteBuffer payload = ByteBuffer.wrap(RandomStringUtils.random(entrySize).getBytes());
        leaderRole.appendEntry(startPosition, endPosition, payload, listener);

        this.position = endPosition;
        return listener;
    }

    /**
     * Submits {@code i - 1} entries without waiting, then appends one more entry and waits for
     * that last one to commit.
     */
    private void awaitAppendEntries(RaftServer newLeader, int i) throws Exception {
        int fireAndForget = i - 1;
        this.appendEntries(newLeader, fireAndForget);
        // The final append blocks until its commit callback fires.
        this.appendEntry(newLeader);
    }

    /**
     * {@link FailureListener} that counts down a latch whenever {@code onFailure} is invoked;
     * the other callbacks do nothing.
     */
    private static class LatchFailureListener
    implements FailureListener {
        /** Latch released on each reported failure. */
        private final CountDownLatch failureSignal;

        LatchFailureListener(CountDownLatch latch) {
            this.failureSignal = latch;
        }

        public void onFailure(HealthReport report) {
            this.failureSignal.countDown();
        }

        public void onRecovered() {
            // intentionally a no-op
        }

        public void onUnrecoverableFailure(HealthReport report) {
            // intentionally a no-op
        }
    }

    /**
     * Append listener that exposes the commit of a single entry as a future. A successful commit
     * completes the future with the committed index; a write or commit error completes it
     * exceptionally and additionally fails on the calling thread.
     */
    private static final class TestAppendListener
    implements ZeebeLogAppender.AppendListener {
        private final CompletableFuture<Long> commitFuture = new CompletableFuture<>();

        private TestAppendListener() {
        }

        public void onWriteError(Throwable error) {
            this.failAppend("Unexpected write error: " + error.getMessage(), error);
        }

        public void onCommit(long index, long highestPosition) {
            this.commitFuture.complete(index);
        }

        public void onCommitError(long index, Throwable error) {
            this.failAppend("Unexpected commit error: " + error.getMessage(), error);
        }

        /**
         * Waits up to 30 seconds for the entry to be committed.
         *
         * @return the committed index
         * @throws Exception if the append failed, the wait timed out or was interrupted
         */
        public long awaitCommit() throws Exception {
            return this.commitFuture.get(30L, TimeUnit.SECONDS);
        }

        /**
         * Completes the future exceptionally so a thread blocked in {@link #awaitCommit()} sees
         * the real cause immediately instead of a timeout, then fails on the calling thread as
         * before.
         */
        private void failAppend(String message, Throwable cause) {
            this.commitFuture.completeExceptionally(new AssertionError(message, cause));
            Assert.fail(message);
        }
    }
}

