package org.apache.giraph.comm.flow_control;

import com.yammer.metrics.core.Counter;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.handler.AckSignalFlag;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/flow_control/StaticFlowControl.class */
/**
 * Flow control that bounds the number of open (unconfirmed) requests with a
 * fixed, configuration-supplied limit.
 *
 * <p>A request is always handed to the {@link NettyClient} first. If, after
 * sending, the client reports more than {@code maxNumberOfOpenRequests} open
 * requests, the sending thread blocks until the count has dropped to
 * {@code numberOfRequestsToProceed} or below. Time spent blocked is added to a
 * per-superstep counter.
 */
public class StaticFlowControl implements FlowControl, ResetSuperstepMetricsObserver {
    /** Maximum number of requests without confirmation we should have. */
    public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS = new IntConfOption("giraph.maxNumberOfOpenRequests", 10000, "Maximum number of requests without confirmation we should have");
    /**
     * Fraction of {@link #MAX_NUMBER_OF_OPEN_REQUESTS} that has to be closed
     * before a blocked sender is allowed to proceed.
     */
    public static final FloatConfOption FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING = new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding", 0.2f, "Fraction of requests to close before proceeding");
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(StaticFlowControl.class);
    /** Open-request count above which a sender starts waiting. */
    private final int maxNumberOfOpenRequests;
    /** Open-request count at or below which a waiting sender resumes. */
    private final int numberOfRequestsToProceed;
    /** Client used to send requests and to query the open-request count. */
    private final NettyClient nettyClient;
    /** Timeout in milliseconds of a single wait on {@link #requestSpotAvailable}. */
    private final int waitingRequestMsecs;
    /** Per-superstep counter of milliseconds spent blocked; set in {@link #newSuperstep}. */
    private Counter timeWaitingOnOpenRequests;
    /** Monitor that blocked senders wait on and that acks notify. */
    private final Object requestSpotAvailable = new Object();
    /** Number of threads currently inside {@link #waitSomeRequests()}. */
    private final AtomicInteger numWaitingThreads = new AtomicInteger(0);

    /**
     * Reads the limits from the configuration and registers this object as a
     * superstep-metrics reset observer.
     *
     * @param immutableClassesGiraphConfiguration configuration the options are read from
     * @param nettyClient client used for sending and for open-request counts
     */
    public StaticFlowControl(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(immutableClassesGiraphConfiguration);
        // Resume threshold = max * (1 - fraction), truncated towards zero.
        this.numberOfRequestsToProceed = (int) (this.maxNumberOfOpenRequests * (1.0f - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(immutableClassesGiraphConfiguration)));
        if (LOG.isInfoEnabled()) {
            LOG.info("StaticFlowControl: Limit number of open requests to " + this.maxNumberOfOpenRequests + " and proceed when <= " + this.numberOfRequestsToProceed);
        }
        this.waitingRequestMsecs = GiraphConstants.WAITING_REQUEST_MSECS.get(immutableClassesGiraphConfiguration);
        GiraphMetrics.get().addSuperstepResetObserver(this);
    }

    /**
     * Fetches the waiting-time counter from the given superstep registry.
     *
     * @param superstepMetricsRegistry registry to take the counter from
     */
    @Override // org.apache.giraph.metrics.ResetSuperstepMetricsObserver
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        this.timeWaitingOnOpenRequests = superstepMetricsRegistry.getCounter(MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
    }

    /**
     * Sends the request through the client, then blocks if the number of open
     * requests exceeds the configured maximum. The time spent blocked is added
     * to the waiting-time counter.
     *
     * @param i forwarded unchanged to {@code NettyClient.doSend}
     *     (presumably the destination task id; confirm against NettyClient)
     * @param writableRequest request to send
     */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void sendRequest(int i, WritableRequest writableRequest) {
        this.nettyClient.doSend(i, writableRequest);
        if (this.nettyClient.getNumberOfOpenRequests() > this.maxNumberOfOpenRequests) {
            long waitStartMillis = System.currentTimeMillis();
            waitSomeRequests();
            this.timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - waitStartMillis);
        }
    }

    /**
     * Blocks until the client reports at most {@code numberOfRequestsToProceed}
     * open requests.
     *
     * <p>Each iteration waits on {@link #requestSpotAvailable} for up to
     * {@code waitingRequestMsecs} and then asks the client to log and
     * sanity-check its state before the condition is evaluated again.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitSomeRequests() {
        this.numWaitingThreads.getAndIncrement();
        try {
            while (this.nettyClient.getNumberOfOpenRequests() > this.numberOfRequestsToProceed) {
                synchronized (this.requestSpotAvailable) {
                    // Re-check while holding the monitor: acks notify under this
                    // same monitor, so one arriving after the loop test but
                    // before wait() would otherwise go unnoticed until timeout.
                    if (this.nettyClient.getNumberOfOpenRequests() <= this.numberOfRequestsToProceed) {
                        break;
                    }
                    try {
                        this.requestSpotAvailable.wait(this.waitingRequestMsecs);
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("waitSomeRequests: Got unexpected InterruptedException", e);
                    }
                }
                // Called outside the monitor so that acks are not held up.
                this.nettyClient.logAndSanityCheck();
            }
        } finally {
            // Keep the waiting-thread count accurate even on the exception path.
            this.numWaitingThreads.getAndDecrement();
        }
    }

    /**
     * Wakes every thread blocked in {@link #waitSomeRequests()} so it can
     * re-check the open-request count. None of the arguments are used.
     *
     * @param i unused
     * @param j unused
     * @param i2 unused
     */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void messageAckReceived(int i, long j, int i2) {
        synchronized (this.requestSpotAvailable) {
            this.requestSpotAvailable.notifyAll();
        }
    }

    /**
     * Interprets the response value as an index into {@code AckSignalFlag.values()}.
     *
     * @param i ordinal of the flag
     * @return the flag with that ordinal
     */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public AckSignalFlag getAckSignalFlag(int i) {
        return AckSignalFlag.values()[i];
    }

    /**
     * Encodes the flag as its ordinal; the inverse of {@link #getAckSignalFlag(int)}.
     *
     * @param ackSignalFlag flag to encode
     * @param i unused
     * @return ordinal of {@code ackSignalFlag}
     */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public int calculateResponse(AckSignalFlag ackSignalFlag, int i) {
        return ackSignalFlag.ordinal();
    }

    /** Logs, at info level, how many threads are currently blocked and the resume threshold. */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void logInfo() {
        if (LOG.isInfoEnabled()) {
            LOG.info("logInfo: " + this.numWaitingThreads.get() + " threads waiting until number of open requests falls below " + this.numberOfRequestsToProceed);
        }
    }

    /** No-op in this implementation. */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void waitAllRequests() {
    }

    /**
     * This implementation always reports zero unsent requests.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public int getNumberOfUnsentRequests() {
        return 0;
    }
}
