/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.RaftApplicationEntryCommittedPositionListener;
import io.atomix.raft.RaftCommitListener;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.primitive.TestMember;
import io.atomix.raft.protocol.PersistedRaftRecord;
import io.atomix.raft.protocol.RaftServerProtocol;
import io.atomix.raft.protocol.ReplicatableJournalRecord;
import io.atomix.raft.protocol.TestRaftProtocolFactory;
import io.atomix.raft.protocol.TestRaftServerProtocol;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.snapshot.InMemorySnapshot;
import io.atomix.raft.snapshot.TestSnapshotStore;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.atomix.raft.storage.log.entry.RaftEntry;
import io.atomix.raft.storage.log.entry.SerializedApplicationEntry;
import io.atomix.raft.zeebe.EntryValidator;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.AbstractIdentifier;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.micrometer.MicrometerUtil;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.mockito.Mockito;

public final class RaftRule
extends ExternalResource {
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private volatile int nextId;
    private volatile List<RaftMember> members;
    private Map<String, Long> memberLog;
    private final Map<String, RaftServer> servers = new HashMap<String, RaftServer>();
    private volatile TestRaftProtocolFactory protocolFactory;
    private volatile ThreadContext context;
    private Path directory;
    private final int nodeCount;
    private volatile long highestCommit;
    private final AtomicReference<CommitAwaiter> commitAwaiterRef = new AtomicReference();
    private long position;
    private EntryValidator entryValidator = new EntryValidator.NoopEntryValidator();
    private Map<String, AtomicReference<InMemorySnapshot>> snapshots;
    private Map<String, TestSnapshotStore> snapshotStores;
    private final Configurator configurator;
    private final Random random = new Random();
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    private RaftRule(int nodeCount, Configurator configurator) {
        this.nodeCount = nodeCount;
        this.configurator = configurator;
    }

    public static RaftRule withBootstrappedNodes(int nodeCount, Configurator configurator) {
        if (nodeCount < 1) {
            throw new IllegalArgumentException("Expected to have at least one node to configure.");
        }
        return new RaftRule(nodeCount, configurator);
    }

    public static RaftRule withBootstrappedNodes(int nodeCount) {
        return new RaftRule(nodeCount, new Configurator(){});
    }

    public RaftRule setEntryValidator(EntryValidator entryValidator) {
        this.entryValidator = entryValidator;
        return this;
    }

    public Statement apply(Statement base, Description description) {
        Statement statement = super.apply(base, description);
        return this.temporaryFolder.apply(statement, description);
    }

    protected void before() throws Throwable {
        this.directory = this.temporaryFolder.newFolder().toPath();
        this.position = 0L;
        this.members = new ArrayList<RaftMember>();
        this.memberLog = new ConcurrentHashMap<String, Long>();
        this.snapshotStores = new HashMap<String, TestSnapshotStore>();
        this.snapshots = new HashMap<String, AtomicReference<InMemorySnapshot>>();
        this.nextId = 0;
        this.context = new SingleThreadContext("raft-test-messaging-%d");
        this.protocolFactory = new TestRaftProtocolFactory();
        if (this.nodeCount > 0) {
            this.createServers(this.nodeCount, this.configurator);
        }
    }

    protected void after() {
        try {
            CompletableFuture.allOf((CompletableFuture[])this.servers.values().stream().map(RaftServer::shutdown).toArray(CompletableFuture[]::new)).get(30L, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.servers.clear();
        this.context.close();
        this.context = null;
        this.members.clear();
        this.nextId = 0;
        this.protocolFactory = null;
        this.highestCommit = 0L;
        this.commitAwaiterRef.set(null);
        this.memberLog.clear();
        this.memberLog = null;
        this.position = 0L;
        this.directory = null;
        MicrometerUtil.close((MeterRegistry)this.meterRegistry);
    }

    private RaftMember nextMember(RaftMember.Type type) {
        return new TestMember(this.nextNodeId(), type);
    }

    private MemberId nextNodeId() {
        return MemberId.from((String)String.valueOf(++this.nextId));
    }

    private List<RaftServer> createServers(int nodes, Configurator configurator) throws Exception {
        ArrayList<RaftServer> servers = new ArrayList<RaftServer>();
        for (int i = 0; i < nodes; ++i) {
            this.members.add(this.nextMember(RaftMember.Type.ACTIVE));
        }
        CountDownLatch latch = new CountDownLatch(nodes);
        for (int i = 0; i < nodes; ++i) {
            RaftMember raftMember = this.members.get(i);
            RaftServer server = this.createServer(raftMember.memberId(), configurator);
            ((CompletableFuture)server.bootstrap((Collection)this.members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenAccept(this::addCommitListener)).thenRun(latch::countDown);
            servers.add(server);
        }
        latch.await(30L, TimeUnit.SECONDS);
        return servers;
    }

    public String shutdownFollower() throws Exception {
        RaftServer follower = this.getFollower().orElseThrow();
        this.shutdownServer(follower);
        return follower.name();
    }

    public Set<String> getNodes() {
        return this.servers.keySet();
    }

    public void joinCluster(String nodeId) throws Exception {
        RaftMember member = this.getRaftMember(nodeId);
        ((CompletableFuture)this.createServer(member.memberId(), this.configurator).bootstrap(this.getMemberIds()).thenAccept(this::addCommitListener)).get(30L, TimeUnit.SECONDS);
    }

    public void bootstrapNode(String nodeId) throws Exception {
        this.bootstrapNode(nodeId, this.configurator);
    }

    public CompletableFuture<Void> bootstrapNodeAsync(String nodeId) {
        return this.bootstrapNodeAsync(nodeId, this.configurator);
    }

    public CompletableFuture<Void> bootstrapNodeAsync(String nodeId, Configurator configurator) {
        RaftMember member = this.getRaftMember(nodeId);
        return this.createServer(member.memberId(), configurator).bootstrap(this.getMemberIds()).thenAccept(this::addCommitListener);
    }

    public void bootstrapNode(String nodeId, Configurator configurator) throws Exception {
        this.bootstrapNodeAsync(nodeId, configurator).get(30L, TimeUnit.SECONDS);
    }

    public void bootstrapNodeWithMemberIds(String nodeId, List<MemberId> memberIds) throws Exception {
        RaftMember member = this.getRaftMember(nodeId);
        ((CompletableFuture)this.createServer(member.memberId(), this.configurator).bootstrap(memberIds).thenAccept(this::addCommitListener)).get(30L, TimeUnit.SECONDS);
    }

    public String shutdownLeader() throws Exception {
        RaftServer leader = this.getLeader().orElseThrow();
        this.shutdownServer(leader);
        return leader.name();
    }

    public void restartLeader() throws Exception {
        this.awaitNewLeader();
        String leader = this.shutdownLeader();
        this.joinCluster(leader);
    }

    public List<MemberId> getMemberIds() {
        return this.members.stream().map(RaftMember::memberId).collect(Collectors.toList());
    }

    public Collection<RaftServer> getServers() {
        return this.servers.values();
    }

    public RaftServer getServer(String id) {
        return this.servers.get(id);
    }

    public void shutdownServer(RaftServer raftServer) throws Exception {
        this.shutdownServer(raftServer.name());
    }

    public void shutdownServer(String nodeName) throws Exception {
        this.servers.remove(nodeName).shutdown().get(30L, TimeUnit.SECONDS);
        this.memberLog.remove(nodeName);
        this.snapshotStores.remove(nodeName);
    }

    private RaftMember getRaftMember(String memberId) {
        return this.members.stream().filter(member -> ((String)((Object)member.memberId().id())).equals(memberId)).findFirst().orElseThrow();
    }

    public void takeSnapshot(long index) {
        this.takeSnapshot(index, 1);
    }

    public void takeSnapshot(long index, int size) {
        this.awaitNewLeader();
        for (RaftServer raftServer : this.servers.values()) {
            this.takeSnapshot(raftServer, index, size);
        }
    }

    public void takeCompactingSnapshot(long index) {
        this.takeCompactingSnapshot(index, 1);
    }

    public void takeCompactingSnapshot(long index, int size) {
        this.awaitNewLeader();
        for (RaftServer raftServer : this.servers.values()) {
            this.takeCompactingSnapshot(raftServer, index, size);
        }
    }

    public Optional<PersistedSnapshot> takeCompactingSnapshot(RaftServer raftServer, long index, int size) {
        RaftContext raftContext = raftServer.getContext();
        RaftLog raftLog = raftContext.getLog();
        long previousFirstIndex = raftLog.getFirstIndex();
        Optional<PersistedSnapshot> snapshot = this.takeSnapshot(raftServer, index, size);
        Awaitility.await((String)"until compaction has occurred").untilAsserted(() -> Assertions.assertThat((long)raftLog.getFirstIndex()).isGreaterThan(previousFirstIndex));
        return snapshot;
    }

    public Optional<PersistedSnapshot> takeSnapshot(RaftServer raftServer, long index, int size) {
        if (!raftServer.isRunning()) {
            return Optional.empty();
        }
        RaftContext raftContext = raftServer.getContext();
        MemberId memberId = raftServer.cluster().getLocalMember().memberId();
        TestSnapshotStore snapshotStore = this.getSnapshotStore((String)((Object)memberId.id()));
        return Optional.of(InMemorySnapshot.newPersistedSnapshot(Integer.parseInt((String)((Object)memberId.id())), index, raftContext.getTerm(), size, snapshotStore));
    }

    private TestSnapshotStore getSnapshotStore(String memberId) {
        return this.snapshotStores.get(memberId);
    }

    private AtomicReference<InMemorySnapshot> getOrCreatePersistedSnapshot(String memberId) {
        return this.snapshots.computeIfAbsent(memberId, i -> new AtomicReference());
    }

    public boolean allNodesHaveSnapshotWithIndex(long index) {
        return this.servers.values().stream().map(RaftServer::getContext).map(RaftContext::getPersistedSnapshotStore).map(PersistedSnapshotStore::getCurrentSnapshotIndex).filter(idx -> idx == index).count() == (long)this.servers.values().size();
    }

    public PersistedSnapshot getSnapshotFromLeader() {
        RaftServer leader = this.getLeader().orElseThrow();
        RaftContext context = leader.getContext();
        ReceivableSnapshotStore snapshotStore = context.getPersistedSnapshotStore();
        return (PersistedSnapshot)snapshotStore.getLatestSnapshot().orElseThrow();
    }

    public PersistedSnapshot getSnapshotOnNode(String nodeId) {
        RaftServer raftServer = this.servers.get(nodeId);
        RaftContext context = raftServer.getContext();
        ReceivableSnapshotStore snapshotStore = context.getPersistedSnapshotStore();
        return (PersistedSnapshot)snapshotStore.getLatestSnapshot().orElseThrow();
    }

    public void awaitNewLeader() {
        this.waitUntil(() -> this.getLeader().isPresent(), 100);
    }

    private void addCommitListener(final RaftServer raftServer) {
        raftServer.getContext().addCommitListener(new RaftCommitListener(){

            public void onCommit(long index) {
                CommitAwaiter commitAwaiter;
                long currentIndex = index;
                RaftRule.this.memberLog.put(raftServer.name(), currentIndex);
                if (RaftRule.this.highestCommit < currentIndex) {
                    RaftRule.this.highestCommit = currentIndex;
                }
                if ((commitAwaiter = RaftRule.this.commitAwaiterRef.get()) != null && commitAwaiter.reachedCommit(currentIndex)) {
                    RaftRule.this.commitAwaiterRef.set(null);
                }
            }
        });
    }

    public Map<String, List<IndexedRaftLogEntry>> getMemberLogs() {
        HashMap<String, List<IndexedRaftLogEntry>> memberLogs = new HashMap<String, List<IndexedRaftLogEntry>>();
        for (RaftServer server : this.servers.values()) {
            if (!server.isRunning()) continue;
            RaftLog log = server.getContext().getLog();
            ArrayList<CopiedRaftLogEntry> entryList = new ArrayList<CopiedRaftLogEntry>();
            try (RaftLogReader raftLogReader = log.openUncommittedReader();){
                while (raftLogReader.hasNext()) {
                    IndexedRaftLogEntry indexedEntry = (IndexedRaftLogEntry)raftLogReader.next();
                    entryList.add(CopiedRaftLogEntry.of(indexedEntry));
                }
            }
            memberLogs.put(server.name(), entryList);
        }
        return memberLogs;
    }

    public void awaitSameLogSizeOnAllNodes(long lastIndex) {
        Awaitility.await((String)"awaitSameLogSizeOnAllNodes").until(() -> this.memberLog.values().stream().distinct().collect(Collectors.toList()), lastIndexes -> lastIndexes.size() == 1 && (Long)lastIndexes.get(0) == lastIndex);
    }

    private void waitUntil(BooleanSupplier condition, int retries) {
        this.waitUntil(condition, retries, () -> null);
    }

    private void waitUntil(BooleanSupplier condition, int retries, Supplier<String> errorMessage) {
        try {
            while (!condition.getAsBoolean() && retries > 0) {
                Thread.sleep(100L);
                --retries;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)condition.getAsBoolean()).as(errorMessage.get(), new Object[0])).isTrue();
    }

    public void awaitCommit(long commitIndex) throws Exception {
        this.awaitCommit(commitIndex, Duration.ofSeconds(30L));
    }

    public void awaitCommit(long commitIndex, Duration timeout) throws Exception {
        if (this.highestCommit >= commitIndex) {
            return;
        }
        CommitAwaiter commitAwaiter = new CommitAwaiter(commitIndex);
        this.commitAwaiterRef.set(commitAwaiter);
        commitAwaiter.awaitCommit(timeout);
    }

    public RaftServer createServer(MemberId memberId) {
        return this.createServer(memberId, this.configurator);
    }

    private RaftServer createServer(MemberId memberId, Configurator configurator) {
        TestRaftServerProtocol protocol = this.protocolFactory.newServerProtocol(memberId);
        RaftStorage storage = this.createStorage(memberId, configurator);
        RaftServer.Builder builder = RaftServer.builder((MemberId)memberId).withPartitionConfig(new RaftPartitionConfig().setElectionTimeout(Duration.ofSeconds(1L)).setHeartbeatInterval(Duration.ofMillis(100L))).withMembershipService((ClusterMembershipService)Mockito.mock(ClusterMembershipService.class)).withProtocol((RaftServerProtocol)protocol).withEntryValidator(this.entryValidator).withMeterRegistry(this.meterRegistry).withStorage(storage);
        configurator.configure(memberId, builder);
        RaftServer server = (RaftServer)builder.build();
        this.servers.put((String)((Object)memberId.id()), server);
        return server;
    }

    private RaftStorage createStorage(MemberId memberId, Configurator configurator) {
        File memberDirectory = this.getMemberDirectory(this.directory, memberId.toString());
        TestSnapshotStore snapshotStore = new TestSnapshotStore(this.getOrCreatePersistedSnapshot((String)((Object)memberId.id())));
        this.snapshotStores.put((String)((Object)memberId.id()), snapshotStore);
        configurator.configure(snapshotStore);
        RaftStorage.Builder builder = RaftStorage.builder((MeterRegistry)this.meterRegistry).withDirectory(memberDirectory).withMaxSegmentSize(10240).withFreeDiskSpace(100L).withSnapshotStore((ReceivableSnapshotStore)snapshotStore);
        return builder.build();
    }

    private File getMemberDirectory(Path directory, String s) {
        return new File(directory.toFile(), s);
    }

    public Optional<RaftServer> getLeader() {
        return this.servers.values().stream().filter(s -> s.getRole() == RaftServer.Role.LEADER).findFirst();
    }

    public Optional<RaftServer> getFollower() {
        return this.servers.values().stream().filter(s -> s.getRole() == RaftServer.Role.FOLLOWER).findFirst();
    }

    public long appendEntries(int count) throws Exception {
        for (int i = 0; i < count - 1; ++i) {
            this.appendEntry();
        }
        return this.appendEntry();
    }

    public long appendEntry() throws Exception {
        RaftServer leader = this.getLeader().orElseThrow();
        return this.appendEntry(leader, 1024);
    }

    public TestAppendListener appendEntryAsync() {
        RaftRole raftRole = this.getLeader().orElseThrow().getContext().getRaftRole();
        if (raftRole instanceof LeaderRole) {
            return this.appendEntry(1024, (LeaderRole)raftRole);
        }
        throw new IllegalStateException("Expected Leader to be a LeaderRole, was: " + String.valueOf(raftRole));
    }

    private long appendEntry(RaftServer leader, int entrySize) throws Exception {
        RaftRole raftRole = leader.getContext().getRaftRole();
        if (raftRole instanceof LeaderRole) {
            TestAppendListener testAppendListener = this.appendEntry(entrySize, (LeaderRole)raftRole);
            return testAppendListener.awaitCommit();
        }
        throw new IllegalArgumentException("Expected to append entry on leader, " + leader.getContext().getName() + " was not the leader!");
    }

    private TestAppendListener appendEntry(int entrySize, LeaderRole leaderRole) {
        TestAppendListener appendListener = new TestAppendListener();
        ++this.position;
        byte[] bytes = new byte[entrySize];
        this.random.nextBytes(bytes);
        leaderRole.appendEntry(this.position, this.position + 10L, ByteBuffer.wrap(bytes), (ZeebeLogAppender.AppendListener)appendListener);
        this.position += 10L;
        return appendListener;
    }

    public String toString() {
        return "RaftRule with " + this.nodeCount + " nodes.";
    }

    public void triggerDataLossOnNode(String node) throws IOException {
        String member = this.members.stream().map(RaftMember::memberId).map(AbstractIdentifier::id).filter(id -> id.equals(node)).findAny().orElseThrow();
        File memberDirectory = this.getMemberDirectory(this.directory, member);
        boolean deletedMemberDirectory = false;
        while (!deletedMemberDirectory) {
            try {
                FileUtil.deleteFolderIfExists((Path)memberDirectory.toPath());
                deletedMemberDirectory = true;
            }
            catch (DirectoryNotEmptyException e) {
                FileUtil.deleteFolderIfExists((Path)memberDirectory.toPath());
            }
        }
        this.snapshots.remove(node);
    }

    public PersistedSnapshotStore getPersistedSnapshotStore(String followerB) {
        return this.servers.get(followerB).getContext().getPersistedSnapshotStore();
    }

    public void addCommitListener(RaftCommitListener raftCommitListener) {
        this.servers.forEach((id, raft) -> raft.getContext().addCommitListener(raftCommitListener));
    }

    public void addCommittedEntryListener(RaftApplicationEntryCommittedPositionListener raftApplicationEntryCommittedPositionListener) {
        this.servers.forEach((id, raft) -> raft.getContext().addCommittedEntryListener(raftApplicationEntryCommittedPositionListener));
    }

    public void partition(RaftServer follower) {
        this.protocolFactory.partition(follower.cluster().getLocalMember().memberId());
    }

    public void reconnect(RaftServer follower) {
        this.protocolFactory.heal(follower.cluster().getLocalMember().memberId());
    }

    public static interface Configurator {
        default public void configure(MemberId id, RaftServer.Builder builder) {
        }

        default public void configure(TestSnapshotStore snapshotStore) {
        }
    }

    private record CopiedRaftLogEntry(long index, long term, RaftEntry entry) implements IndexedRaftLogEntry
    {
        private static CopiedRaftLogEntry of(IndexedRaftLogEntry entry) {
            RaftEntry copiedEntry;
            RaftEntry raftEntry = entry.entry();
            if (raftEntry instanceof SerializedApplicationEntry) {
                SerializedApplicationEntry app = (SerializedApplicationEntry)raftEntry;
                copiedEntry = new SerializedApplicationEntry(app.lowestPosition(), app.highestPosition(), BufferUtil.cloneBuffer((DirectBuffer)app.data()));
            } else {
                copiedEntry = entry.entry();
            }
            return new CopiedRaftLogEntry(entry.index(), entry.term(), copiedEntry);
        }

        public ApplicationEntry getApplicationEntry() {
            return (ApplicationEntry)this.entry;
        }

        public PersistedRaftRecord getPersistedRaftRecord() {
            throw new UnsupportedOperationException();
        }

        public ReplicatableJournalRecord getReplicatableJournalRecord() {
            throw new UnsupportedOperationException();
        }
    }

    private static final class CommitAwaiter {
        private final long awaitedIndex;
        private final CountDownLatch latch = new CountDownLatch(1);

        public CommitAwaiter(long index) {
            this.awaitedIndex = index;
        }

        public boolean reachedCommit(long currentIndex) {
            if (this.awaitedIndex <= currentIndex) {
                this.latch.countDown();
                return true;
            }
            return false;
        }

        public void awaitCommit(Duration timeout) throws Exception {
            this.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }

        public void awaitCommit() throws Exception {
            this.latch.await(30L, TimeUnit.SECONDS);
        }
    }

    public static final class TestAppendListener
    implements ZeebeLogAppender.AppendListener {
        private final CompletableFuture<Long> commitFuture = new CompletableFuture();

        public void onWriteError(Throwable error) {
            this.commitFuture.completeExceptionally(error);
        }

        public void onCommit(long index, long highestPosition) {
            this.commitFuture.complete(index);
        }

        public void onCommitError(long index, Throwable error) {
            this.commitFuture.completeExceptionally(error);
        }

        public long awaitCommit() throws Exception {
            return this.commitFuture.get(30L, TimeUnit.SECONDS);
        }

        public long awaitCommit(Duration duration) throws Exception {
            return this.commitFuture.get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}

