package org.apache.ratis.grpc.server;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.shaded.com.codahale.metrics.Timer;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppenderBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender.class */
public class GrpcLogAppender extends LogAppenderBase {
    /** Logger for this class; the nested response handlers log through it as well. */
    public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
    /** Outstanding appendEntries requests, tracked separately for log requests and heartbeats. */
    private final RequestMap pendingRequests;
    /** Upper bound on outstanding log requests; read from GrpcConfigKeys.Server.leaderOutstandingAppendsMax. */
    private final int maxPendingRequestsNum;
    /** Call id given to the next appendEntries request; advanced by appendLog. */
    private long callId;
    /** Set to true when a reply arrives; cleared again by resetClient. */
    private volatile boolean firstResponseReceived;
    /** Value of RaftServerConfigKeys.Log.Appender.installSnapshotEnabled; selects the snapshot path taken in run(). */
    private final boolean installSnapshotEnabled;
    /** Delay after which a sent request is checked for a timeout (RaftServerConfigKeys.Rpc.requestTimeout). */
    private final TimeDuration requestTimeoutDuration;
    /** Scheduler that runs the per-request timeout check. */
    private final TimeoutScheduler scheduler;
    /** Request stream for appendEntries; created lazily in appendLog and set to null by resetClient. */
    private volatile StreamObserver<RaftProtos.AppendEntriesRequestProto> appendLogRequestObserver;
    /** Metrics sink for request counts, latencies, timeouts and retries. */
    private final GrpcServerMetrics grpcServerMetrics;
    /** Read-write lock taken by appendLog, resetClient and the install-snapshot pending queue. */
    private final AutoCloseableReadWriteLock lock;
    /** Stack trace element handed to lock acquisitions; captured only when trace logging is enabled, otherwise null. */
    private final StackTraceElement caller;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$AppendEntriesRequest.class */
    /**
     * Lightweight record of an appendEntries request that has been created for a follower.
     * It keeps the call id, the previous-log position, the number of entries and the last
     * entry of the request, plus the latency timer used to measure the round trip.
     */
    public static class AppendEntriesRequest {
        private final Timer timer;
        private volatile Timer.Context timerContext;
        private final long callId;
        private final TermIndex previousLog;
        private final int entriesCount;
        private final TermIndex lastEntry;

        /**
         * Captures the interesting parts of the given proto and reports the request creation
         * to the metrics.
         *
         * @param appendEntriesRequestProto the request that is about to be sent
         * @param raftPeerId the peer whose latency timer is looked up
         * @param grpcServerMetrics the metrics providing the latency timer
         */
        AppendEntriesRequest(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto, RaftPeerId raftPeerId, GrpcServerMetrics grpcServerMetrics) {
            callId = appendEntriesRequestProto.getServerRequest().getCallId();

            if (appendEntriesRequestProto.hasPreviousLog()) {
                previousLog = TermIndex.valueOf(appendEntriesRequestProto.getPreviousLog());
            } else {
                previousLog = null;
            }

            final int count = appendEntriesRequestProto.getEntriesCount();
            entriesCount = count;
            if (count > 0) {
                lastEntry = TermIndex.valueOf(appendEntriesRequestProto.getEntries(count - 1));
            } else {
                lastEntry = null;
            }

            // The timer is selected per peer and per request kind (heartbeat or not).
            timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(raftPeerId.toString(), isHeartbeat());
            grpcServerMetrics.onRequestCreate(isHeartbeat());
        }

        /** @return the call id taken from the server request header. */
        long getCallId() {
            return callId;
        }

        /** @return the previous-log term/index of the request, or null if the proto had none. */
        TermIndex getPreviousLog() {
            return previousLog;
        }

        /** Starts the latency measurement for this request. */
        void startRequestTimer() {
            timerContext = timer.time();
        }

        /** Stops the latency measurement started by {@link #startRequestTimer()}. */
        void stopRequestTimer() {
            timerContext.stop();
        }

