/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft.roles;

import com.google.common.base.Preconditions;
import io.atomix.raft.RaftException;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.cluster.impl.DefaultRaftMember;
import io.atomix.raft.cluster.impl.RaftMemberContext;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.metrics.LeaderAppenderMetrics;
import io.atomix.raft.protocol.AppendResponse;
import io.atomix.raft.protocol.ConfigureRequest;
import io.atomix.raft.protocol.ConfigureResponse;
import io.atomix.raft.protocol.InstallRequest;
import io.atomix.raft.protocol.InstallResponse;
import io.atomix.raft.protocol.RaftRequest;
import io.atomix.raft.protocol.RaftResponse;
import io.atomix.raft.protocol.ReplicatableJournalRecord;
import io.atomix.raft.protocol.VersionedAppendRequest;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.system.Configuration;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;

final class LeaderAppender {
    private static final int MIN_BACKOFF_FAILURE_COUNT = 5;
    private final int maxBatchSizePerAppend;
    private final Logger log;
    private final RaftContext raft;
    private boolean open = true;
    private final LeaderAppenderMetrics metrics;
    private final long leaderTime;
    private final long leaderIndex;
    private final long electionTimeout;
    private final NavigableMap<Long, CompletableFuture<Long>> appendFutures = new TreeMap<Long, CompletableFuture<Long>>();
    private final List<TimestampedFuture<Long>> heartbeatFutures = new ArrayList<TimestampedFuture<Long>>();
    private final long heartbeatTime;
    private final int minStepDownFailureCount;
    private final long maxQuorumResponseTimeout;

    LeaderAppender(LeaderRole leader) {
        this.raft = (RaftContext)Preconditions.checkNotNull((Object)leader.raft, (Object)"context cannot be null");
        this.log = ContextualLoggerFactory.getLogger(this.getClass(), (LoggerContext)LoggerContext.builder(RaftServer.class).addValue((Object)this.raft.getName()).build());
        this.metrics = new LeaderAppenderMetrics(this.raft.getName(), this.raft.getMeterRegistry());
        this.maxBatchSizePerAppend = this.raft.getMaxAppendBatchSize();
        this.leaderTime = System.currentTimeMillis();
        this.leaderIndex = this.raft.getLog().isEmpty() ? this.raft.getLog().getFirstIndex() : this.raft.getLog().getLastIndex() + 1L;
        this.heartbeatTime = this.leaderTime;
        this.electionTimeout = this.raft.getElectionTimeout().toMillis();
        this.minStepDownFailureCount = this.raft.getMinStepDownFailureCount();
        this.maxQuorumResponseTimeout = this.raft.getMaxQuorumResponseTimeout().isZero() ? this.electionTimeout * 2L : this.raft.getMaxQuorumResponseTimeout().toMillis();
    }

    private VersionedAppendRequest buildAppendRequest(RaftMemberContext member, long lastIndex) {
        if (!this.hasMoreEntries(member)) {
            return this.buildAppendEmptyRequest(member);
        }
        if (member.getFailureCount() > 0) {
            return this.buildAppendEmptyRequest(member);
        }
        return this.buildAppendEntriesRequest(member, lastIndex);
    }

    private VersionedAppendRequest buildAppendEmptyRequest(RaftMemberContext member) {
        IndexedRaftLogEntry prevEntry = member.getCurrentEntry();
        DefaultRaftMember leader = this.raft.getLeader();
        return this.builderWithPreviousEntry(prevEntry).withTerm(this.raft.getTerm()).withLeader(leader.memberId()).withEntries(Collections.emptyList()).withCommitIndex(this.raft.getCommitIndex()).build();
    }

    private VersionedAppendRequest.Builder builderWithPreviousEntry(IndexedRaftLogEntry prevEntry) {
        long prevIndex = 0L;
        long prevTerm = 0L;
        if (prevEntry != null) {
            prevIndex = prevEntry.index();
            prevTerm = prevEntry.term();
        } else {
            PersistedSnapshot currentSnapshot = this.raft.getCurrentSnapshot();
            if (currentSnapshot != null) {
                prevIndex = currentSnapshot.getIndex();
                prevTerm = currentSnapshot.getTerm();
            }
        }
        return VersionedAppendRequest.builder().withPrevLogTerm(prevTerm).withPrevLogIndex(prevIndex);
    }

