package org.apache.uniffle.shuffle.manager;

import io.grpc.stub.StreamObserver;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.ShuffleManagerGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.class */
public class ShuffleManagerGrpcService extends ShuffleManagerGrpc.ShuffleManagerImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
    private final Map<Integer, RssShuffleStatus> shuffleStatus = JavaUtils.newConcurrentMap();
    private final RssShuffleManagerInterface shuffleManager;

    /* loaded from: input_file:org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService$RssShuffleStatus.class */
    private static class RssShuffleStatus {
        private final ReentrantReadWriteLock lock;
        private final ReentrantReadWriteLock.ReadLock readLock;
        private final ReentrantReadWriteLock.WriteLock writeLock;
        private final int[] partitions;
        private int stageAttempt;

        private RssShuffleStatus(int i, int i2) {
            this.lock = new ReentrantReadWriteLock();
            this.readLock = this.lock.readLock();
            this.writeLock = this.lock.writeLock();
            this.stageAttempt = i2;
            this.partitions = new int[i];
        }

        private <T> T withReadLock(Supplier<T> supplier) {
            this.readLock.lock();
            try {
                return supplier.get();
            } finally {
                this.readLock.unlock();
            }
        }

        private <T> T withWriteLock(Supplier<T> supplier) {
            this.writeLock.lock();
            try {
                return supplier.get();
            } finally {
                this.writeLock.unlock();
            }
        }

        public int getStageAttempt() {
            return ((Integer) withReadLock(() -> {
                return Integer.valueOf(this.stageAttempt);
            })).intValue();
        }

        public int resetStageAttemptIfNecessary(int i) {
            return ((Integer) withWriteLock(() -> {
                if (this.stageAttempt >= i) {
                    return this.stageAttempt > i ? -1 : 0;
                }
                Arrays.fill(this.partitions, 0);
                this.stageAttempt = i;
                return 1;
            })).intValue();
        }

        public void incPartitionFetchFailure(int i, int i2) {
            withWriteLock(() -> {
                if (this.stageAttempt != i) {
                    return null;
                }
                this.partitions[i2] = this.partitions[i2] + 1;
                return null;
            });
        }

        public int getPartitionFetchFailureNum(int i, int i2) {
            return ((Integer) withReadLock(() -> {
                if (this.stageAttempt != i) {
                    return 0;
                }
                return Integer.valueOf(this.partitions[i2]);
            })).intValue();
        }
    }

    public ShuffleManagerGrpcService(RssShuffleManagerInterface rssShuffleManagerInterface) {
        this.shuffleManager = rssShuffleManagerInterface;
    }

    @Override // org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase
    public void reportShuffleFetchFailure(RssProtos.ReportShuffleFetchFailureRequest reportShuffleFetchFailureRequest, StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> streamObserver) {
        RssProtos.StatusCode statusCode;
        boolean z;
        String str;
        String appId = reportShuffleFetchFailureRequest.getAppId();
        int stageAttemptId = reportShuffleFetchFailureRequest.getStageAttemptId();
        int partitionId = reportShuffleFetchFailureRequest.getPartitionId();
        if (appId.equals(this.shuffleManager.getAppId())) {
            RssShuffleStatus computeIfAbsent = this.shuffleStatus.computeIfAbsent(Integer.valueOf(reportShuffleFetchFailureRequest.getShuffleId()), num -> {
                return new RssShuffleStatus(this.shuffleManager.getPartitionNum(num.intValue()), stageAttemptId);
            });
            if (computeIfAbsent.resetStageAttemptIfNecessary(stageAttemptId) < 0) {
                str = String.format("got an old stage(%d vs %d) shuffle fetch failure report, which should be impossible.", Integer.valueOf(computeIfAbsent.getStageAttempt()), Integer.valueOf(stageAttemptId));
                LOG.warn(str);
                statusCode = RssProtos.StatusCode.INVALID_REQUEST;
                z = false;
            } else {
                statusCode = RssProtos.StatusCode.SUCCESS;
                computeIfAbsent.incPartitionFetchFailure(stageAttemptId, partitionId);
                if (computeIfAbsent.getPartitionFetchFailureNum(stageAttemptId, partitionId) >= this.shuffleManager.getMaxFetchFailures()) {
                    z = true;
                    str = String.format("report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred", Integer.valueOf(this.shuffleManager.getMaxFetchFailures()));
                } else {
                    z = false;
                    str = "don't report shuffle fetch failure";
                }
            }
        } else {
            str = String.format("got a wrong shuffle fetch failure report from appId: %s, expected appId: %s", appId, this.shuffleManager.getAppId());
            LOG.warn(str);
            statusCode = RssProtos.StatusCode.INVALID_REQUEST;
            z = false;
        }
        streamObserver.onNext(RssProtos.ReportShuffleFetchFailureResponse.newBuilder().setStatus(statusCode).setReSubmitWholeStage(z).setMsg(str).build());
        streamObserver.onCompleted();
    }

    public void unregisterShuffle(int i) {
        this.shuffleStatus.remove(Integer.valueOf(i));
    }
}