        /** @return true if the request carries no log entries. */
        boolean isHeartbeat() {
            return entriesCount == 0;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(JavaUtils.getClassSimpleName(getClass()));
            text.append(":cid=").append(callId);
            text.append(",entriesCount=").append(entriesCount);
            text.append(",lastEntry=").append(lastEntry);
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$AppendLogResponseHandler.class */
    public class AppendLogResponseHandler implements StreamObserver<RaftProtos.AppendEntriesReplyProto> {
        private final String name;

        private AppendLogResponseHandler() {
            this.name = GrpcLogAppender.this.getFollower().getName() + ProcessIdUtil.DEFAULT_PROCESSID + JavaUtils.getClassSimpleName(getClass());
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onNext(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            AppendEntriesRequest remove = GrpcLogAppender.this.pendingRequests.remove(appendEntriesReplyProto);
            if (remove != null) {
                remove.stopRequestTimer();
            }
            if (GrpcLogAppender.LOG.isDebugEnabled()) {
                Logger logger = GrpcLogAppender.LOG;
                Object[] objArr = new Object[4];
                objArr[0] = this;
                objArr[1] = GrpcLogAppender.this.firstResponseReceived ? OzoneConsts.OZONE_ACL_ALL : "the first";
                objArr[2] = ServerStringUtils.toAppendEntriesReplyString(appendEntriesReplyProto);
                objArr[3] = remove;
                logger.debug("{}: received {} reply {}, request={}", objArr);
            }
            try {
                onNextImpl(appendEntriesReplyProto);
            } catch (Exception e) {
                GrpcLogAppender.LOG.error("Failed onNext request=" + remove + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(appendEntriesReplyProto), e);
            }
        }

        private void onNextImpl(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            GrpcLogAppender.this.getFollower().updateLastRpcResponseTime();
            if (!GrpcLogAppender.this.firstResponseReceived) {
                GrpcLogAppender.this.firstResponseReceived = true;
            }
            switch (appendEntriesReplyProto.getResult()) {
                case SUCCESS:
                    GrpcLogAppender.this.grpcServerMetrics.onRequestSuccess(GrpcLogAppender.this.getFollowerId().toString(), appendEntriesReplyProto.getIsHearbeat());
                    GrpcLogAppender.this.getLeaderState().onFollowerCommitIndex(GrpcLogAppender.this.getFollower(), appendEntriesReplyProto.getFollowerCommit());
                    if (GrpcLogAppender.this.getFollower().updateMatchIndex(appendEntriesReplyProto.getMatchIndex())) {
                        GrpcLogAppender.this.getLeaderState().onFollowerSuccessAppendEntries(GrpcLogAppender.this.getFollower());
                        break;
                    }
                    break;
                case NOT_LEADER:
                    GrpcLogAppender.this.grpcServerMetrics.onRequestNotLeader(GrpcLogAppender.this.getFollowerId().toString());
                    if (GrpcLogAppender.this.onFollowerTerm(appendEntriesReplyProto.getTerm())) {
                        return;
                    }
                    break;
                case INCONSISTENCY:
                    GrpcLogAppender.this.grpcServerMetrics.onRequestInconsistency(GrpcLogAppender.this.getFollowerId().toString());
                    GrpcLogAppender.this.updateNextIndex(appendEntriesReplyProto.getNextIndex());
                    break;
                default:
                    throw new IllegalStateException("Unexpected reply result: " + appendEntriesReplyProto.getResult());
            }
            GrpcLogAppender.this.notifyLogAppender();
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            if (!GrpcLogAppender.this.isRunning()) {
                GrpcLogAppender.LOG.info("{} is stopped", GrpcLogAppender.this);
                return;
            }
            GrpcUtil.warn(GrpcLogAppender.LOG, () -> {
                return this + ": Failed appendEntries";
            }, th);
            GrpcLogAppender.this.grpcServerMetrics.onRequestRetry();
            GrpcLogAppender.this.resetClient(GrpcLogAppender.this.pendingRequests.remove(GrpcUtil.getCallId(th), GrpcUtil.isHeartbeat(th)), true);
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onCompleted() {
            GrpcLogAppender.LOG.info("{}: follower responses appendEntries COMPLETED", this);
            GrpcLogAppender.this.resetClient(null, false);
        }

        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$InstallSnapshotResponseHandler.class */
    /**
     * Receives the follower's replies on the installSnapshot stream. It also tracks the
     * request indices of chunks that have been sent but not yet answered; that queue is
     * accessed only while holding the appender's read-write lock.
     */
    public class InstallSnapshotResponseHandler implements StreamObserver<RaftProtos.InstallSnapshotReplyProto> {
        /** Display name: follower name, a dash, and this handler's simple class name. */
        private final String name;
        /** Request indices awaiting a reply, in sending order. */
        private final Queue<Integer> pending;
        /** Becomes true once the stream has completed or failed. */
        private final AtomicBoolean done;
        /** Only used to decide whether the completion message is logged. */
        private final boolean isNotificationOnly;

        /**
         * Creates a handler that is not notification-only.
         *
         * @param grpcLogAppender not used by this constructor; kept so existing callers still compile
         */
        InstallSnapshotResponseHandler(GrpcLogAppender grpcLogAppender) {
            this(false);
        }

        /**
         * @param notifyOnly stored as the notification-only flag of this handler
         */
        InstallSnapshotResponseHandler(boolean notifyOnly) {
            // Plain literal instead of a same-valued constant borrowed from an unrelated library.
            this.name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
            this.done = new AtomicBoolean(false);
            this.pending = new LinkedList<>();
            this.isNotificationOnly = notifyOnly;
        }

        /** Records the request index of the given chunk request as awaiting a reply. */
        void addPending(RaftProtos.InstallSnapshotRequestProto request) {
            try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
                pending.offer(request.getSnapshotChunk().getRequestIndex());
            }
        }

        /**
         * Removes the oldest pending request index and checks that it matches the reply.
         *
         * @throws NullPointerException if nothing is pending
         */
        void removePending(RaftProtos.InstallSnapshotReplyProto reply) {
            try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
                final Integer index = pending.poll();
                Objects.requireNonNull(index, "index == null");
                Preconditions.assertTrue(index.intValue() == reply.getRequestIndex());
            }
        }

        /** @return true once {@link #close()} has been called. */
        boolean isDone() {
            return done.get();
        }

        /** Marks this handler as done and notifies the appender. */
        void close() {
            done.set(true);
            notifyLogAppender();
        }

        /** @return true if no request index is awaiting a reply. */
        boolean hasAllResponse() {
            try (AutoCloseableLock ignored = lock.readLock(caller, LOG::trace)) {
                return pending.isEmpty();
            }
        }

        /**
         * Processes one installSnapshot reply: records the response time, marks that a
         * response has been received and dispatches on the reply result.
         */
        @Override
        public void onNext(RaftProtos.InstallSnapshotReplyProto reply) {
            // Guarded because the reply string is built eagerly.
            if (LOG.isInfoEnabled()) {
                LOG.info("{}: received {} reply {}",
                    this,
                    firstResponseReceived ? "a" : "the first",
                    ServerStringUtils.toInstallSnapshotReplyString(reply));
            }
            getFollower().updateLastRpcResponseTime();
            if (!firstResponseReceived) {
                firstResponseReceived = true;
            }

            switch (reply.getResult()) {
                case SUCCESS:
                    LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
                    getFollower().setAttemptedToInstallSnapshot();
                    removePending(reply);
                    break;
                case IN_PROGRESS:
                    LOG.info("{}: InstallSnapshot in progress.", this);
                    removePending(reply);
                    break;
                case ALREADY_INSTALLED: {
                    final long snapshotIndex = reply.getSnapshotIndex();
                    LOG.info("{}: Follower snapshot is already at index {}.", this, snapshotIndex);
                    onFollowerSnapshotIndex(snapshotIndex, reply);
                    break;
                }
                case NOT_LEADER:
                    onFollowerTerm(reply.getTerm());
                    break;
                case CONF_MISMATCH:
                    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
                        this,
                        RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
                        getServer().getId(),
                        installSnapshotEnabled,
                        getFollowerId(),
                        !installSnapshotEnabled);
                    break;
                case SNAPSHOT_INSTALLED: {
                    final long snapshotIndex = reply.getSnapshotIndex();
                    LOG.info("{}: Follower installed snapshot at index {}", this, snapshotIndex);
                    onFollowerSnapshotIndex(snapshotIndex, reply);
                    break;
                }
                case SNAPSHOT_UNAVAILABLE:
                    LOG.info("{}: Follower could not install snapshot as it is not available.", this);
                    getFollower().setAttemptedToInstallSnapshot();
                    removePending(reply);
                    break;
                case UNRECOGNIZED:
                    LOG.error("Unrecongnized the reply result {}: Leader is {}, follower is {}",
                        reply.getResult(), getServer().getId(), getFollowerId());
                    break;
                default:
                    break;
            }
        }

        /**
         * Shared handling for replies that report the follower's snapshot index: records the
         * index on the follower, reports it to the leader state as the follower's commit index,
         * moves the next index past it and removes the pending request.
         */
        private void onFollowerSnapshotIndex(long snapshotIndex, RaftProtos.InstallSnapshotReplyProto reply) {
            getFollower().setSnapshotIndex(snapshotIndex);
            getFollower().setAttemptedToInstallSnapshot();
            getLeaderState().onFollowerCommitIndex(getFollower(), snapshotIndex);
            increaseNextIndex(snapshotIndex);
            removePending(reply);
        }

        /**
         * Handles a stream failure. When the appender is no longer running the error is
         * only logged; otherwise a retry is counted, the client is reset and this handler
         * is closed.
         */
        @Override
        public void onError(Throwable th) {
            if (!isRunning()) {
                LOG.info("{} is stopped", GrpcLogAppender.this);
                return;
            }
            GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", th);
            grpcServerMetrics.onRequestRetry();
            resetClient(null, true);
            close();
        }

        /** Closes this handler; the completion message is skipped for notification-only handlers unless debug is on. */
        @Override
        public void onCompleted() {
            if (!isNotificationOnly || LOG.isDebugEnabled()) {
                LOG.info("{}: follower responded installSnapshot COMPLETED", this);
            }
            close();
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$RequestMap.class */
    /**
     * Pending appendEntries requests keyed by call id. Heartbeats and log requests are
     * kept in two separate concurrent maps.
     */
    public static class RequestMap {
        private final Map<Long, AppendEntriesRequest> logRequests = new ConcurrentHashMap<>();
        private final Map<Long, AppendEntriesRequest> heartbeats = new ConcurrentHashMap<>();

        RequestMap() {
        }

        /** @return the number of pending log requests; heartbeats are not counted. */
        int logRequestsSize() {
            return logRequests.size();
        }

        /** Drops all pending log requests and heartbeats. */
        void clear() {
            logRequests.clear();
            heartbeats.clear();
        }

        /** Stores the request in the map matching its kind, keyed by its call id. */
        void put(AppendEntriesRequest request) {
            final Map<Long, AppendEntriesRequest> target = request.isHeartbeat() ? heartbeats : logRequests;
            target.put(request.getCallId(), request);
        }

        /**
         * Removes the request the given reply answers.
         *
         * @return the removed request, or null if none was pending
         */
        AppendEntriesRequest remove(RaftProtos.AppendEntriesReplyProto reply) {
            return remove(reply.getServerReply().getCallId(), reply.getIsHearbeat());
        }

        /**
         * Removes the request with the given call id from the heartbeat or log-request map.
         *
         * @return the removed request, or null if none was pending
         */
        AppendEntriesRequest remove(long cid, boolean isHeartbeat) {
            return isHeartbeat ? heartbeats.remove(cid) : logRequests.remove(cid);
        }

        /**
         * Looks up the request whose timeout fired. A heartbeat is removed from its map;
         * a log request is only looked up and stays pending.
         *
         * @return the request, or null if it is no longer pending
         */
        public AppendEntriesRequest handleTimeout(long cid, boolean isHeartbeat) {
            return isHeartbeat ? heartbeats.remove(cid) : logRequests.get(cid);
        }
    }

    /**
     * Creates the appender for one follower: initializes the request bookkeeping, reads the
     * relevant settings from the server's properties and registers the pending-request gauge.
     *
     * @param division the server division this appender belongs to
     * @param leaderState the leader state passed to the base class
     * @param followerInfo the follower passed to the base class
     */
    public GrpcLogAppender(RaftServer.Division division, LeaderState leaderState, FollowerInfo followerInfo) {
        super(division, leaderState, followerInfo);
        this.pendingRequests = new RequestMap();
        this.callId = 0L;
        this.firstResponseReceived = false;
        this.scheduler = TimeoutScheduler.getInstance();
        Preconditions.assertNotNull(getServerRpc(), "getServerRpc()");

        // Configuration-derived settings.
        final RaftProperties conf = division.getRaftServer().getProperties();
        this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(conf);
        this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(conf);
        this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(conf);

        // Metrics, including a gauge backed by the number of pending log requests.
        this.grpcServerMetrics = new GrpcServerMetrics(division.getMemberId().toString());
        this.grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), this.pendingRequests::logRequestsSize);

        this.lock = new AutoCloseableReadWriteLock(this);
        // The caller element is only looked up when trace logging is on.
        if (LOG.isTraceEnabled()) {
            this.caller = JavaUtils.getCallerStackTraceElement();
        } else {
            this.caller = null;
        }
    }

    /**
     * Narrows the inherited server RPC to {@link GrpcService}.
     *
     * @return the base class's server RPC, cast to GrpcService
     */
    @Override
    public GrpcService getServerRpc() {
        final Object rpc = super.getServerRpc();
        return (GrpcService) rpc;
    }

    /**
     * Looks up the protocol client for this appender's follower in the server RPC's proxies.
     *
     * @return the client proxy for the follower
     * @throws IOException if the proxy lookup fails
     */
    private GrpcServerProtocolClient getClient() throws IOException {
        final GrpcService rpc = getServerRpc();
        return rpc.getProxies().getProxy(getFollowerId());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resets the connection state towards the follower under the write lock: resets the
     * client's connect backoff, drops the request stream and all pending requests, and
     * lowers the follower's next index.
     *
     * <p>The next index is one past the previous-log index of {@code request} when that is
     * available, otherwise one past the follower's match index. When called for an error
     * with no request and a match index of 0, the next index is left unchanged.
     *
     * <p>The lock is released on every path, including when obtaining the client fails.
     *
     * @param request the request that triggered the reset; may be null
     * @param onError true if the reset is caused by a stream error
     */
    public void resetClient(AppendEntriesRequest request, boolean onError) {
        try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
            getClient().resetConnectBackoff();
            appendLogRequestObserver = null;
            firstResponseReceived = false;
            pendingRequests.clear();

            final long nextIndex = 1 + Optional.ofNullable(request)
                .map(AppendEntriesRequest::getPreviousLog)
                .map(TermIndex::getIndex)
                .orElseGet(getFollower()::getMatchIndex);

            if (onError && getFollower().getMatchIndex() == 0 && request == null) {
                LOG.warn("{}: Leader has not got in touch with Follower {} yet, just keep nextIndex unchanged and retry.", this, getFollower());
                return;
            }
            getFollower().decreaseNextIndex(nextIndex);
        } catch (IOException e) {
            LOG.warn(this + ": Failed to getClient for " + getFollowerId(), e);
        }
    }

    /**
     * @return true if the follower's commit index is smaller than the last committed index
     *         of the raft log
     */
    private boolean isFollowerCommitBehindLastCommitIndex() {
        final long lastCommitted = getRaftLog().getLastCommittedIndex();
        final long followerCommit = getFollower().getCommitIndex();
        return followerCommit < lastCommitted;
    }

    /**
     * Main loop of the appender. While running, each round optionally triggers a snapshot
     * installation and sends an appendEntries request, checks the follower's health and then
     * waits. After the loop ends, the request stream (if any) is completed.
     *
     * @throws IOException if sending a request or installing a snapshot fails
     */
    @Override
    public void run() throws IOException {
        for (; isRunning(); mayWait()) {
            if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
                boolean snapshotTriggered = false;
                if (this.installSnapshotEnabled) {
                    final SnapshotInfo snapshot = shouldInstallSnapshot();
                    if (snapshot != null) {
                        installSnapshot(snapshot);
                        snapshotTriggered = true;
                    }
                } else {
                    final TermIndex notification = shouldNotifyToInstallSnapshot();
                    if (notification != null) {
                        installSnapshot(notification);
                        snapshotTriggered = true;
                    }
                }
                appendLog(snapshotTriggered || haveTooManyPendingRequests());
            }
            getLeaderState().checkHealth(getFollower());
        }

        // Complete the stream if one is still open.
        final StreamObserver<RaftProtos.AppendEntriesRequestProto> observer = this.appendLogRequestObserver;
        if (observer != null) {
            observer.onCompleted();
        }
    }