    private VersionedAppendRequest buildAppendEntriesRequest(RaftMemberContext member, long lastIndex) {
        IndexedRaftLogEntry prevEntry = member.getCurrentEntry();
        DefaultRaftMember leader = this.raft.getLeader();
        VersionedAppendRequest.Builder builder = this.builderWithPreviousEntry(prevEntry).withTerm(this.raft.getTerm()).withLeader(leader.memberId()).withCommitIndex(this.raft.getCommitIndex());
        ArrayList<ReplicatableJournalRecord> entries = new ArrayList<ReplicatableJournalRecord>();
        int size = 0;
        while (this.hasMoreEntries(member)) {
            IndexedRaftLogEntry entry = member.nextEntry();
            ReplicatableJournalRecord replicatableRecord = entry.getReplicatableJournalRecord();
            entries.add(replicatableRecord);
            if (entry.index() != lastIndex && (size += replicatableRecord.approximateSize()) < this.maxBatchSizePerAppend) continue;
            break;
        }
        return builder.withEntries(entries).build();
    }

    private void sendAppendRequest(RaftMemberContext member, VersionedAppendRequest request) {
        if (request.entries().isEmpty() && !member.canHeartbeat()) {
            return;
        }
        member.startAppend();
        long timestamp = System.currentTimeMillis();
        this.log.trace("Sending {} to {}", (Object)request, (Object)member.getMember().memberId());
        this.raft.getProtocol().append(member.getMember().memberId(), request).whenCompleteAsync((response, error) -> {
            if (this.open) {
                long appendLatency = System.currentTimeMillis() - timestamp;
                this.metrics.appendComplete(appendLatency, (String)((Object)member.getMember().memberId().id()));
                if (!request.entries().isEmpty()) {
                    member.completeAppend(appendLatency);
                } else {
                    member.completeAppend();
                }
                if (error == null) {
                    this.log.trace("Received {} from {}", response, (Object)member.getMember().memberId());
                    this.handleAppendResponse(member, request, (AppendResponse)response, timestamp);
                } else {
                    this.handleAppendResponseFailure(member, request, (Throwable)error);
                }
            }
        }, (Executor)this.raft.getThreadContext());
        if (!request.entries().isEmpty() && this.hasMoreEntries(member)) {
            this.appendEntries(member);
        }
    }

    private void succeedAttempt(RaftMemberContext member) {
        member.resetFailureCount();
    }

    private void updateConfigurationIndex(RaftMemberContext member, AppendResponse response) {
        long configIndex = response.configurationIndex();
        if (configIndex == 0L) {
            return;
        }
        member.setConfigIndex(configIndex);
    }

    private void updateMatchIndex(RaftMemberContext member, AppendResponse response) {
        member.setMatchIndex(response.lastLogIndex());
        this.observeRemainingMemberEntries(member);
    }

    private void resetMatchIndex(RaftMemberContext member, AppendResponse response) {
        if (response.lastLogIndex() < member.getMatchIndex()) {
            this.log.trace("Reset match index for {} to {}", (Object)member, (Object)member.getMatchIndex());
            member.setMatchIndex(response.lastLogIndex());
            this.observeRemainingMemberEntries(member);
        }
    }

    private void observeRemainingMemberEntries(RaftMemberContext member) {
        this.metrics.observeRemainingEntries((String)((Object)member.getMember().memberId().id()), this.raft.getLog().getLastIndex() - member.getMatchIndex());
    }

    private void resetNextIndex(RaftMemberContext member, AppendResponse response) {
        long nextIndex = response.lastLogIndex() + 1L;
        this.resetNextIndex(member, nextIndex);
    }

    private void resetNextIndex(RaftMemberContext member, long nextIndex) {
        member.reset(nextIndex);
        this.log.trace("Reset next index for {} to {}", (Object)member, (Object)nextIndex);
    }

    private void resetSnapshotIndex(RaftMemberContext member, AppendResponse response) {
        long snapshotIndex = response.lastSnapshotIndex();
        if (member.getSnapshotIndex() != snapshotIndex) {
            member.setSnapshotIndex(snapshotIndex);
            this.log.trace("Reset snapshot index for {} to {}", (Object)member, (Object)snapshotIndex);
        }
    }

    private ConfigureRequest buildConfigureRequest() {
        DefaultRaftMember leader = this.raft.getLeader();
        Configuration configuration = this.raft.getCluster().getConfiguration();
        return ConfigureRequest.builder().withTerm(this.raft.getTerm()).withLeader(leader.memberId()).withIndex(configuration.index()).withTime(configuration.time()).withNewMembers(configuration.newMembers()).withOldMembers(configuration.oldMembers()).build();
    }

