package io.atomix.raft;

import com.google.common.collect.Maps;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.metrics.RaftRoleMetrics;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.primitive.TestMember;
import io.atomix.raft.protocol.PollRequest;
import io.atomix.raft.protocol.TestRaftProtocolFactory;
import io.atomix.raft.protocol.TestRaftServerProtocol;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.snapshot.TestSnapshotStore;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.InitialEntry;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.journal.CorruptedJournalException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthReport;
import java.io.File;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/raft/RaftTest.class */
public class RaftTest extends ConcurrentTestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(RaftTest.class);
    private volatile int nextId;
    private volatile List<RaftMember> members;
    private volatile TestRaftProtocolFactory protocolFactory;
    private volatile ThreadContext context;
    private Path directory;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private volatile List<RaftServer> servers = new ArrayList();
    private volatile long position = 0;
    private final Map<MemberId, TestRaftServerProtocol> serverProtocols = Maps.newConcurrentMap();

    /* loaded from: input_file:io/atomix/raft/RaftTest$LatchFailureListener.class */
    private static class LatchFailureListener implements FailureListener {
        private final CountDownLatch latch;

        LatchFailureListener(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void onFailure(HealthReport healthReport) {
            this.latch.countDown();
        }

        public void onRecovered() {
        }

        public void onUnrecoverableFailure(HealthReport healthReport) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/raft/RaftTest$TestAppendListener.class */
    public static final class TestAppendListener implements ZeebeLogAppender.AppendListener {
        private final CompletableFuture<Long> commitFuture = new CompletableFuture<>();

        private TestAppendListener() {
        }

        public void onWriteError(Throwable th) {
            Assert.fail("Unexpected write error: " + th.getMessage());
        }

        public void onCommit(long j, long j2) {
            this.commitFuture.complete(Long.valueOf(j));
        }

        public void onCommitError(long j, Throwable th) {
            Assert.fail("Unexpected write error: " + th.getMessage());
        }

        public long awaitCommit() throws Exception {
            return this.commitFuture.get(30L, TimeUnit.SECONDS).longValue();
        }
    }

    @Before
    @After
    public void clearTests() throws Exception {
        this.servers.forEach(raftServer -> {
            try {
                if (raftServer.isRunning()) {
                    raftServer.shutdown().get(10L, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
            }
        });
        this.directory = this.temporaryFolder.newFolder().toPath();
        if (this.context != null) {
            this.context.close();
        }
        this.members = new ArrayList();
        this.nextId = 0;
        this.servers = new ArrayList();
        this.context = new SingleThreadContext("raft-test-messaging-%d");
        this.protocolFactory = new TestRaftProtocolFactory();
    }

    private List<RaftServer> createServers(int i) throws Throwable {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            this.members.add(nextMember(RaftMember.Type.ACTIVE));
        }
        CountDownLatch countDownLatch = new CountDownLatch(i);
        for (int i3 = 0; i3 < i; i3++) {
            RaftServer createServer = createServer(this.members.get(i3).memberId());
            if (this.members.get(i3).getType() == RaftMember.Type.ACTIVE) {
                CompletableFuture bootstrap = createServer.bootstrap((Collection) this.members.stream().map((v0) -> {
                    return v0.memberId();
                }).collect(Collectors.toList()));
                Objects.requireNonNull(countDownLatch);
                bootstrap.thenRun(countDownLatch::countDown);
            }
            arrayList.add(createServer);
        }
        Assertions.assertThat(countDownLatch.await(30 * i, TimeUnit.SECONDS)).isTrue();
        return arrayList;
    }

    private RaftMember nextMember(RaftMember.Type type) {
        return new TestMember(nextNodeId(), type);
    }

    private MemberId nextNodeId() {
        int i = this.nextId + 1;
        this.nextId = i;
        return MemberId.from(String.valueOf(i));
    }

    private RaftServer createServer(MemberId memberId) {
        return createServer(memberId, builder -> {
            return builder.withStorage(createStorage(memberId));
        });
    }

    private RaftServer createServer(MemberId memberId, Function<RaftServer.Builder, RaftServer.Builder> function) {
        TestRaftServerProtocol newServerProtocol = this.protocolFactory.newServerProtocol(memberId);
        RaftServer raftServer = (RaftServer) function.apply(RaftServer.builder(memberId).withMembershipService((ClusterMembershipService) Mockito.mock(ClusterMembershipService.class)).withProtocol(newServerProtocol).withPartitionConfig(new RaftPartitionConfig().setElectionTimeout(Duration.ofSeconds(1L)).setHeartbeatInterval(Duration.ofMillis(100L)))).build();
        this.serverProtocols.put(memberId, newServerProtocol);
        this.servers.add(raftServer);
        return raftServer;
    }

    private RaftStorage createStorage(MemberId memberId) {
        return createStorage(memberId, Function.identity());
    }

    private RaftStorage createStorage(MemberId memberId, Function<RaftStorage.Builder, RaftStorage.Builder> function) {
        return function.apply(RaftStorage.builder().withDirectory(new File(this.directory.toFile(), memberId.toString())).withSnapshotStore(new TestSnapshotStore(new AtomicReference())).withMaxSegmentSize(10240)).build();
    }

    @Test
    @Ignore
    public void testTransferLeadership() throws Throwable {
        List<RaftServer> createServers = createServers(3);
        awaitAppendEntries(getLeader(createServers).orElseThrow(), 1000);
        RaftServer raftServer = createServers.stream().filter((v0) -> {
            return v0.isFollower();
        }).findFirst().get();
        raftServer.promote().thenRun(() -> {
            this.resume();
        });
        await(15000L, 1001);
        Assertions.assertThat(raftServer.isLeader()).isTrue();
    }

    @Test
    public void testDemoteLeader() throws Throwable {
        List<RaftServer> createServers = createServers(3);
        RaftServer orElseThrow = createServers.stream().filter((v0) -> {
            return v0.isLeader();
        }).findFirst().orElseThrow();
        MemberId memberId = orElseThrow.cluster().getLocalMember().memberId();
        RaftServer orElseThrow2 = createServers.stream().filter((v0) -> {
            return v0.isFollower();
        }).findFirst().orElseThrow();
        MemberId memberId2 = orElseThrow2.cluster().getLocalMember().memberId();
        orElseThrow2.cluster().getMember(memberId).addTypeChangeListener(type -> {
            threadAssertEquals(type, RaftMember.Type.PASSIVE);
            resume();
            LOGGER.debug("Leader {} changed to passive on {}", memberId, memberId2);
        });
        orElseThrow.cluster().getLocalMember().demote(RaftMember.Type.PASSIVE).whenComplete((r6, th) -> {
            threadAssertNull(th);
            resume();
            LOGGER.debug("Leader {} demoted to passive", memberId);
        });
        await(15000L, 2);
    }

    @Test
    public void testTwoOfThreeNodeSubmitCommand() throws Throwable {
        testSubmitCommand(2, 3);
    }

    private void testSubmitCommand(int i, int i2) throws Throwable {
        appendEntry(getLeader(createServers(i, i2)).orElseThrow());
    }

    private List<RaftServer> createServers(int i, int i2) throws Throwable {
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < i2; i3++) {
            this.members.add(nextMember(RaftMember.Type.ACTIVE));
        }
        for (int i4 = 0; i4 < i; i4++) {
            RaftServer createServer = createServer(this.members.get(i4).memberId());
            if (this.members.get(i4).getType() == RaftMember.Type.ACTIVE) {
                createServer.bootstrap((Collection) this.members.stream().map((v0) -> {
                    return v0.memberId();
                }).collect(Collectors.toList())).thenRun(() -> {
                    this.resume();
                });
            }
            arrayList.add(createServer);
        }
        await(30000 * i, i);
        return arrayList;
    }

    @Test
    public void testThreeOfFourNodeSubmitCommand() throws Throwable {
        testSubmitCommand(3, 4);
    }

    @Test
    public void testThreeOfFiveNodeSubmitCommand() throws Throwable {
        testSubmitCommand(3, 5);
    }

    @Test
    public void testThreeNodesSequentiallyStart() throws Throwable {
        for (int i = 0; i < 3; i++) {
            this.members.add(nextMember(RaftMember.Type.ACTIVE));
        }
        for (int i2 = 0; i2 < 3; i2++) {
            createServer(this.members.get(i2).memberId()).bootstrap((Collection) this.members.stream().map((v0) -> {
                return v0.memberId();
            }).collect(Collectors.toList())).thenRun(() -> {
                this.resume();
            });
            Thread.sleep(500L);
        }
        await(6000L, 3);
    }

    @Test
    public void testThreeNodeManyEventsDoNotMissHeartbeats() throws Throwable {
        createServers(3);
        RaftServer orElseThrow = getLeader(this.servers).orElseThrow();
        appendEntry(orElseThrow);
        double heartbeatMissCount = RaftRoleMetrics.getHeartbeatMissCount("1");
        appendEntries(orElseThrow, 1000);
        Assert.assertThat(Double.valueOf(0.0d), Is.is(Double.valueOf(RaftRoleMetrics.getHeartbeatMissCount("1") - heartbeatMissCount)));
    }

    @Test
    public void testRoleChangeNotificationAfterInitialEntryOnLeader() throws Throwable {
        List<RaftServer> createServers = createServers(3);
        RaftServer raftServer = getLeader(createServers).get();
        long term = raftServer.getTerm();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        createServers.forEach(raftServer2 -> {
            raftServer2.addRoleChangeListener((role, j) -> {
                if (j > term) {
                    assertLastReadInitialEntry(role, j, raftServer2, countDownLatch);
                }
            });
        });
        raftServer.stepDown();
        Assertions.assertThat(countDownLatch.await(1000L, TimeUnit.SECONDS)).isTrue();
    }

    private Optional<RaftServer> getLeader(List<RaftServer> list) {
        return list.stream().filter(raftServer -> {
            return raftServer.getRole() == RaftServer.Role.LEADER;
        }).findFirst();
    }

    private List<RaftServer> getFollowers(List<RaftServer> list) {
        return (List) list.stream().filter(raftServer -> {
            return raftServer.getRole() == RaftServer.Role.FOLLOWER;
        }).collect(Collectors.toList());
    }

    private void assertLastReadInitialEntry(RaftServer.Role role, long j, RaftServer raftServer, CountDownLatch countDownLatch) {
        if (role == RaftServer.Role.LEADER) {
            RaftLog log = raftServer.getContext().getLog();
            RaftLogReader openCommittedReader = log.openCommittedReader();
            openCommittedReader.seek(log.getLastIndex());
            IndexedRaftLogEntry indexedRaftLogEntry = (IndexedRaftLogEntry) openCommittedReader.next();
            Assertions.assertThat(indexedRaftLogEntry.entry()).isInstanceOf(InitialEntry.class);
            Assertions.assertThat(indexedRaftLogEntry.term()).isEqualTo(j);
            countDownLatch.countDown();
        }
    }

    @Test
    public void testNotifyOnFailure() throws Throwable {
        RaftServer raftServer = createServers(1).get(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        raftServer.addFailureListener(new LatchFailureListener(countDownLatch));
        raftServer.addFailureListener(new LatchFailureListener(countDownLatch2));
        raftServer.getContext().getThreadContext().execute(() -> {
            throw new RuntimeException("injected failure");
        });
        Assertions.assertThat(countDownLatch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(countDownLatch2.await(1L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(raftServer.getRole()).isEqualTo(RaftServer.Role.INACTIVE);
    }

    @Test
    public void shouldDetectCorruptionOnStart() throws Throwable {
        RaftServer raftServer = createServers(1).get(0);
        File directory = raftServer.getContext().getStorage().directory();
        appendEntry(raftServer);
        Optional findFirst = Arrays.stream(directory.listFiles()).filter(file -> {
            return file.getName().endsWith(".log");
        }).findFirst();
        Assertions.assertThat(findFirst).isPresent();
        File file2 = (File) findFirst.get();
        raftServer.shutdown().join();
        Files.writeString(file2.toPath(), "i am become corrupt, destroyer of worlds", new OpenOption[0]);
        MemberId memberId = this.members.get(0).memberId();
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            recreateServer(raftServer, memberId);
        }).isInstanceOf(CorruptedJournalException.class);
    }

    private RaftServer recreateServer(RaftServer raftServer, MemberId memberId) {
        RaftStorage createStorage = createStorage(memberId, builder -> {
            return builder.withDirectory(raftServer.getContext().getStorage().directory());
        });
        return createServer(memberId, builder2 -> {
            return builder2.withStorage(createStorage);
        });
    }

    @Test
    public void shouldTriggerHeartbeatTimeouts() throws Throwable {
        RaftServer raftServer = getFollowers(createServers(3)).get(0);
        MemberId memberId = raftServer.getContext().getCluster().getLocalMember().memberId();
        LongAdder longAdder = new LongAdder();
        this.serverProtocols.get(memberId).interceptRequest(PollRequest.class, pollRequest -> {
            longAdder.increment();
            return CompletableFuture.failedFuture(new ConnectException());
        });
        this.protocolFactory.blockMessagesTo(memberId);
        Awaitility.await().timeout(Duration.ofMillis(raftServer.getContext().getElectionTimeout().multipliedBy(4L).toMillis())).untilAdder(longAdder, Matchers.greaterThan(2L));
    }

    @Test
    public void shouldReSendPollRequestOnTimeouts() throws Throwable {
        MemberId memberId = getFollowers(createServers(3)).get(0).getContext().getCluster().getLocalMember().memberId();
        LongAdder longAdder = new LongAdder();
        TestRaftServerProtocol testRaftServerProtocol = this.serverProtocols.get(memberId);
        this.protocolFactory.blockMessagesTo(memberId);
        testRaftServerProtocol.interceptRequest(PollRequest.class, pollRequest -> {
            longAdder.increment();
            return CompletableFuture.failedFuture(new TimeoutException());
        });
        Awaitility.await().timeout(Duration.ofSeconds(5L)).untilAdder(longAdder, Matchers.greaterThan(2L));
        longAdder.reset();
        Awaitility.await().timeout(Duration.ofSeconds(5L)).untilAdder(longAdder, Matchers.greaterThan(2L));
    }

    @Test
    public void shouldNotifyListenerWhenNoTransitionIsOngoing() throws Throwable {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicLong atomicLong = new AtomicLong(-1L);
        RaftServer raftServer = createServers(1).get(0);
        Assertions.assertThat(raftServer.isLeader()).isTrue();
        raftServer.addRoleChangeListener((role, j) -> {
            atomicReference.set(role);
            atomicLong.set(j);
            countDownLatch.countDown();
        });
        Assertions.assertThat(countDownLatch.await(10L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat((RaftServer.Role) atomicReference.get()).isEqualTo(raftServer.getRole());
        Assertions.assertThat(atomicLong.get()).isEqualTo(raftServer.getTerm());
    }

    private void appendEntries(RaftServer raftServer, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            appendEntryAsync(raftServer, 1024);
        }
    }

    private long appendEntry(RaftServer raftServer) throws Exception {
        return appendEntry(raftServer, 1024);
    }

    private long appendEntry(RaftServer raftServer, int i) throws Exception {
        RaftRole raftRole = raftServer.getContext().getRaftRole();
        if (raftRole instanceof LeaderRole) {
            return appendEntry(i, (LeaderRole) raftRole).awaitCommit();
        }
        throw new IllegalArgumentException("Expected to append entry on leader, " + raftServer.getContext().getName() + " was not the leader!");
    }

    private void appendEntryAsync(RaftServer raftServer, int i) {
        RaftRole raftRole = raftServer.getContext().getRaftRole();
        if (!(raftRole instanceof LeaderRole)) {
            throw new IllegalArgumentException("Expected to append entry on leader, " + raftServer.getContext().getName() + " was not the leader!");
        }
        appendEntry(i, (LeaderRole) raftRole);
    }

    private TestAppendListener appendEntry(int i, LeaderRole leaderRole) {
        TestAppendListener testAppendListener = new TestAppendListener();
        this.position++;
        leaderRole.appendEntry(this.position, this.position + 10, ByteBuffer.wrap(RandomStringUtils.random(i).getBytes()), testAppendListener);
        this.position += 10;
        return testAppendListener;
    }

    private void awaitAppendEntries(RaftServer raftServer, int i) throws Exception {
        appendEntries(raftServer, i - 1);
        appendEntry(raftServer);
    }
}