    /**
     * Computes how long the loop should wait before its next round.
     *
     * @return the remaining heartbeat time if too many requests are pending; 0 if an
     *         appendEntries should be sent; otherwise the remaining heartbeat time capped at 10 ms
     */
    private long getWaitTimeMs() {
        final long waitMs;
        if (haveTooManyPendingRequests()) {
            waitMs = getHeartbeatRemainingTimeMs();
        } else if (shouldSendAppendEntries()) {
            waitMs = 0L;
        } else {
            waitMs = Math.min(10L, getHeartbeatRemainingTimeMs());
        }
        return waitMs;
    }

    /**
     * Waits on this object's monitor for the time given by {@link #getWaitTimeMs()}, if that
     * time is positive. An interruption is logged and the thread's interrupt flag is restored.
     */
    private void mayWait() {
        final long millis = getWaitTimeMs();
        if (millis > 0) {
            synchronized (this) {
                try {
                    LOG.trace("{}: wait {}ms", this, millis);
                    wait(millis);
                } catch (InterruptedException interrupted) {
                    LOG.warn(this + ": Wait interrupted by " + interrupted);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Unregisters this appender's metrics and then delegates to the base class's stop. */
    @Override
    public void stop() {
        grpcServerMetrics.unregister();
        super.stop();
    }

    /**
     * @return true if no request stream exists yet, otherwise whatever the base class decides
     */
    @Override
    public boolean shouldSendAppendEntries() {
        if (this.appendLogRequestObserver == null) {
            return true;
        }
        return super.shouldSendAppendEntries();
    }

    /**
     * @return false if no log request is pending; otherwise true if no response has been
     *         received yet or the number of pending log requests has reached the configured maximum
     */
    private boolean haveTooManyPendingRequests() {
        final int pendingCount = this.pendingRequests.logRequestsSize();
        if (pendingCount == 0) {
            return false;
        }
        if (!this.firstResponseReceived) {
            return true;
        }
        return pendingCount >= this.maxPendingRequestsNum;
    }

    /**
     * Builds the next appendEntries request and sends it to the follower.
     *
     * <p>Under the write lock: a request proto is created with the next call id, registered as
     * pending, the follower's next index is advanced past the request's last entry, and the
     * request stream is opened if there is none. The lock is released before the request is
     * sent, and the send is skipped when the appender is no longer running.
     *
     * @param heartbeat forwarded unchanged as the second argument of newAppendEntriesRequest;
     *        presumably asks for a request without log entries (confirm against the base class)
     * @throws IOException if creating the request or obtaining the client fails
     */
    private void appendLog(boolean heartbeat) throws IOException {
        final RaftProtos.AppendEntriesRequestProto proto;
        final AppendEntriesRequest request;
        final StreamObserver<RaftProtos.AppendEntriesRequestProto> observer;
        try (AutoCloseableLock ignored = lock.writeLock(caller, LOG::trace)) {
            proto = newAppendEntriesRequest(callId++, heartbeat);
            if (proto == null) {
                // Nothing to send; the call id has still been consumed.
                return;
            }
            request = new AppendEntriesRequest(proto, getFollowerId(), grpcServerMetrics);
            pendingRequests.put(request);
            increaseNextIndex(proto);
            if (appendLogRequestObserver == null) {
                appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
            }
            // Capture the stream while holding the lock so the send below uses a consistent reference.
            observer = appendLogRequestObserver;
        }

        // Sending happens strictly outside the lock scope.
        if (isRunning()) {
            sendRequest(request, proto, observer);
        }
    }

    /**
     * Pushes a single append-entries request onto the given request stream and registers a timeout check for it.
     *
     * <p>The steps run in this order: the {@code GrpcService.GRPC_SEND_SERVER_REQUEST} test-injection point is
     * executed, the request's timer is started, the proto is handed to the stream, a callback that invokes
     * {@link #timeoutAppendRequest(long, boolean)} is registered with the scheduler using
     * {@code requestTimeoutDuration}, and finally the follower's last-RPC-send time is updated.
     *
     * @param appendEntriesRequest      the request wrapper; supplies the call id and heartbeat flag used by the
     *                                  timeout callback
     * @param appendEntriesRequestProto the proto message that is written to the stream
     * @param streamObserver            the stream the proto is written to
     */
    private void sendRequest(AppendEntriesRequest appendEntriesRequest, RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto, StreamObserver<RaftProtos.AppendEntriesRequestProto> streamObserver) {
        CodeInjectionForTesting.execute(
                GrpcService.GRPC_SEND_SERVER_REQUEST, getServer().getId(), null, appendEntriesRequestProto);

        appendEntriesRequest.startRequestTimer();
        streamObserver.onNext(appendEntriesRequestProto);

        // The call id and heartbeat flag are read when the callback runs, not when it is registered.
        this.scheduler.onTimeout(
                this.requestTimeoutDuration,
                () -> timeoutAppendRequest(appendEntriesRequest.getCallId(), appendEntriesRequest.isHeartbeat()),
                LOG,
                () -> "Timeout check failed for append entry request: " + appendEntriesRequest);

        getFollower().updateLastRpcSendTime();
    }

    /**
     * Timeout callback for an append-entries request.
     *
     * <p>Asks {@code pendingRequests} to handle the timeout; when that yields a request, a warning is logged and
     * the timeout is reported to the gRPC server metrics for this follower. When it yields {@code null}, nothing
     * else happens.
     *
     * @param j the call id of the request being checked
     * @param z {@code true} when the request is a heartbeat
     */
    private void timeoutAppendRequest(long j, boolean z) {
        final AppendEntriesRequest timedOut = this.pendingRequests.handleTimeout(j, z);
        if (timedOut == null) {
            return;
        }

        final String label = z ? "HEARTBEAT" : "";
        LOG.warn("{}: {} appendEntries Timeout, request={}", this, label, timedOut);
        this.grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), z);
    }

    /**
     * Advances the follower's next index past the last entry carried by the given request.
     *
     * <p>A request without entries leaves the follower untouched.
     *
     * @param appendEntriesRequestProto the request whose last entry index determines the new next index
     */
    private void increaseNextIndex(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto) {
        final int count = appendEntriesRequestProto.getEntriesCount();
        if (count <= 0) {
            return;
        }

        final FollowerInfo follower = getFollower();
        final long lastEntryIndex = appendEntriesRequestProto.getEntries(count - 1).getIndex();
        follower.increaseNextIndex(lastEntryIndex + 1);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls {@code updateNextIndex} on the follower with the index immediately after {@code j}.
     *
     * @param j the index whose successor is passed to the follower
     */
    public void increaseNextIndex(long j) {
        final FollowerInfo follower = getFollower();
        final long successor = j + 1;
        follower.updateNextIndex(successor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Clears all pending requests and sets the follower's next index to {@code j}, both while holding the
     * write lock.
     *
     * <p>The lock is acquired in a try-with-resources statement so that it is released exactly once on every
     * exit path, including when the body throws.
     *
     * @param j the value to set as the follower's next index
     */
    public void updateNextIndex(long j) {
        try (AutoCloseableLock ignored = this.lock.writeLock(this.caller, LOG::trace)) {
            this.pendingRequests.clear();
            getFollower().setNextIndex(j);
        }
    }

    /**
     * Streams the given snapshot to the follower and waits for the response handler to finish.
     *
     * <p>A new install-snapshot stream is opened on the client, every request produced by
     * {@code newInstallSnapshotRequests} is sent on it (stopping early once this appender is no longer running),
     * and the stream is completed. The calling thread then waits on this object's monitor until the handler is
     * done or the appender stops running. If the handler reports that all responses arrived, the follower's
     * snapshot index is set to the snapshot's index.
     *
     * <p>Any {@link Exception} raised along the way is logged as a warning and, when the stream was already
     * opened, forwarded to it through {@code onError}; it is not rethrown.
     *
     * @param snapshotInfo the snapshot to send
     */
    private void installSnapshot(SnapshotInfo snapshotInfo) {
        LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
                this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshotInfo);

        final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(this);
        StreamObserver<RaftProtos.InstallSnapshotRequestProto> requestStream = null;
        final String requestId = UUID.randomUUID().toString();
        try {
            requestStream = getClient().installSnapshot(responseHandler);
            for (RaftProtos.InstallSnapshotRequestProto chunk : newInstallSnapshotRequests(requestId, snapshotInfo)) {
                if (!isRunning()) {
                    break;
                }
                requestStream.onNext(chunk);
                getFollower().updateLastRpcSendTime();
                responseHandler.addPending(chunk);
            }
            requestStream.onCompleted();
            this.grpcServerMetrics.onInstallSnapshot();

            synchronized (this) {
                while (isRunning() && !responseHandler.isDone()) {
                    try {
                        wait();
                    } catch (InterruptedException ie) {
                        // NOTE(review): with the interrupt flag restored, the next wait() throws immediately,
                        // so this loop keeps iterating without blocking until isRunning() turns false or the
                        // handler is done — confirm this is intended.
                        Thread.currentThread().interrupt();
                    }
                }
            }

            if (responseHandler.hasAllResponse()) {
                getFollower().setSnapshotIndex(snapshotInfo.getTermIndex().getIndex());
                LOG.info("{}: installed snapshot {} successfully", this, snapshotInfo);
            }
        } catch (Exception e) {
            LOG.warn("{}: failed to install snapshot {}: {}", this, snapshotInfo.getFiles(), e);
            if (requestStream != null) {
                requestStream.onError(e);
            }
        }
    }

    /**
     * Sends a single install-snapshot notification request to the follower and waits for the response handler
     * to finish.
     *
     * <p>The notification request is built from {@code termIndex}, sent on a newly opened install-snapshot
     * stream, registered as pending with the handler, and the stream is completed. The calling thread then waits
     * on this object's monitor until the handler is done or the appender stops running.
     *
     * <p>Any {@link Exception} raised along the way is logged through {@code GrpcUtil.warn} and, when the stream
     * was already opened, forwarded to it through {@code onError}; it is not rethrown.
     *
     * @param termIndex the term-index used to build the notification request
     */
    private void installSnapshot(TermIndex termIndex) {
        LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, notify follower to install snapshot-{}",
                this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), termIndex);

        final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
        StreamObserver<RaftProtos.InstallSnapshotRequestProto> requestStream = null;
        final RaftProtos.InstallSnapshotRequestProto notification = newInstallSnapshotNotificationRequest(termIndex);
        // Guarded so the request string is only built when it will actually be logged.
        if (LOG.isInfoEnabled()) {
            LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(notification));
        }

        try {
            requestStream = getClient().installSnapshot(responseHandler);
            requestStream.onNext(notification);
            getFollower().updateLastRpcSendTime();
            responseHandler.addPending(notification);
            requestStream.onCompleted();

            synchronized (this) {
                while (isRunning() && !responseHandler.isDone()) {
                    try {
                        wait();
                    } catch (InterruptedException ie) {
                        // NOTE(review): with the interrupt flag restored, the next wait() throws immediately,
                        // so this loop keeps iterating without blocking until isRunning() turns false or the
                        // handler is done — confirm this is intended.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } catch (Exception e) {
            GrpcUtil.warn(LOG, () -> this + ": Failed to notify follower to install snapshot.", e);
            if (requestStream != null) {
                requestStream.onError(e);
            }
        }
    }

    /**
     * Decides whether the follower should be notified to install a snapshot.
     *
     * <p>The checks are applied in this order:
     * <ol>
     *   <li>The leader state reports the follower as bootstrapping and the follower has not yet attempted to
     *       install a snapshot: the log's last-entry term-index is returned.</li>
     *   <li>The follower's next index is not behind the log's next index: {@code null}.</li>
     *   <li>The follower's next index is below the log's start index: the term-index at the start index is
     *       returned.</li>
     *   <li>The log's start index is {@code -1}: a term-index built from the server's current term and the
     *       log's next index is returned.</li>
     *   <li>Otherwise: {@code null}.</li>
     * </ol>
     *
     * @return the term-index to notify the follower with, or {@code null} when no notification is needed
     */
    private TermIndex shouldNotifyToInstallSnapshot() {
        final FollowerInfo followerInfo = getFollower();
        // Read the log's next index before any other check, so later comparisons use this snapshot of it.
        final long logNextIndex = getRaftLog().getNextIndex();

        final boolean bootstrapping = getLeaderState().isFollowerBootstrapping(followerInfo);
        if (bootstrapping && !followerInfo.hasAttemptedToInstallSnapshot()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping.", this);
            }
            return getRaftLog().getLastEntryTermIndex();
        }

        final long followerNextIndex = followerInfo.getNextIndex();
        if (followerNextIndex >= logNextIndex) {
            return null;
        }

        final long logStartIndex = getRaftLog().getStartIndex();
        if (followerNextIndex < logStartIndex) {
            return getRaftLog().getTermIndex(logStartIndex);
        }

        // NOTE(review): -1 is presumably the log's "no valid index" sentinel — confirm against RaftLog.
        return logStartIndex == -1
                ? TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), logNextIndex)
                : null;
    }
}