    private void sendConfigureRequest(RaftMemberContext member, ConfigureRequest request) {
        this.log.debug("Configuring {} : {}", (Object)member.getMember().memberId(), (Object)request);
        member.startConfigure();
        long timestamp = System.currentTimeMillis();
        this.log.trace("Sending {} to {}", (Object)request, (Object)member.getMember().memberId());
        this.raft.getProtocol().configure(member.getMember().memberId(), request).whenCompleteAsync((response, error) -> {
            if (this.open) {
                member.completeConfigure();
                if (error == null) {
                    this.log.trace("Received {} from {}", response, (Object)member.getMember().memberId());
                    this.handleConfigureResponse(member, request, (ConfigureResponse)response, timestamp);
                } else {
                    if (this.log.isTraceEnabled()) {
                        this.log.debug("Failed to configure {}", (Object)member.getMember().memberId(), error);
                    } else {
                        this.log.debug("Failed to configure {}", (Object)member.getMember().memberId());
                    }
                    this.handleConfigureResponseFailure(member, request, (Throwable)error);
                }
            }
        }, (Executor)this.raft.getThreadContext());
    }

    protected void handleConfigureResponseFailure(RaftMemberContext member, ConfigureRequest request, Throwable error) {
        this.failAttempt(member, request, error);
    }

    protected void handleConfigureResponseOk(RaftMemberContext member, ConfigureRequest request, ConfigureResponse response) {
        this.succeedAttempt(member);
        member.setConfigTerm(request.term());
        member.setConfigIndex(request.index());
        this.appendEntries(member);
    }

    private Optional<InstallRequest> buildInstallRequest(RaftMemberContext member, PersistedSnapshot persistedSnapshot) {
        if (member.getNextSnapshotIndex() != persistedSnapshot.getIndex()) {
            try {
                SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader();
                member.setSnapshotChunkReader(snapshotChunkReader);
            }
            catch (UncheckedIOException e) {
                this.log.warn("Expected to send Snapshot {} to {}. But could not open SnapshotChunkReader. Will retry.", new Object[]{persistedSnapshot.getId(), member, e});
                return Optional.empty();
            }
            member.setNextSnapshotIndex(persistedSnapshot.getIndex());
            member.setNextSnapshotChunk(null);
        }
        SnapshotChunkReader reader = member.getSnapshotChunkReader();
        try {
            if (member.getNextSnapshotChunk() != null) {
                reader.seek(member.getNextSnapshotChunk());
            } else {
                reader.reset();
            }
            if (!reader.hasNext()) {
                return Optional.empty();
            }
            SnapshotChunk chunk = (SnapshotChunk)reader.next();
            DefaultRaftMember leader = this.raft.getLeader();
            InstallRequest request = InstallRequest.builder().withCurrentTerm(this.raft.getTerm()).withLeader(leader.memberId()).withIndex(persistedSnapshot.getIndex()).withTerm(persistedSnapshot.getTerm()).withVersion(persistedSnapshot.version()).withData(new SnapshotChunkImpl(chunk).toByteBuffer()).withChunkId(ByteBuffer.wrap(chunk.getChunkName().getBytes())).withInitial(member.getNextSnapshotChunk() == null).withComplete(!reader.hasNext()).withNextChunkId(reader.nextId()).build();
            return Optional.of(request);
        }
        catch (UncheckedIOException e) {
            this.log.warn("Expected to send next chunk of Snapshot {} to {}. But could not read SnapshotChunk. Snapshot may have been deleted. Will retry.", new Object[]{persistedSnapshot.getId(), member.getMember().memberId(), e});
            member.setNextSnapshotIndex(0L);
            member.setNextSnapshotChunk(null);
            return Optional.empty();
        }
    }

    private void sendInstallRequest(RaftMemberContext member, InstallRequest request) {
        member.startInstall();
        long timestamp = System.currentTimeMillis();
        this.log.trace("Sending {} to {}", (Object)request, (Object)member.getMember().memberId());
        this.raft.getProtocol().install(member.getMember().memberId(), request).whenCompleteAsync((response, error) -> {
            if (this.open) {
                member.completeInstall();
                if (error == null) {
                    this.log.trace("Received {} from {}", response, (Object)member.getMember().memberId());
                    this.handleInstallResponse(member, request, (InstallResponse)response, timestamp);
                } else {
                    this.handleInstallResponseFailure(member, request, (Throwable)error);
                }
            }
        }, (Executor)this.raft.getThreadContext());
    }

