/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.DeterministicSingleThreadContext;
import io.atomix.raft.RaftThreadContextFactory;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.partition.RaftElectionConfig;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.protocol.ControllableRaftServerProtocol;
import io.atomix.raft.protocol.RaftServerProtocol;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.zeebe.EntryValidator;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.snapshots.testing.TestFileBasedSnapshotStore;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.collection.Tuple;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.jmock.lib.concurrent.DeterministicScheduler;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ControllableRaftContexts {
    // Shared logger for this harness; registered under the fixed name "TEST".
    private static final Logger LOG = LoggerFactory.getLogger((String)"TEST");
    // Protocol lookup by member; the same map instance is handed to every ControllableRaftServerProtocol
    // created in createRaftContext (presumably each protocol registers itself here - confirm in that class).
    private final Map<MemberId, ControllableRaftServerProtocol> serverProtocols = new HashMap<MemberId, ControllableRaftServerProtocol>();
    // Per-member message queues shared with every ControllableRaftServerProtocol; only cleared here (see shutdown).
    private final Map<MemberId, Queue<Tuple<Runnable, CompletableFuture<?>>>> messageQueue = new HashMap();
    // Deterministic thread context per member; filled lazily by the factory from getRaftThreadContextFactory.
    private final Map<MemberId, DeterministicSingleThreadContext> deterministicExecutors = new HashMap<MemberId, DeterministicSingleThreadContext>();
    // Root directory given to setup(); each member stores its data in a sub-directory named after its id.
    private Path directory;
    // Random source given to setup(); reused for restarted members and for choosing snapshot indexes.
    private Random random;
    // Number of members created by setup().
    private final int nodeCount;
    // Raft context of every member that is currently part of the harness.
    private final Map<MemberId, RaftContext> raftServers = new HashMap<MemberId, RaftContext>();
    // Snapshot store backing each member's storage.
    private final Map<MemberId, TestFileBasedSnapshotStore> snapshotStores = new HashMap<MemberId, TestFileBasedSnapshotStore>();
    // Election timeout read from member 0 at the end of setup(); used by tickElectionTimeout.
    private Duration electionTimeout;
    // Heartbeat interval read from member 0 at the end of setup(); used by tickHeartbeatTimeout.
    private Duration heartbeatTimeout;
    // Counter that supplies the payload (and, after incrementing, the positions) of client appends.
    private int nextEntry = 0;
    // Leader observed for each term; maintained by updateAndVerifyLeaderTerm.
    private final NavigableMap<Long, MemberId> leadersAtTerms = new TreeMap<Long, MemberId>();
    // Mockito mock that receives every append callback; inspected by assertNoJournalAppendErrors.
    private final ZeebeLogAppender.AppendListener appendListener = (ZeebeLogAppender.AppendListener)Mockito.mock(ZeebeLogAppender.AppendListener.class);
    // Listener passed to the leader on every client append; forwards all callbacks to appendListener.
    private final DataLossChecker dataLossChecker = new DataLossChecker(this.appendListener);

    /**
     * Creates a harness for a cluster with the given number of members.
     *
     * <p>Only the size is recorded here; the members themselves are created by {@code setup}.
     *
     * @param nodeCount number of raft members to create during setup
     */
    public ControllableRaftContexts(final int nodeCount) {
        this.nodeCount = nodeCount;
    }

    /**
     * Returns the live (not copied) map of member id to raft context.
     *
     * @return the internal member map
     */
    public Map<MemberId, RaftContext> getRaftServers() {
        return raftServers;
    }

    /**
     * Looks up a member's raft context by its numeric id.
     *
     * @param memberId numeric id; converted to its decimal string form to build the {@link MemberId}
     * @return the raft context, or {@code null} if no such member is registered
     */
    public RaftContext getRaftContext(final int memberId) {
        final MemberId id = MemberId.from(String.valueOf(memberId));
        return raftServers.get(id);
    }

    /**
     * Looks up a member's raft context.
     *
     * @param memberId id of the member
     * @return the raft context, or {@code null} if no such member is registered
     */
    public RaftContext getRaftContext(final MemberId memberId) {
        return raftServers.get(memberId);
    }

    /**
     * Creates all members, bootstraps the cluster and records the timeouts used by the tick helpers.
     *
     * <p>The election timeout and heartbeat interval are read from member 0, after which member 0 is
     * advanced by one heartbeat interval.
     *
     * @param directory root directory under which every member gets its own sub-directory
     * @param random random source shared by all members
     * @throws Exception if bootstrapping the cluster fails
     */
    public void setup(final Path directory, final Random random) throws Exception {
        this.directory = directory;
        this.random = random;

        if (nodeCount > 0) {
            createRaftContexts(nodeCount, random);
        }
        joinRaftServers();

        final RaftContext firstMember = getRaftContext(0);
        electionTimeout = firstMember.getElectionTimeout();
        heartbeatTimeout = firstMember.getHeartbeatInterval();

        tickHeartbeatTimeout(0);
    }

    /**
     * Closes every snapshot store, raft context and executor, then forgets all harness state.
     *
     * <p>Snapshot stores are closed first, then the raft contexts, then the executors.
     *
     * @throws IOException declared for callers; nothing in this method throws it directly
     */
    public void shutdown() throws IOException {
        for (final TestFileBasedSnapshotStore store : snapshotStores.values()) {
            store.close();
        }
        snapshotStores.clear();

        for (final RaftContext context : raftServers.values()) {
            context.close();
        }
        raftServers.clear();
        serverProtocols.clear();

        for (final DeterministicSingleThreadContext executor : deterministicExecutors.values()) {
            executor.close();
        }
        deterministicExecutors.clear();

        messageQueue.clear();
        leadersAtTerms.clear();
        directory = null;
    }

    /**
     * Bootstraps every member with the sorted list of all member ids and drives the cluster until
     * all bootstrap futures have completed.
     *
     * <p>Member 0 is run first and advanced by twice its election timeout; afterwards messages and
     * tasks of all members are processed in a loop until every future is done.
     *
     * @throws InterruptedException if interrupted while reading the combined future
     * @throws ExecutionException if any bootstrap future completed exceptionally
     * @throws TimeoutException if the combined future cannot be read within one second
     */
    private void joinRaftServers() throws InterruptedException, ExecutionException, TimeoutException {
        final Map<MemberId, RaftContext> servers = getRaftServers();
        final MemberId firstMember = MemberId.from(String.valueOf(0));

        final ArrayList<MemberId> sortedIds = new ArrayList<>(servers.keySet());
        final long electionTimeoutMillis = servers.get(firstMember).getElectionTimeout().toMillis();
        Collections.sort(sortedIds);

        final Set<CompletableFuture<?>> bootstrapFutures = new HashSet<>();
        for (final RaftContext raftContext : servers.values()) {
            bootstrapFutures.add(raftContext.getCluster().bootstrap(sortedIds));
        }

        runUntilDone(0);
        getDeterministicScheduler(firstMember).tick(2L * electionTimeoutMillis, TimeUnit.MILLISECONDS);

        final CompletableFuture<Void> allJoined =
            CompletableFuture.allOf(bootstrapFutures.toArray(CompletableFuture[]::new));
        while (!allJoined.isDone()) {
            processAllMessage();
            runUntilDone();
        }
        allJoined.get(1L, TimeUnit.SECONDS);
    }

    /**
     * Creates the members with ids {@code 0 .. nodeCount - 1}.
     *
     * @param nodeCount number of members to create
     * @param random random source passed to every member
     */
    private void createRaftContexts(final int nodeCount, final Random random) {
        int nodeId = 0;
        while (nodeId < nodeCount) {
            createRaftContextForMember(random, nodeId);
            nodeId++;
        }
    }

    /**
     * Creates snapshot store, storage and raft context for one member and registers them.
     *
     * <p>Any snapshot store or raft context previously registered for the same id is replaced in the
     * maps (it is not closed here).
     *
     * @param random random source passed to the raft context
     * @param nodeId numeric id of the member
     * @return the newly created raft context
     */
    private RaftContext createRaftContextForMember(final Random random, final int nodeId) {
        final MemberId memberId = MemberId.from(String.valueOf(nodeId));

        // Snapshots live in "<member directory>/snapshots".
        final Path snapshotDirectory =
            getMemberDirectory(directory, memberId.toString()).toPath().resolve("snapshots");
        final TestFileBasedSnapshotStore snapshotStore =
            new TestFileBasedSnapshotStore(nodeId, snapshotDirectory, (ConcurrencyControl) new TestConcurrencyControl());
        snapshotStores.put(memberId, snapshotStore);

        final RaftStorage storage =
            createStorage(memberId, builder -> builder.withSnapshotStore((ReceivableSnapshotStore) snapshotStore));
        final RaftContext raftContext = createRaftContext(memberId, random, storage);
        raftServers.put(memberId, raftContext);
        return raftContext;
    }

    /**
     * Builds a raft context wired to the controllable protocol and the deterministic thread context.
     *
     * <p>The context is named {@code "<id>-partition-1"}, uses partition id 1, a stub-only mocked
     * membership service, priority election with priority {@code id + 1}, a default partition
     * config and a no-op entry validator.
     *
     * @param memberId id of the member; its string id must be parseable as an integer
     * @param random random source supplied to the raft context
     * @param storage storage the raft context operates on
     * @return the configured raft context
     */
    public RaftContext createRaftContext(final MemberId memberId, final Random random, final RaftStorage storage) {
        final String name = memberId.id() + "-partition-1";
        final ClusterMembershipService membershipService =
            Mockito.mock(ClusterMembershipService.class, Mockito.withSettings().stubOnly());
        final RaftServerProtocol protocol =
            new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue);
        final RaftThreadContextFactory threadContextFactory = getRaftThreadContextFactory(memberId);
        final int priority = Integer.parseInt(memberId.id()) + 1;

        final RaftContext raft =
            new RaftContext(
                name,
                1,
                memberId,
                membershipService,
                protocol,
                storage,
                threadContextFactory,
                () -> random,
                RaftElectionConfig.ofPriorityElection(nodeCount, priority),
                new RaftPartitionConfig());
        raft.setEntryValidator(new EntryValidator.NoopEntryValidator());
        return raft;
    }

    /**
     * Returns a factory that always hands out the member's deterministic thread context, creating
     * and caching it on first use. Both factory arguments are ignored.
     *
     * @param memberId member whose thread context the factory provides
     * @return the thread context factory for that member
     */
    private RaftThreadContextFactory getRaftThreadContextFactory(final MemberId memberId) {
        return (ignoredFactory, ignoredExceptionHandler) ->
            deterministicExecutors.computeIfAbsent(
                memberId,
                id -> (DeterministicSingleThreadContext) DeterministicSingleThreadContext.createContext());
    }

    /**
     * Builds the storage of a member inside its own directory.
     *
     * <p>Defaults applied before the configurator runs: max segment size 10240 and free disk space
     * 100.
     *
     * @param memberId member the storage belongs to
     * @param configurator hook that customizes the pre-filled builder and returns the builder to use
     * @return the built storage
     */
    private RaftStorage createStorage(
        final MemberId memberId, final Function<RaftStorage.Builder, RaftStorage.Builder> configurator) {
        final File storageDirectory = getMemberDirectory(directory, memberId.toString());
        final RaftStorage.Builder prefilled =
            RaftStorage.builder()
                .withDirectory(storageDirectory)
                .withMaxSegmentSize(10240)
                .withFreeDiskSpace(100L);
        final RaftStorage.Builder customized = configurator.apply(prefilled);
        return customized.build();
    }

    /**
     * Resolves the directory of a member below the given root.
     *
     * @param directory root directory
     * @param s name of the child directory
     * @return the child as a {@link File}; nothing is created on disk
     */
    private File getMemberDirectory(final Path directory, final String s) {
        final File root = directory.toFile();
        return new File(root, s);
    }

    /**
     * Returns the controllable protocol of a member.
     *
     * @param memberId id of the member
     * @return the protocol, or {@code null} if none is registered for that id
     */
    public ControllableRaftServerProtocol getServerProtocol(final MemberId memberId) {
        return serverProtocols.get(memberId);
    }

    /**
     * Returns the controllable protocol of the member with the given numeric id.
     *
     * @param memberId numeric id of the member
     * @return the protocol, or {@code null} if none is registered for that id
     */
    public ControllableRaftServerProtocol getServerProtocol(final int memberId) {
        final MemberId id = MemberId.from(String.valueOf(memberId));
        return getServerProtocol(id);
    }

    /**
     * Returns the deterministic scheduler behind a member's thread context.
     *
     * @param memberId id of the member; a thread context must already exist for it
     * @return the member's scheduler
     */
    public DeterministicScheduler getDeterministicScheduler(final MemberId memberId) {
        final DeterministicSingleThreadContext threadContext = deterministicExecutors.get(memberId);
        return threadContext.getDeterministicScheduler();
    }

    /**
     * Returns the deterministic scheduler of the member with the given numeric id.
     *
     * @param memberId numeric id of the member
     * @return the member's scheduler
     */
    public DeterministicScheduler getDeterministicScheduler(final int memberId) {
        final MemberId id = MemberId.from(String.valueOf(memberId));
        return getDeterministicScheduler(id);
    }

    /** Runs the scheduler of every member until it is idle; messages are not delivered here. */
    public void runUntilDone() {
        for (final MemberId memberId : raftServers.keySet()) {
            getDeterministicScheduler(memberId).runUntilIdle();
        }
    }

    /**
     * Delivers all queued messages to the member and then runs its scheduler until idle.
     *
     * <p>Unlike the {@link MemberId} overload, this variant also drains the member's messages.
     *
     * @param memberId numeric id of the member
     */
    public void runUntilDone(final int memberId) {
        final ControllableRaftServerProtocol protocol = getServerProtocol(memberId);
        protocol.receiveAll();

        final DeterministicScheduler scheduler = getDeterministicScheduler(memberId);
        scheduler.runUntilIdle();
    }

    /**
     * Runs the member's scheduler until it is idle, without delivering any messages.
     *
     * @param memberId id of the member
     */
    public void runUntilDone(final MemberId memberId) {
        final DeterministicScheduler scheduler = getDeterministicScheduler(memberId);
        scheduler.runUntilIdle();
    }

    /**
     * Runs exactly one pending command of the member's scheduler; does nothing when it is idle.
     *
     * @param memberId id of the member
     */
    public void runNextTask(final MemberId memberId) {
        final DeterministicScheduler scheduler = getDeterministicScheduler(memberId);
        if (scheduler.isIdle()) {
            return;
        }
        scheduler.runNextPendingCommand();
    }

    /** Lets every member receive all messages currently queued for it. */
    public void processAllMessage() {
        for (final MemberId memberId : raftServers.keySet()) {
            getServerProtocol(memberId).receiveAll();
        }
    }

    /**
     * Lets one member receive all messages currently queued for it.
     *
     * @param memberId id of the receiving member
     */
    public void processAllMessage(final MemberId memberId) {
        final ControllableRaftServerProtocol protocol = getServerProtocol(memberId);
        protocol.receiveAll();
    }

    /**
     * Lets one member receive only the next message queued for it.
     *
     * @param memberId id of the receiving member
     */
    public void processNextMessage(final MemberId memberId) {
        final ControllableRaftServerProtocol protocol = getServerProtocol(memberId);
        protocol.receiveNextMessage();
    }

    /**
     * Advances the member's clock by the election timeout recorded during setup.
     *
     * @param memberId numeric id of the member
     */
    public void tickElectionTimeout(final int memberId) {
        final Duration timeout = electionTimeout;
        tick(memberId, timeout);
    }

    /**
     * Advances the member's clock by the election timeout recorded during setup.
     *
     * @param memberId id of the member
     */
    public void tickElectionTimeout(final MemberId memberId) {
        final Duration timeout = electionTimeout;
        tick(memberId, timeout);
    }

    /**
     * Advances the member's clock by the heartbeat interval recorded during setup.
     *
     * @param memberId numeric id of the member
     */
    public void tickHeartbeatTimeout(final int memberId) {
        final Duration interval = heartbeatTimeout;
        tick(memberId, interval);
    }

    /**
     * Advances the member's clock by the heartbeat interval recorded during setup.
     *
     * @param memberId id of the member
     */
    public void tickHeartbeatTimeout(final MemberId memberId) {
        final Duration interval = heartbeatTimeout;
        tick(memberId, interval);
    }

    /** Advances the clock of every member by the heartbeat interval recorded during setup. */
    public void tickHeartbeatTimeout() {
        final Duration interval = heartbeatTimeout;
        tick(interval);
    }

    /**
     * Advances the clock of every member by the given amount.
     *
     * @param time amount of time to advance
     */
    public void tick(final Duration time) {
        for (final MemberId memberId : raftServers.keySet()) {
            tick(memberId, time);
        }
    }

    /**
     * Advances the scheduler and then the protocol of one member by the given amount.
     *
     * @param memberId numeric id of the member
     * @param time amount of time to advance; applied in milliseconds
     */
    public void tick(final int memberId, final Duration time) {
        final DeterministicScheduler scheduler = getDeterministicScheduler(memberId);
        scheduler.tick(time.toMillis(), TimeUnit.MILLISECONDS);

        final ControllableRaftServerProtocol protocol = getServerProtocol(memberId);
        protocol.tick(time.toMillis());
    }

    /**
     * Advances the scheduler and then the protocol of one member by the given amount.
     *
     * @param memberId id of the member
     * @param time amount of time to advance; applied in milliseconds
     */
    public void tick(final MemberId memberId, final Duration time) {
        final DeterministicScheduler scheduler = getDeterministicScheduler(memberId);
        scheduler.tick(time.toMillis(), TimeUnit.MILLISECONDS);

        final ControllableRaftServerProtocol protocol = getServerProtocol(memberId);
        protocol.tick(time.toMillis());
    }

    /**
     * Appends one entry on the given member if it currently has the leader role; otherwise does
     * nothing.
     *
     * <p>The 4-byte payload holds the counter value before it is incremented, while both positions
     * passed to the leader use the value after the increment.
     *
     * @param memberId member expected to be the leader
     */
    private void clientAppend(final MemberId memberId) {
        final RaftRole role = getRaftContext(memberId).getRaftRole();
        if (!(role instanceof LeaderRole)) {
            return;
        }
        final LeaderRole leaderRole = (LeaderRole) role;
        LOG.info("Appending on leader {}", memberId.id());

        final ByteBuffer data = ByteBuffer.allocate(4).putInt(0, nextEntry);
        nextEntry++;
        final long position = nextEntry;
        leaderRole.appendEntry(position, position, data, dataLossChecker);
    }

    /**
     * Appends one entry on the leader recorded for the highest known term, if there is one.
     *
     * <p>Only leaders already recorded in the term-to-leader map are considered.
     */
    public void clientAppendOnLeader() {
        // The map is sorted by term, so its last entry belongs to the highest term.
        final Map.Entry<Long, MemberId> latest = leadersAtTerms.lastEntry();
        if (latest == null) {
            return;
        }
        final MemberId leader = latest.getValue();
        if (leader != null) {
            clientAppend(leader);
        }
    }

    /**
     * Takes a snapshot at a random committed index of the member and schedules compaction of its
     * log up to that index.
     *
     * <p>The index is drawn from the range starting at the larger of the log's first index and the
     * index after the current snapshot, and ending just before the commit index. If that range is
     * empty nothing happens. The log deletion is only submitted to the member's thread context; it
     * runs when that context executes its tasks.
     *
     * @param memberId member to snapshot
     */
    public void snapshotAndCompact(final MemberId memberId) {
        final RaftContext raftContext = raftServers.get(memberId);
        final TestFileBasedSnapshotStore store = snapshotStores.get(memberId);

        final long firstLogIndex = raftContext.getLog().getFirstIndex();
        final long afterCurrentSnapshot = store.getCurrentSnapshotIndex() + 1L;
        final long lowestCandidate = Math.max(firstLogIndex, afterCurrentSnapshot);
        if (lowestCandidate >= raftContext.getCommitIndex()) {
            return;
        }

        final long snapshotIndex = random.nextLong(lowestCandidate, raftContext.getCommitIndex());
        try (RaftLogReader reader = raftContext.getLog().openCommittedReader()) {
            reader.seek(snapshotIndex);
            final long term = reader.next().term();
            store.newSnapshot(snapshotIndex, term, random.nextInt(1, 10), random);
            LOG.info(
                "Snapshot taken at index {}. Current commit index is {}",
                snapshotIndex,
                raftContext.getCommitIndex());
        }

        raftContext.getThreadContext().execute(() -> raftContext.getLog().deleteUntil(snapshotIndex));
    }

    /**
     * Restarts a member while keeping its data: closes its raft context, executor and snapshot
     * store, recreates the member and bootstraps it with the current member ids.
     *
     * @param memberId member to restart; its string id must be parseable as an integer
     */
    public void restart(final MemberId memberId) {
        final RaftContext oldContext = raftServers.get(memberId);
        oldContext.close();

        final DeterministicSingleThreadContext oldExecutor = deterministicExecutors.remove(memberId);
        oldExecutor.close();

        final TestFileBasedSnapshotStore oldSnapshotStore = snapshotStores.get(memberId);
        oldSnapshotStore.close();

        final int nodeId = Integer.parseInt(memberId.id());
        final RaftContext newContext = createRaftContextForMember(random, nodeId);
        newContext.getCluster().bootstrap(raftServers.keySet());
    }

    /**
     * Restarts a member after wiping its data directory.
     *
     * <p>Steps: close the member's raft context, executor and snapshot store and remove it from the
     * harness; drive the remaining members until another leader exists and all of them have the
     * same log; delete the member's storage directory; recreate the member, bootstrap it and drive
     * the cluster until the new member is ready.
     *
     * @param memberId member to restart; its string id must be parseable as an integer
     * @throws UncheckedIOException if the member's data directory cannot be deleted
     */
    public void restartWithDataLoss(final MemberId memberId) {
        LOG.info("Shutting down member {}", memberId.id());
        raftServers.get(memberId).close();
        deterministicExecutors.remove(memberId).close();

        // Close the old snapshot store as restart() does. Its entry is replaced when the member is
        // recreated below, so without this the old store would never be closed.
        final TestFileBasedSnapshotStore oldSnapshotStore = snapshotStores.remove(memberId);
        if (oldSnapshotStore != null) {
            oldSnapshotStore.close();
        }

        final RaftContext nodeBeforeRestart = raftServers.remove(memberId);
        waitUntilThereIsAnotherLeader(memberId);

        final Path dataDirectory = nodeBeforeRestart.getStorage().directory().toPath();
        LOG.info("Deleting directory {} of member {}", dataDirectory, memberId.id());
        try {
            FileUtil.deleteFolderIfExists(dataDirectory);
        } catch (final IOException e) {
            LOG.error("Failed to delete directory {} of member {}", dataDirectory, memberId.id());
            throw new UncheckedIOException(e);
        }

        // createRaftContextForMember registers the new context in raftServers, so the key set used
        // for bootstrapping already contains the restarted member.
        final RaftContext newContext =
            createRaftContextForMember(random, Integer.parseInt(memberId.id()));
        newContext.getCluster().bootstrap(raftServers.keySet());
        waitUntilMembersIsReadyIn(newContext, 100);
    }

    /**
     * Drives the cluster for at most 500 rounds until a member other than the given one is the
     * latest known leader, the latest term has a leader, and all logs are fully replicated.
     *
     * <p>Each round advances every member by 50 ms, delivers all messages and runs all tasks. After
     * the loop, the leader and replication conditions are asserted whether or not the loop ended
     * early.
     *
     * @param memberId member that must not be the current leader
     */
    private void waitUntilThereIsAnotherLeader(final MemberId memberId) {
        for (int remainingSteps = 500; remainingSteps > 0; remainingSteps--) {
            final Map.Entry<Long, MemberId> latestKnown = leadersAtTerms.lowerEntry(Long.MAX_VALUE);
            final String currentLeader = latestKnown == null ? null : latestKnown.getValue().id();

            final boolean leaderIsSomeoneElse = !memberId.id().equals(currentLeader);
            if (leaderIsSomeoneElse && hasLeaderAtTheLatestTerm() && hasReplicatedAllEntries()) {
                // Look the leader up again: the checks above may have recorded a newer term.
                final MemberId leader = leadersAtTerms.lowerEntry(Long.MAX_VALUE).getValue();
                LOG.info("Current leader is {}. ", leader.id());
                break;
            }

            tick(Duration.ofMillis(50L));
            processAllMessage();
            runUntilDone();
        }

        Assertions.assertThat(hasLeaderAtTheLatestTerm()).isTrue();
        Assertions.assertThat(hasReplicatedAllEntries()).isTrue();
    }

    /**
     * Drives the cluster until the member reports the READY state or the step budget is used up,
     * then asserts that the member is READY.
     *
     * <p>Each step runs all tasks, delivers all messages and advances every member by one heartbeat
     * interval.
     *
     * @param member member to wait for
     * @param steps maximum number of steps
     */
    private void waitUntilMembersIsReadyIn(final RaftContext member, final int steps) {
        for (int remaining = steps;
            member.getState() != RaftContext.State.READY && remaining > 0;
            remaining--) {
            runUntilDone();
            processAllMessage();
            tickHeartbeatTimeout();
        }

        final String description =
            "Member should be ready in %d steps, if there are no failures or timeouts".formatted(steps);
        Assertions.assertThat(member.getState())
            .describedAs(description)
            .isEqualTo(RaftContext.State.READY);
    }

    /**
     * Asserts that all members hold the same entry at every committed index.
     *
     * <p>Opens one committed reader per member and walks them together, from the smallest first
     * index of any log up to the highest commit index of any member. At each index only members
     * whose reader has more entries and whose log already starts at or before that index take part.
     * All participating entries must be equal.
     *
     * <p>The readers are closed even when an assertion fails or opening a reader throws.
     */
    public void assertAllLogsEqual() {
        final Map<RaftContext, RaftLogReader> readers = new HashMap<>();
        try {
            for (final RaftContext server : raftServers.values()) {
                readers.put(server, server.getLog().openCommittedReader());
            }

            final long commitIndexOnLeader =
                raftServers.values().stream()
                    .map(RaftContext::getCommitIndex)
                    .max(Long::compareTo)
                    .orElseThrow();
            final long smallestFirstIndex =
                raftServers.values().stream()
                    .map(s -> s.getLog().getFirstIndex())
                    .min(Long::compareTo)
                    .orElse(1L);

            for (long index = smallestFirstIndex - 1L; index < commitIndexOnLeader; ++index) {
                final long nextIndex = index + 1L;
                final Map<String, IndexedRaftLogEntry> entries =
                    readers.keySet().stream()
                        .filter(s -> readers.get(s).hasNext())
                        // Skip members whose log was compacted past this index.
                        .filter(s -> s.getLog().getFirstIndex() <= nextIndex)
                        .collect(Collectors.toMap(RaftContext::getName, s -> readers.get(s).next()));

                Assertions.assertThat(entries.values().stream().distinct().count())
                    .withFailMessage(
                        () ->
                            "Expected to find the same entry at a committed index on all nodes, but found %s"
                                .formatted(entries))
                    .isLessThanOrEqualTo(1L);
            }
        } finally {
            readers.values().forEach(RaftLogReader::close);
        }
    }

    /**
     * Records the leader each member currently knows for its term and asserts that no term ever
     * has two different leaders.
     */
    public void assertAtMostOneLeader() {
        for (final RaftContext server : raftServers.values()) {
            updateAndVerifyLeaderTerm(server);
        }
    }

    /**
     * Checks the leader a member knows for its current term against the recorded one.
     *
     * <p>If the member knows no leader nothing happens. If the term has no recorded leader yet, the
     * member's leader is recorded; otherwise both must be equal.
     *
     * @param s member whose view of term and leader is checked
     */
    private void updateAndVerifyLeaderTerm(final RaftContext s) {
        final long term = s.getTerm();
        if (s.getLeader() == null) {
            return;
        }

        final MemberId leader = s.getLeader().memberId();
        if (!leadersAtTerms.containsKey(term)) {
            leadersAtTerms.put(term, leader);
            return;
        }

        final MemberId knownLeader = leadersAtTerms.get(term);
        Assertions.assertThat(knownLeader)
            .withFailMessage("Found two leaders %s %s at term %s", knownLeader, leader, term)
            .isEqualTo(leader);
    }

    /**
     * Reports whether a leader is recorded for the highest term any member is in.
     *
     * <p>Refreshes the term-to-leader map first, which also asserts that no term has two leaders.
     *
     * @return {@code true} if the highest current term has a recorded leader
     */
    public boolean hasLeaderAtTheLatestTerm() {
        assertAtMostOneLeader();

        final Long latestTerm =
            raftServers.values().stream()
                .map(RaftContext::getTerm)
                .max(Long::compareTo)
                .orElseThrow();
        final MemberId leaderOfLatestTerm = leadersAtTerms.get(latestTerm);
        return leaderOfLatestTerm != null;
    }

    /**
     * Reports whether every member of the harness has committed everything in its log.
     *
     * @return {@code true} if all members have a non-empty, fully committed log
     */
    boolean hasCommittedAllEntries() {
        final Map<MemberId, RaftContext> allMembers = raftServers;
        return hasCommittedAllEntries(allMembers);
    }

    /**
     * Reports whether, on each given member, the last entry in the log equals the last committed
     * entry. A member with an empty log counts as not fully committed.
     *
     * @param raftServers members to check
     * @return {@code true} if all given members have a non-empty, fully committed log
     */
    boolean hasCommittedAllEntries(final Map<MemberId, RaftContext> raftServers) {
        for (final RaftContext server : raftServers.values()) {
            final IndexedRaftLogEntry lastCommitted = getLastCommittedEntry(server);
            final IndexedRaftLogEntry lastWritten = getLastUncommittedEntry(server);
            if (lastWritten == null || !lastWritten.equals(lastCommitted)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether the logs of all members of the harness end with the same entry.
     *
     * @return {@code true} if all members share the same last index and last entry
     */
    boolean hasReplicatedAllEntries() {
        final Map<MemberId, RaftContext> allMembers = raftServers;
        return hasReplicatedAllEntries(allMembers);
    }

    /**
     * Reports whether the logs of the given members end with the same index and the same entry.
     *
     * <p>Returns {@code false} for an empty map, because there is then no distinct value at all.
     *
     * @param raftMembers members to compare
     * @return {@code true} if exactly one distinct last index and one distinct last entry exist
     */
    boolean hasReplicatedAllEntries(final Map<MemberId, RaftContext> raftMembers) {
        final long distinctLastIndexes =
            raftMembers.values().stream()
                .map(RaftContext::getLog)
                .map(RaftLog::getLastIndex)
                .distinct()
                .count();
        final long distinctLastEntries =
            raftMembers.values().stream()
                .map(this::getLastUncommittedEntry)
                .distinct()
                .count();
        return distinctLastIndexes == 1L && distinctLastEntries == 1L;
    }

    /** Asserts that every member of the harness has committed and replicated all entries. */
    void assertAllEntriesCommittedAndReplicatedToAll() {
        final Map<MemberId, RaftContext> allMembers = raftServers;
        assertAllEntriesCommittedAndReplicatedToAll(allMembers);
    }

    /**
     * Asserts that on each given member the last committed entry equals the last entry in the log,
     * and that all given members end with the same entry.
     *
     * @param raftServers members to check
     */
    void assertAllEntriesCommittedAndReplicatedToAll(final Map<MemberId, RaftContext> raftServers) {
        for (final Map.Entry<MemberId, RaftContext> member : raftServers.entrySet()) {
            final RaftContext raftServer = member.getValue();
            final IndexedRaftLogEntry lastCommitted = getLastCommittedEntry(raftServer);
            final IndexedRaftLogEntry lastWritten = getLastUncommittedEntry(raftServer);
            Assertions.assertThat(lastCommitted)
                .describedAs("All entries should be committed in %s", member.getKey().id())
                .isEqualTo(lastWritten);
        }

        final boolean replicated = hasReplicatedAllEntries(raftServers);
        Assertions.assertThat(replicated)
            .describedAs("All entries are replicated to all followers")
            .isTrue();
    }

    /**
     * Reads the last entry of the member's log through an uncommitted reader.
     *
     * @param s member whose log is read
     * @return the last entry, or {@code null} if the reader has nothing after seeking to the end
     */
    private IndexedRaftLogEntry getLastUncommittedEntry(final RaftContext s) {
        try (RaftLogReader reader = s.getLog().openUncommittedReader()) {
            reader.seekToLast();
            return reader.hasNext() ? reader.next() : null;
        }
    }

    /**
     * Reads the last entry of the member's log through a committed reader.
     *
     * @param s member whose log is read
     * @return the last entry, or {@code null} if the reader has nothing after seeking to the end
     */
    private IndexedRaftLogEntry getLastCommittedEntry(final RaftContext s) {
        try (RaftLogReader reader = s.getLog().openCommittedReader()) {
            reader.seekToLast();
            return reader.hasNext() ? reader.next() : null;
        }
    }

    /** Asserts for every member that the committed part of its log has consecutive indexes. */
    public void assertNoGapsInLog() {
        for (final MemberId memberId : raftServers.keySet()) {
            assertNoGapsInLog(memberId);
        }
    }

    /**
     * Asserts that the committed entries of a member have consecutive indexes starting at the
     * log's first index. If the log does not start at index 1, also asserts that the member's
     * current snapshot index is at least {@code firstIndex - 1}.
     *
     * @param memberId member whose log is verified
     */
    private void assertNoGapsInLog(final MemberId memberId) {
        final RaftContext server = raftServers.get(memberId);
        final long firstIndex = server.getLog().getFirstIndex();

        long expectedIndex = firstIndex;
        try (RaftLogReader reader = server.getLog().openCommittedReader()) {
            while (reader.hasNext()) {
                Assertions.assertThat(reader.next().index())
                    .describedAs("There is no gap in the log %s", memberId.id())
                    .isEqualTo(expectedIndex);
                expectedIndex++;
            }
        }

        if (firstIndex != 1L) {
            final long currentSnapshotIndex = snapshotStores.get(memberId).getCurrentSnapshotIndex();
            // The description has a %s placeholder for the member id, so the id must be passed.
            Assertions.assertThat(currentSnapshotIndex)
                .describedAs("The log is compacted in %s. Hence a snapshot must exist.", memberId.id())
                .isGreaterThanOrEqualTo(firstIndex - 1L);
        }
    }

    /** Asserts that every member of the harness reports the READY state. */
    public void assertAllMembersAreReady() {
        final boolean everyoneReady = allMembersAreReady();
        Assertions.assertThat(everyoneReady).isTrue();
    }

    /**
     * Reports whether every member of the harness is in the READY state.
     *
     * @return {@code true} if no member reports a state other than READY (also when there are no
     *     members)
     */
    public boolean allMembersAreReady() {
        final Optional<RaftContext.State> firstNotReady =
            raftServers.values().stream()
                .map(RaftContext::getState)
                .filter(state -> !RaftContext.State.READY.equals(state))
                .findAny();
        return firstNotReady.isEmpty();
    }

    /**
     * Verifies on the mocked append listener that no write error carrying a
     * {@link JournalException} was ever reported.
     */
    public void assertNoJournalAppendErrors() {
        final ZeebeLogAppender.AppendListener verifiedListener =
            Mockito.verify(appendListener, Mockito.times(0));
        verifiedListener.onWriteError(ArgumentMatchers.any(JournalException.class));
    }

    /**
     * Asserts that the data loss checker never saw a committed entry replaced by an entry with a
     * different checksum; on failure the checker's recorded message is reported.
     */
    public void assertNoDataLoss() {
        final boolean dataLost = dataLossChecker.hasDataLoss();
        final String failMessage = dataLossChecker.getFailMessage();
        Assertions.assertThat(dataLost).withFailMessage(failMessage).isFalse();
    }

    static class DataLossChecker
    implements ZeebeLogAppender.AppendListener {
        final ZeebeLogAppender.AppendListener delegate;
        final Map<Long, Long> committedIndexToChecksumMap = new HashMap<Long, Long>();
        final Map<Long, Long> pendingWriteToBeCommitted = new HashMap<Long, Long>();
        private String failMessage = "";
        private boolean dataloss = false;

        DataLossChecker(ZeebeLogAppender.AppendListener delegate) {
            this.delegate = delegate;
        }

        public void onWrite(IndexedRaftLogEntry indexed) {
            long index = indexed.index();
            long entryChecksum = indexed.getPersistedRaftRecord().checksum();
            this.pendingWriteToBeCommitted.put(index, entryChecksum);
            this.delegate.onWrite(indexed);
        }

        public void onWriteError(Throwable error) {
            this.delegate.onWriteError(error);
        }

        public void onCommit(long index, long highestPosition) {
            if (this.committedIndexToChecksumMap.containsKey(index) && this.pendingWriteToBeCommitted.containsKey(index) && !this.pendingWriteToBeCommitted.get(index).equals(this.committedIndexToChecksumMap.get(index))) {
                this.failMessage = "Committed entry at index %d checksum %d is being overwritten by entry with checksum %d".formatted(index, this.committedIndexToChecksumMap.get(index), this.pendingWriteToBeCommitted.get(index));
                LOG.info(this.failMessage);
                this.dataloss = true;
            }
            this.committedIndexToChecksumMap.put(index, this.pendingWriteToBeCommitted.remove(index));
            this.delegate.onCommit(index, highestPosition);
        }

        public void onCommitError(long index, Throwable error) {
            this.delegate.onCommitError(index, error);
        }

        public String getFailMessage() {
            return this.failMessage;
        }

        public boolean hasDataLoss() {
            return this.dataloss;
        }
    }
}

