package org.apache.ratis.grpc.server;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender.class */
public class GrpcLogAppender extends LogAppender {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) GrpcLogAppender.class);
    private final GrpcService rpcService;
    private final Map<Long, RaftProtos.AppendEntriesRequestProto> pendingRequests;
    private final int maxPendingRequestsNum;
    private long callId;
    private volatile boolean firstResponseReceived;
    private final boolean installSnapshotEnabled;
    private final TimeDuration requestTimeoutDuration;
    private final TimeoutScheduler scheduler;
    private volatile StreamObserver<RaftProtos.AppendEntriesRequestProto> appendLogRequestObserver;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$AppendLogResponseHandler.class */
    public class AppendLogResponseHandler implements StreamObserver<RaftProtos.AppendEntriesReplyProto> {
        private AppendLogResponseHandler() {
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onNext(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            if (GrpcLogAppender.LOG.isDebugEnabled()) {
                Logger logger = GrpcLogAppender.LOG;
                Object[] objArr = new Object[4];
                objArr[0] = GrpcLogAppender.this.server.getId();
                objArr[1] = GrpcLogAppender.this.follower.getPeer();
                objArr[2] = !GrpcLogAppender.this.firstResponseReceived ? "the first" : OzoneConsts.OZONE_ACL_ALL;
                objArr[3] = ServerProtoUtils.toString(appendEntriesReplyProto);
                logger.debug("{}<-{}: received {} reply {} ", objArr);
            }
            try {
                onNextImpl(appendEntriesReplyProto);
            } catch (Throwable th) {
                GrpcLogAppender.LOG.error("Failed onNext " + appendEntriesReplyProto, th);
            }
        }

        private void onNextImpl(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            GrpcLogAppender.this.follower.updateLastRpcResponseTime();
            if (!GrpcLogAppender.this.firstResponseReceived) {
                GrpcLogAppender.this.firstResponseReceived = true;
            }
            switch (appendEntriesReplyProto.getResult()) {
                case SUCCESS:
                    GrpcLogAppender.this.onSuccess(appendEntriesReplyProto);
                    break;
                case NOT_LEADER:
                    GrpcLogAppender.this.onNotLeader(appendEntriesReplyProto);
                    break;
                case INCONSISTENCY:
                    GrpcLogAppender.this.onInconsistency(appendEntriesReplyProto);
                    break;
            }
            GrpcLogAppender.this.notifyAppend();
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            if (!GrpcLogAppender.this.isAppenderRunning()) {
                GrpcLogAppender.LOG.info("{} is stopped", GrpcLogAppender.this);
                return;
            }
            GrpcUtil.warn(GrpcLogAppender.LOG, () -> {
                return GrpcLogAppender.this.server.getId() + ": Failed appendEntries to " + GrpcLogAppender.this.follower.getPeer();
            }, th);
            GrpcLogAppender.this.resetClient((RaftProtos.AppendEntriesRequestProto) GrpcLogAppender.this.pendingRequests.get(Long.valueOf(GrpcUtil.getCallId(th))));
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onCompleted() {
            GrpcLogAppender.LOG.info("{}: follower {} response Completed", GrpcLogAppender.this.server.getId(), GrpcLogAppender.this.follower);
            GrpcLogAppender.this.resetClient(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/server/GrpcLogAppender$InstallSnapshotResponseHandler.class */
    public class InstallSnapshotResponseHandler implements StreamObserver<RaftProtos.InstallSnapshotReplyProto> {
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final Queue<Integer> pending = new LinkedList();

        InstallSnapshotResponseHandler() {
        }

        synchronized void addPending(RaftProtos.InstallSnapshotRequestProto installSnapshotRequestProto) {
            this.pending.offer(Integer.valueOf(installSnapshotRequestProto.getRequestIndex()));
        }

        synchronized void removePending(RaftProtos.InstallSnapshotReplyProto installSnapshotReplyProto) {
            Preconditions.assertTrue(this.pending.poll().intValue() == installSnapshotReplyProto.getRequestIndex());
        }

        boolean isDone() {
            return this.done.get();
        }

        void close() {
            this.done.set(true);
            GrpcLogAppender.this.notifyAppend();
        }

        synchronized boolean hasAllResponse() {
            return this.pending.isEmpty();
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onNext(RaftProtos.InstallSnapshotReplyProto installSnapshotReplyProto) {
            Logger logger = GrpcLogAppender.LOG;
            Object[] objArr = new Object[3];
            objArr[0] = GrpcLogAppender.this.server.getId();
            objArr[1] = !GrpcLogAppender.this.firstResponseReceived ? "the first" : OzoneConsts.OZONE_ACL_ALL;
            objArr[2] = GrpcLogAppender.this.follower.getPeer();
            logger.debug("{} received {} response from {}", objArr);
            GrpcLogAppender.this.follower.updateLastRpcResponseTime();
            if (!GrpcLogAppender.this.firstResponseReceived) {
                GrpcLogAppender.this.firstResponseReceived = true;
            }
            switch (installSnapshotReplyProto.getResult()) {
                case SUCCESS:
                    removePending(installSnapshotReplyProto);
                    return;
                case NOT_LEADER:
                    GrpcLogAppender.this.checkResponseTerm(installSnapshotReplyProto.getTerm());
                    return;
                case UNRECOGNIZED:
                default:
                    return;
            }
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            if (!GrpcLogAppender.this.isAppenderRunning()) {
                GrpcLogAppender.LOG.info("{} is stopped", GrpcLogAppender.this);
                return;
            }
            GrpcLogAppender.LOG.info("{} got error when installing snapshot to {}, exception: {}", GrpcLogAppender.this.server.getId(), GrpcLogAppender.this.follower.getPeer(), th);
            GrpcLogAppender.this.resetClient(null);
            close();
        }

        @Override // org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver
        public void onCompleted() {
            GrpcLogAppender.LOG.info("{} stops sending snapshots to follower {}", GrpcLogAppender.this.server.getId(), GrpcLogAppender.this.follower);
            close();
        }
    }

    public GrpcLogAppender(RaftServerImpl raftServerImpl, LeaderState leaderState, FollowerInfo followerInfo) {
        super(raftServerImpl, leaderState, followerInfo);
        this.callId = 0L;
        this.firstResponseReceived = false;
        this.scheduler = TimeoutScheduler.newInstance(1);
        this.rpcService = (GrpcService) raftServerImpl.getServerRpc();
        this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(raftServerImpl.getProxy().getProperties());
        this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(raftServerImpl.getProxy().getProperties());
        this.pendingRequests = new ConcurrentHashMap();
        this.installSnapshotEnabled = GrpcConfigKeys.LogAppender.installSnapshotEnabled(raftServerImpl.getProxy().getProperties());
    }

    private GrpcServerProtocolClient getClient() throws IOException {
        return this.rpcService.getProxies().getProxy(this.follower.getPeer().getId());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void resetClient(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto) {
        this.rpcService.getProxies().resetProxy(this.follower.getPeer().getId());
        this.appendLogRequestObserver = null;
        this.firstResponseReceived = false;
        clearPendingRequests((appendEntriesRequestProto == null || !appendEntriesRequestProto.hasPreviousLog()) ? this.raftLog.getStartIndex() : appendEntriesRequestProto.getPreviousLog().getIndex() + 1);
    }

    @Override // org.apache.ratis.server.impl.LogAppender
    protected void runAppenderImpl() throws IOException {
        SnapshotInfo shouldInstallSnapshot;
        while (isAppenderRunning()) {
            boolean z = true;
            if (shouldSendRequest()) {
                if (this.installSnapshotEnabled && (shouldInstallSnapshot = shouldInstallSnapshot()) != null) {
                    installSnapshot(shouldInstallSnapshot);
                    z = false;
                }
                if (z && !shouldWait()) {
                    appendLog();
                }
            }
            checkSlowness();
            mayWait();
        }
        Optional.ofNullable(this.appendLogRequestObserver).ifPresent((v0) -> {
            v0.onCompleted();
        });
    }

    private long getWaitTimeMs() {
        if (!shouldSendRequest()) {
            return getHeartbeatRemainingTime();
        }
        if (shouldWait()) {
            return this.halfMinTimeoutMs;
        }
        return 0L;
    }

    private void mayWait() {
        long waitTimeMs = getWaitTimeMs();
        if (waitTimeMs <= 0) {
            return;
        }
        synchronized (this) {
            try {
                LOG.trace("{}: wait {}ms", this, Long.valueOf(waitTimeMs));
                wait(waitTimeMs);
            } catch (InterruptedException e) {
                LOG.warn(this + ": Wait interrupted by " + e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ratis.server.impl.LogAppender
    public boolean shouldSendRequest() {
        return this.appendLogRequestObserver == null || super.shouldSendRequest();
    }

    private boolean shouldWait() {
        int size = this.pendingRequests.size();
        if (size == 0) {
            return false;
        }
        return !this.firstResponseReceived || size >= this.maxPendingRequestsNum;
    }

    private void appendLog() throws IOException {
        synchronized (this) {
            long j = this.callId;
            this.callId = j + 1;
            RaftProtos.AppendEntriesRequestProto createRequest = createRequest(j);
            if (createRequest == null) {
                return;
            }
            this.pendingRequests.put(Long.valueOf(createRequest.getServerRequest().getCallId()), createRequest);
            updateNextIndex(createRequest);
            if (this.appendLogRequestObserver == null) {
                this.appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
            }
            StreamObserver<RaftProtos.AppendEntriesRequestProto> streamObserver = this.appendLogRequestObserver;
            if (isAppenderRunning()) {
                sendRequest(createRequest, streamObserver);
            }
        }
    }

    private void sendRequest(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto, StreamObserver<RaftProtos.AppendEntriesRequestProto> streamObserver) {
        CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, this.server.getId(), null, appendEntriesRequestProto);
        streamObserver.onNext(appendEntriesRequestProto);
        this.scheduler.onTimeout(this.requestTimeoutDuration, () -> {
            timeoutAppendRequest(appendEntriesRequestProto);
        }, LOG, () -> {
            return "Timeout check failed for append entry request: " + appendEntriesRequestProto;
        });
        this.follower.updateLastRpcSendTime();
    }

    private void timeoutAppendRequest(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto) {
        RaftProtos.AppendEntriesRequestProto remove = this.pendingRequests.remove(Long.valueOf(appendEntriesRequestProto.getServerRequest().getCallId()));
        if (remove != null) {
            LOG.warn("{}: appendEntries Timeout, request={}", this, ServerProtoUtils.toString(remove));
        }
    }

    private void updateNextIndex(RaftProtos.AppendEntriesRequestProto appendEntriesRequestProto) {
        int entriesCount = appendEntriesRequestProto.getEntriesCount();
        if (entriesCount > 0) {
            this.follower.updateNextIndex(appendEntriesRequestProto.getEntries(entriesCount - 1).getIndex() + 1);
        }
    }

    private void clearPendingRequests(long j) {
        this.pendingRequests.clear();
        this.follower.decreaseNextIndex(j);
    }

    protected synchronized void onSuccess(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
        boolean z;
        RaftProtos.AppendEntriesRequestProto remove = this.pendingRequests.remove(Long.valueOf(appendEntriesReplyProto.getServerReply().getCallId()));
        if (remove == null) {
            LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(appendEntriesReplyProto));
            return;
        }
        updateCommitIndex(appendEntriesReplyProto.getFollowerCommit());
        long nextIndex = appendEntriesReplyProto.getNextIndex();
        long j = nextIndex - 1;
        if (remove.getEntriesCount() == 0) {
            Preconditions.assertTrue(!remove.hasPreviousLog() || j == remove.getPreviousLog().getIndex(), "reply's next index is %s, request's previous is %s", Long.valueOf(nextIndex), remove.getPreviousLog());
            z = remove.hasPreviousLog() && this.follower.getMatchIndex() < j;
        } else {
            long index = remove.getEntries(remove.getEntriesCount() - 1).getIndex();
            Preconditions.assertTrue(j == index, "reply's next index is %s, request's last entry index is %s", Long.valueOf(nextIndex), Long.valueOf(index));
            z = true;
        }
        if (z) {
            this.follower.updateMatchIndex(j);
            submitEventOnSuccessAppend();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onNotLeader(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
        checkResponseTerm(appendEntriesReplyProto.getTerm());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void onInconsistency(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
        RaftProtos.AppendEntriesRequestProto remove = this.pendingRequests.remove(Long.valueOf(appendEntriesReplyProto.getServerReply().getCallId()));
        if (remove == null) {
            LOG.warn("{}: Ignoring {}", this.server.getId(), appendEntriesReplyProto);
            return;
        }
        Preconditions.assertTrue(remove.hasPreviousLog());
        if (remove.getPreviousLog().getIndex() >= appendEntriesReplyProto.getNextIndex()) {
            clearPendingRequests(appendEntriesReplyProto.getNextIndex());
        }
    }

    private void installSnapshot(SnapshotInfo snapshotInfo) {
        LOG.info("{}: follower {}'s next index is {}, log's start index is {}, need to install snapshot", this.server.getId(), this.follower.getPeer(), Long.valueOf(this.follower.getNextIndex()), Long.valueOf(this.raftLog.getStartIndex()));
        InstallSnapshotResponseHandler installSnapshotResponseHandler = new InstallSnapshotResponseHandler();
        StreamObserver<RaftProtos.InstallSnapshotRequestProto> streamObserver = null;
        String uuid = UUID.randomUUID().toString();
        try {
            streamObserver = getClient().installSnapshot(installSnapshotResponseHandler);
            Iterator<RaftProtos.InstallSnapshotRequestProto> it = new LogAppender.SnapshotRequestIter(snapshotInfo, uuid).iterator();
            while (it.hasNext()) {
                RaftProtos.InstallSnapshotRequestProto next = it.next();
                if (!isAppenderRunning()) {
                    break;
                }
                streamObserver.onNext(next);
                this.follower.updateLastRpcSendTime();
                installSnapshotResponseHandler.addPending(next);
            }
            streamObserver.onCompleted();
            synchronized (this) {
                while (isAppenderRunning() && !installSnapshotResponseHandler.isDone()) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
            if (installSnapshotResponseHandler.hasAllResponse()) {
                this.follower.setSnapshotIndex(snapshotInfo.getTermIndex().getIndex());
                LOG.info("{}: install snapshot-{} successfully on follower {}", this.server.getId(), Long.valueOf(snapshotInfo.getTermIndex().getIndex()), this.follower.getPeer());
            }
        } catch (Exception e2) {
            LOG.warn("{} failed to install snapshot {}. Exception: {}", this, snapshotInfo.getFiles(), e2);
            if (streamObserver != null) {
                streamObserver.onError(e2);
            }
        }
    }
}