    private void handleInstallResponseFailure(RaftMemberContext member, InstallRequest request, Throwable error) {
        boolean isTimeout;
        boolean bl = isTimeout = error instanceof TimeoutException || error != null && error.getCause() instanceof TimeoutException;
        if (!isTimeout) {
            member.setNextSnapshotIndex(0L);
            member.setNextSnapshotChunk(null);
        }
        this.failAttempt(member, request, error);
    }

    private void handleInstallResponseOk(RaftMemberContext member, InstallRequest request) {
        this.succeedAttempt(member);
        if (request.complete()) {
            member.setNextSnapshotIndex(0L);
            member.setNextSnapshotChunk(null);
            member.setSnapshotIndex(request.index());
            this.resetNextIndex(member, request.index() + 1L);
        } else {
            member.setNextSnapshotChunk(request.nextChunkId());
        }
        this.appendEntries(member);
    }

    private void handleInstallResponseError(RaftMemberContext member, InstallRequest request, InstallResponse response) {
        this.log.warn("Failed to send {} to member {}, with {}. Restart sending snapshot.", new Object[]{request, member.getMember().memberId(), response.error().toString()});
        member.setNextSnapshotIndex(0L);
        member.setNextSnapshotChunk(null);
    }

    public CompletableFuture<Long> appendEntries(long index) {
        this.raft.checkThread();
        if (index == 0L) {
            return this.appendEntries();
        }
        if (index <= this.raft.getCommitIndex()) {
            return CompletableFuture.completedFuture(index);
        }
        if (this.raft.getCluster().isSingleMemberCluster()) {
            this.raft.setCommitIndex(index);
            this.completeCommits(index);
            return CompletableFuture.completedFuture(index);
        }
        if (!this.open) {
            return CompletableFuture.failedFuture(new RaftException.NoLeader("Cannot replicate entries on closed leader", new Object[0]));
        }
        return this.appendFutures.computeIfAbsent(index, i -> {
            for (RaftMemberContext member : this.raft.getCluster().getReplicationTargets()) {
                this.appendEntries(member);
            }
            return new CompletableFuture();
        });
    }

