package org.apache.ratis.grpc.server;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/ratis/grpc/server/GrpcServerProtocolService.class */
public class GrpcServerProtocolService extends RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) GrpcServerProtocolService.class);
    private final Supplier<RaftPeerId> idSupplier;
    private final RaftServer server;

    /**
     * Holder that pairs a received request with a future tracking the handling of that request.
     *
     * <p>The future carries no value; it is created incomplete here and is completed
     * (normally or exceptionally) by whoever processes the request.
     *
     * @param <REQUEST> the request message type
     */
    public static class PendingServerRequest<REQUEST> {
        private final CompletableFuture<Void> future;
        private final REQUEST request;

        /**
         * @param request the request being tracked; stored as given
         */
        PendingServerRequest(REQUEST request) {
            this.future = new CompletableFuture<>();
            this.request = request;
        }

        /** @return the request passed to the constructor */
        REQUEST getRequest() {
            return request;
        }

        /** @return the (single, shared) future associated with this request */
        CompletableFuture<Void> getFuture() {
            return future;
        }
    }

    /**
     * Request-side observer shared by the streaming RPCs of this service.
     *
     * <p>Each incoming request is handed to {@link #process(Object)} and the resulting reply is
     * written to the response observer. For requests where {@link #replyInOrder(Object)} is
     * {@code true}, the reply is only written after the future of the previously registered
     * in-order request has completed; other replies are written as soon as they are available.
     *
     * <p>Once the response stream has been terminated (by {@link #onCompleted()},
     * {@link #onError(Throwable)} or a processing failure), no further reply, completion or error
     * is written to it.
     *
     * <p>NOTE(review): for requests that are not replied in order, a failure of the future returned
     * by {@code process} (as opposed to an exception thrown by {@code process} itself) is not
     * reported to the response observer — confirm this is intended.
     *
     * @param <REQUEST> the request message type
     * @param <REPLY> the reply message type
     */
    abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
        private final RaftServerProtocol.Op op;
        private final StreamObserver<REPLY> responseObserver;
        /** The most recently registered in-order request, or {@code null} if there is none yet. */
        private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>();
        /** Set once the response stream has been terminated. */
        private final AtomicBoolean isClosed = new AtomicBoolean(false);

        /**
         * @param op the operation served by this observer; used in log messages
         * @param streamObserver the observer that replies are written to
         */
        ServerRequestStreamObserver(RaftServerProtocol.Op op, StreamObserver<REPLY> streamObserver) {
            this.op = op;
            this.responseObserver = streamObserver;
        }

        /** @return the string form of the last registered in-order request, or {@code null} if none */
        private String getPreviousRequestString() {
            return Optional.ofNullable(previousOnNext.get())
                    .map(PendingServerRequest::getRequest)
                    .map(this::requestToString)
                    .orElse(null);
        }

        /**
         * Processes one request.
         *
         * @return a future of the reply
         * @throws IOException if the request cannot be processed
         */
        abstract CompletableFuture<REPLY> process(REQUEST request) throws IOException;

        /** @return the call id of the given request; passed to the exception wrapper on failure */
        abstract long getCallId(REQUEST request);

        /** @return a string form of the given request for logging */
        abstract String requestToString(REQUEST request);

        /** @return a string form of the given reply for logging */
        abstract String replyToString(REPLY reply);

        /** @return {@code true} if the reply must wait for the previous in-order request */
        abstract boolean replyInOrder(REQUEST request);

        /** Converts a processing failure of the given request into the exception sent to the peer. */
        StatusRuntimeException wrapException(Throwable th, REQUEST request) {
            return GrpcUtil.wrapException(th, getCallId(request));
        }

        /**
         * Logs a processing failure and terminates the response stream with the wrapped error.
         * The error is only sent if the stream has not been terminated before, so that at most
         * one terminal event is ever written and no reply follows it.
         */
        private void handleError(Throwable th, REQUEST request) {
            GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request " + requestToString(request), th);
            if (isClosed.compareAndSet(false, true)) {
                responseObserver.onError(wrapException(th, request));
            }
        }

        /** Writes a reply to the response stream unless the stream has already been terminated. */
        private synchronized void handleReply(REPLY reply) {
            if (isClosed.get()) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: reply {}", getId(), replyToString(reply));
            }
            responseObserver.onNext(reply);
        }

        @Override
        public void onNext(REQUEST request) {
            if (!replyInOrder(request)) {
                try {
                    process(request).thenAccept(this::handleReply);
                } catch (Exception e) {
                    handleError(e, request);
                }
                return;
            }

            final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(request);
            try {
                final CompletableFuture<REPLY> replyFuture = process(request).exceptionally(th -> {
                    // handleError closes the stream, so the combine step below will not
                    // write the null placeholder reply returned from here.
                    handleError(th, request);
                    current.getFuture().completeExceptionally(th);
                    return null;
                });
                // The request is registered only after process(..) has returned normally.
                final CompletableFuture<Void> previousFuture = Optional.ofNullable(previousOnNext.getAndSet(current))
                        .map(PendingServerRequest::getFuture)
                        .orElse(CompletableFuture.completedFuture(null));
                replyFuture.thenCombine(previousFuture, (reply, ignored) -> {
                    handleReply(reply);
                    current.getFuture().complete(null);
                    return null;
                });
            } catch (Exception e) {
                handleError(e, request);
                current.getFuture().completeExceptionally(e);
            }
        }

        @Override
        public void onCompleted() {
            if (isClosed.compareAndSet(false, true)) {
                LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString());
                responseObserver.onCompleted();
            }
        }

        @Override
        public void onError(Throwable th) {
            // Log the actual operation: this observer is not specific to installSnapshot.
            GrpcUtil.warn(LOG, () -> getId() + ": " + op + " onError, lastRequest: " + getPreviousRequestString(), th);
            if (!isClosed.compareAndSet(false, true)) {
                return;
            }
            final Status status = Status.fromThrowable(th);
            // The response stream is completed unless the request stream was cancelled.
            if (status != null && status.getCode() != Status.Code.CANCELLED) {
                responseObserver.onCompleted();
            }
        }
    }

    /**
     * Creates the service.
     *
     * @param supplier supplies the id returned by {@link #getId()}; it is queried on every call
     * @param raftServer the server that incoming RPCs are delegated to
     */
    public GrpcServerProtocolService(Supplier<RaftPeerId> supplier, RaftServer raftServer) {
        server = raftServer;
        idSupplier = supplier;
    }

    /** @return the peer id currently provided by the id supplier; used as the prefix of log messages */
    RaftPeerId getId() {
        final RaftPeerId id = idSupplier.get();
        return id;
    }

    /**
     * Unary requestVote RPC: delegates to the server and sends the single reply followed by
     * completion. Any exception is logged and reported to the caller as a wrapped error.
     *
     * @param requestVoteRequestProto the vote request
     * @param streamObserver the observer receiving the reply or the error
     */
    @Override
    public void requestVote(RaftProtos.RequestVoteRequestProto requestVoteRequestProto, StreamObserver<RaftProtos.RequestVoteReplyProto> streamObserver) {
        try {
            final RaftProtos.RequestVoteReplyProto reply = server.requestVote(requestVoteRequestProto);
            streamObserver.onNext(reply);
            streamObserver.onCompleted();
        } catch (Exception e) {
            GrpcUtil.warn(LOG,
                    () -> getId() + ": Failed requestVote " + ProtoUtils.toString(requestVoteRequestProto.getServerRequest()),
                    e);
            final StatusRuntimeException wrapped = GrpcUtil.wrapException(e);
            streamObserver.onError(wrapped);
        }
    }

    /**
     * Unary startLeaderElection RPC: delegates to the server and sends the single reply followed
     * by completion. Any throwable is logged and reported to the caller as a wrapped error.
     *
     * @param startLeaderElectionRequestProto the request
     * @param streamObserver the observer receiving the reply or the error
     */
    @Override
    public void startLeaderElection(RaftProtos.StartLeaderElectionRequestProto startLeaderElectionRequestProto, StreamObserver<RaftProtos.StartLeaderElectionReplyProto> streamObserver) {
        try {
            final RaftProtos.StartLeaderElectionReplyProto reply = server.startLeaderElection(startLeaderElectionRequestProto);
            streamObserver.onNext(reply);
            streamObserver.onCompleted();
        } catch (Throwable th) {
            GrpcUtil.warn(LOG,
                    () -> getId() + ": Failed startLeaderElection " + ProtoUtils.toString(startLeaderElectionRequestProto.getServerRequest()),
                    th);
            final StatusRuntimeException wrapped = GrpcUtil.wrapException(th);
            streamObserver.onError(wrapped);
        }
    }

    /**
     * Opens the streaming appendEntries RPC.
     *
     * <p>Requests carrying at least one entry are replied to in order; requests without entries
     * are replied to as soon as their reply is available.
     *
     * @param streamObserver the observer that replies are written to
     * @return the observer consuming the incoming request stream
     */
    @Override
    public StreamObserver<RaftProtos.AppendEntriesRequestProto> appendEntries(StreamObserver<RaftProtos.AppendEntriesReplyProto> streamObserver) {
        return new ServerRequestStreamObserver<RaftProtos.AppendEntriesRequestProto, RaftProtos.AppendEntriesReplyProto>(RaftServerProtocol.Op.APPEND_ENTRIES, streamObserver) {
            /** Delegates to the server's asynchronous appendEntries. */
            @Override
            public CompletableFuture<RaftProtos.AppendEntriesReplyProto> process(RaftProtos.AppendEntriesRequestProto request) throws IOException {
                return server.appendEntriesAsync(request);
            }

            @Override
            public long getCallId(RaftProtos.AppendEntriesRequestProto request) {
                final RaftProtos.RaftRpcRequestProto rpcRequest = request.getServerRequest();
                return rpcRequest.getCallId();
            }

            @Override
            public String requestToString(RaftProtos.AppendEntriesRequestProto request) {
                return ServerStringUtils.toAppendEntriesRequestString(request);
            }

            @Override
            public String replyToString(RaftProtos.AppendEntriesReplyProto reply) {
                return ServerStringUtils.toAppendEntriesReplyString(reply);
            }

            /** Only requests that carry entries need their replies ordered. */
            @Override
            public boolean replyInOrder(RaftProtos.AppendEntriesRequestProto request) {
                final int entryCount = request.getEntriesCount();
                return entryCount != 0;
            }

            /**
             * Wraps the failure together with the call id and a flag telling whether the request
             * carried no entries (presumably marking a heartbeat — confirm against GrpcUtil).
             */
            @Override
            public StatusRuntimeException wrapException(Throwable th, RaftProtos.AppendEntriesRequestProto request) {
                final long callId = getCallId(request);
                final boolean noEntries = request.getEntriesCount() == 0;
                return GrpcUtil.wrapException(th, callId, noEntries);
            }
        };
    }

    /**
     * Opens the streaming installSnapshot RPC. Every request is replied to in order.
     *
     * @param streamObserver the observer that replies are written to
     * @return the observer consuming the incoming request stream
     */
    @Override
    public StreamObserver<RaftProtos.InstallSnapshotRequestProto> installSnapshot(StreamObserver<RaftProtos.InstallSnapshotReplyProto> streamObserver) {
        return new ServerRequestStreamObserver<RaftProtos.InstallSnapshotRequestProto, RaftProtos.InstallSnapshotReplyProto>(RaftServerProtocol.Op.INSTALL_SNAPSHOT, streamObserver) {
            /** Calls the server's installSnapshot on the calling thread and wraps the reply in a completed future. */
            @Override
            public CompletableFuture<RaftProtos.InstallSnapshotReplyProto> process(RaftProtos.InstallSnapshotRequestProto request) throws IOException {
                final RaftProtos.InstallSnapshotReplyProto reply = server.installSnapshot(request);
                return CompletableFuture.completedFuture(reply);
            }

            @Override
            public long getCallId(RaftProtos.InstallSnapshotRequestProto request) {
                final RaftProtos.RaftRpcRequestProto rpcRequest = request.getServerRequest();
                return rpcRequest.getCallId();
            }

            @Override
            public String requestToString(RaftProtos.InstallSnapshotRequestProto request) {
                return ServerStringUtils.toInstallSnapshotRequestString(request);
            }

            @Override
            public String replyToString(RaftProtos.InstallSnapshotReplyProto reply) {
                return ServerStringUtils.toInstallSnapshotReplyString(reply);
            }

            /** Snapshot requests are always replied to in order. */
            @Override
            public boolean replyInOrder(RaftProtos.InstallSnapshotRequestProto request) {
                return true;
            }
        };
    }
}