    public CompletableFuture<Long> appendEntries() {
        this.raft.checkThread();
        if (this.raft.getCluster().getReplicationTargets().isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        TimestampedFuture<Long> future = new TimestampedFuture<Long>();
        this.heartbeatFutures.add(future);
        for (RaftMemberContext member : this.raft.getCluster().getReplicationTargets()) {
            this.appendEntries(member);
        }
        return future;
    }

    private void completeCommits(long commitIndex) {
        NavigableMap<Long, CompletableFuture<Long>> completable = this.appendFutures.headMap(commitIndex, true);
        completable.forEach((index, future) -> {
            this.metrics.observeCommit();
            future.complete(index);
        });
        completable.clear();
        this.observeNonCommittedEntries(commitIndex);
    }

    private void handleAppendResponseFailure(RaftMemberContext member, VersionedAppendRequest request, Throwable error) {
        this.failHeartbeat();
        this.failAttempt(member, request, error);
    }

    private void failAttempt(RaftMemberContext member, RaftRequest request, Throwable error) {
        int failures = member.incrementFailureCount();
        if (failures <= 3 || failures % 100 == 0) {
            this.log.warn("{} to {} failed", new Object[]{request, member.getMember().memberId(), error});
        }
        this.failHeartbeat();
        long quorumResponseTime = System.currentTimeMillis() - Math.max(this.computeResponseTime(), this.leaderTime);
        if (member.getFailureCount() >= this.minStepDownFailureCount && quorumResponseTime > this.maxQuorumResponseTimeout) {
            this.log.warn("Suspected network partition after {} failures from {} over a period of time {} > {}, stepping down", new Object[]{member.getFailureCount(), member.getMember().memberId(), quorumResponseTime, this.maxQuorumResponseTimeout});
            this.raft.setLeader(null);
            this.raft.transition(RaftServer.Role.FOLLOWER);
        }
    }

    private void handleAppendResponse(RaftMemberContext member, VersionedAppendRequest request, AppendResponse response, long timestamp) {
        if (response.status() == RaftResponse.Status.OK) {
            this.handleAppendResponseOk(member, request, response);
        } else {
            this.handleAppendResponseError(member, request, response);
        }
        this.recordHeartbeat(member, timestamp);
    }

    private void handleAppendResponseOk(RaftMemberContext member, VersionedAppendRequest request, AppendResponse response) {
        this.succeedAttempt(member);
        this.updateConfigurationIndex(member, response);
        if (response.succeeded()) {
            member.appendSucceeded();
            this.updateMatchIndex(member, response);
            this.metrics.observeAppend((String)((Object)member.getMember().memberId().id()), request.entries().size(), request.entries().stream().mapToInt(ReplicatableJournalRecord::approximateSize).sum());
            this.commitEntries();
            if (this.hasMoreEntries(member)) {
                this.appendEntries(member);
            }
        } else if (response.term() > this.raft.getTerm()) {
            this.log.info("Received successful append response higher term ({} > {}) from {}, implying there is a new leader - transitioning to follower", new Object[]{response.term(), this.raft.getTerm(), member.getMember()});
            this.raft.setTerm(response.term());
            this.raft.setLeader(null);
            this.raft.transition(RaftServer.Role.FOLLOWER);
        } else {
            member.appendFailed();
            this.resetMatchIndex(member, response);
            this.resetNextIndex(member, response);
            this.resetSnapshotIndex(member, response);
            if (this.hasMoreEntries(member)) {
                this.appendEntries(member);
            }
        }
    }

    private void appendEntries(RaftMemberContext member) {
        if (!this.open) {
            return;
        }
        if (!member.isOpen()) {
            return;
        }
        if (!member.hasReplicationContext()) {
            member.openReplicationContext(this.raft.getLog());
        }
        if (member.getFailureCount() >= 5) {
            this.sendAppendRequest(member, this.buildAppendEmptyRequest(member));
        } else if (member.getConfigTerm() < this.raft.getTerm() || member.getConfigIndex() < this.raft.getCluster().getConfiguration().index()) {
            if (member.canConfigure()) {
                this.sendConfigureRequest(member, this.buildConfigureRequest());
            } else if (member.canHeartbeat()) {
                this.sendAppendRequest(member, this.buildAppendEmptyRequest(member));
            }
        } else if (member.getMember().getType() == RaftMember.Type.ACTIVE || member.getMember().getType() == RaftMember.Type.PROMOTABLE || member.getMember().getType() == RaftMember.Type.PASSIVE) {
            this.tryToReplicate(member);
        } else if (member.canAppend()) {
            this.sendAppendRequest(member, this.buildAppendRequest(member, -1L));
        }
    }

    private boolean hasMoreEntries(RaftMemberContext member) {
        return !member.hasReplicationContext() || member.hasNextEntry();
    }

    private void handleAppendResponseError(RaftMemberContext member, VersionedAppendRequest request, AppendResponse response) {
        if (response.term() > this.raft.getTerm()) {
            this.log.info("Received error append response with higher term ({} > {}) from {}, implying there is a new leader, transitioning to follower", new Object[]{response.term(), this.raft.getTerm(), member.getMember()});
            this.raft.setTerm(response.term());
            this.raft.setLeader(null);
            this.raft.transition(RaftServer.Role.FOLLOWER);
        } else {
            int failures = member.incrementFailureCount();
            if (failures <= 3 || failures % 100 == 0) {
                this.log.warn("{} to {} failed: {}", new Object[]{request, member.getMember().memberId(), response.error() != null ? response.error() : ""});
            }
        }
    }

    private void handleConfigureResponse(RaftMemberContext member, ConfigureRequest request, ConfigureResponse response, long timestamp) {
        if (response.status() == RaftResponse.Status.OK) {
            this.handleConfigureResponseOk(member, request, response);
        }
        this.recordHeartbeat(member, timestamp);
    }

    private void handleInstallResponse(RaftMemberContext member, InstallRequest request, InstallResponse response, long timestamp) {
        if (response.status() == RaftResponse.Status.OK) {
            this.handleInstallResponseOk(member, request);
        } else {
            this.handleInstallResponseError(member, request, response);
        }
        this.recordHeartbeat(member, timestamp);
    }

    public void close() {
        this.open = false;
        this.metrics.close();
        this.completeCommits(this.raft.getCommitIndex());
        this.appendFutures.forEach((index, future) -> future.completeExceptionally(new RaftException.AppendFailureException((long)index, "Leader stepping down")));
        this.heartbeatFutures.forEach(future -> future.completeExceptionally(new RaftException.ProtocolException("Failed to reach consensus", new Object[0])));
    }

    private void tryToReplicate(RaftMemberContext member) {
        if (this.shouldReplicateSnapshot(member)) {
            if (!member.canInstall()) {
                return;
            }
            this.replicateSnapshot(member);
        } else if (member.canAppend()) {
            this.replicateEvents(member);
        }
    }

    private boolean shouldReplicateSnapshot(RaftMemberContext member) {
        PersistedSnapshot persistedSnapshot = this.raft.getCurrentSnapshot();
        if (persistedSnapshot == null) {
            return false;
        }
        if (member.getSnapshotIndex() >= persistedSnapshot.getIndex()) {
            return false;
        }
        if (this.raft.getLog().getFirstIndex() > member.getCurrentIndex()) {
            return true;
        }
        long memberLag = persistedSnapshot.getIndex() - member.getCurrentIndex();
        return memberLag > (long)this.raft.getPreferSnapshotReplicationThreshold();
    }

    private void replicateSnapshot(RaftMemberContext member) {
        PersistedSnapshot persistedSnapshot = this.raft.getCurrentSnapshot();
        this.log.debug("Replicating snapshot {} to {}", (Object)persistedSnapshot.getIndex(), (Object)member.getMember().memberId());
        this.buildInstallRequest(member, persistedSnapshot).ifPresent(installRequest -> this.sendInstallRequest(member, (InstallRequest)installRequest));
    }

    private void replicateEvents(RaftMemberContext member) {
        this.sendAppendRequest(member, this.buildAppendRequest(member, -1L));
    }

    private void failHeartbeat() {
        this.raft.checkThread();
        long currentTimestamp = System.currentTimeMillis();
        Iterator<TimestampedFuture<Long>> iterator = this.heartbeatFutures.iterator();
        while (iterator.hasNext()) {
            TimestampedFuture<Long> future = iterator.next();
            if (currentTimestamp - future.timestamp <= this.electionTimeout) continue;
            future.completeExceptionally(new RaftException.ProtocolException("Failed to reach consensus", new Object[0]));
            iterator.remove();
        }
    }

    private void recordHeartbeat(RaftMemberContext member, long timestamp) {
        this.raft.checkThread();
        member.setHeartbeatTime(timestamp);
        member.setResponseTime(System.currentTimeMillis());
        long quorumHeartbeatTime = this.computeHeartbeatTime();
        long currentTimestamp = System.currentTimeMillis();
        Iterator<TimestampedFuture<Long>> iterator = this.heartbeatFutures.iterator();
        while (iterator.hasNext()) {
            TimestampedFuture<Long> future = iterator.next();
            if (future.timestamp < quorumHeartbeatTime) {
                future.complete(null);
                iterator.remove();
                continue;
            }
            if (currentTimestamp - future.timestamp <= this.electionTimeout) break;
            future.completeExceptionally(new RaftException.ProtocolException("Failed to reach consensus", new Object[0]));
            iterator.remove();
        }
        if (!this.heartbeatFutures.isEmpty()) {
            this.sendHeartbeats();
        }
    }

    private long computeHeartbeatTime() {
        return this.raft.getCluster().getQuorumFor(RaftMemberContext::getHeartbeatTime).orElseGet(System::currentTimeMillis);
    }

    private void sendHeartbeats() {
        for (RaftMemberContext member : this.raft.getCluster().getReplicationTargets()) {
            this.appendEntries(member);
        }
    }

    private void commitEntries() {
        this.raft.checkThread();
        long commitIndex = this.raft.getCluster().getQuorumFor(RaftMemberContext::getMatchIndex).orElseGet(() -> this.raft.getLog().getLastIndex());
        long previousCommitIndex = this.raft.getCommitIndex();
        if (commitIndex > 0L && commitIndex > previousCommitIndex && this.leaderIndex > 0L && commitIndex >= this.leaderIndex) {
            this.log.trace("Committed entries up to {}", (Object)commitIndex);
            this.raft.setCommitIndex(commitIndex);
            this.completeCommits(commitIndex);
        }
    }

    private long computeResponseTime() {
        return this.raft.getCluster().getQuorumFor(RaftMemberContext::getResponseTime).orElseGet(System::currentTimeMillis);
    }

    public long getIndex() {
        return this.leaderIndex;
    }

    public long getTime() {
        return this.heartbeatTime;
    }

    void observeNonCommittedEntries(long commitIndex) {
        this.metrics.observeNonCommittedEntries(this.raft.getLog().getLastIndex() - commitIndex);
    }

    private static class TimestampedFuture<T>
    extends CompletableFuture<T> {
        private final long timestamp;

        TimestampedFuture() {
            this(System.currentTimeMillis());
        }

        TimestampedFuture(long timestamp) {
            this.timestamp = timestamp;
        }
    }
}

