package org.apache.giraph.comm.flow_control;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.handler.AckSignalFlag;
import org.apache.giraph.comm.requests.SendResumeRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.AdjustableSemaphore;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
import org.codehaus.jackson.util.BufferRecycler;
import org.python.apache.xerces.dom3.as.ASDataType;

/* loaded from: input_file:org/apache/giraph/comm/flow_control/CreditBasedFlowControl.class */
public class CreditBasedFlowControl implements FlowControl {
    public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER = new IntConfOption("giraph.maxOpenRequestsPerWorker", 20, "Maximum number of requests without confirmation we can have per worker");
    public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS = new IntConfOption("giraph.maxNumberOfUnsentRequests", BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN, "Maximum number of unsent requests we can keep in memory");
    public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL = new IntConfOption("giraph.unsentCacheWaitInterval", ASDataType.OTHER_SIMPLE_DATATYPE, "Time interval to wait on unsent requests cache (in milliseconds)");
    private static final Logger LOG = Logger.getLogger(CreditBasedFlowControl.class);
    private final int unsentWaitMsecs;
    private final int waitingRequestMsecs;
    private volatile short maxOpenRequestsPerWorker;
    private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
    private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>> perWorkerOpenRequestMap = Maps.newConcurrentMap();
    private final ConcurrentMap<Integer, Deque<WritableRequest>> perWorkerUnsentRequestMap = Maps.newConcurrentMap();
    private final Set<Integer> workersToResume = Sets.newHashSet();
    private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId = Maps.newConcurrentMap();
    private final Semaphore unsentRequestPermit;
    private final NettyClient nettyClient;
    private final Future<Void> resumeThreadResult;
    private volatile boolean shouldTerminate;

    public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.maxOpenRequestsPerWorker = (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(immutableClassesGiraphConfiguration);
        Preconditions.checkState(this.maxOpenRequestsPerWorker < 16384 && this.maxOpenRequestsPerWorker > 0, "NettyClient: max number of open requests should be in range (0, 20479)");
        this.unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(immutableClassesGiraphConfiguration));
        this.unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(immutableClassesGiraphConfiguration);
        this.waitingRequestMsecs = GiraphConstants.WAITING_REQUEST_MSECS.get(immutableClassesGiraphConfiguration);
        this.shouldTerminate = false;
        CallableFactory<Void> callableFactory = new CallableFactory<Void>() { // from class: org.apache.giraph.comm.flow_control.CreditBasedFlowControl.1
            @Override // org.apache.giraph.utils.CallableFactory
            public Callable<Void> newCallable(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.comm.flow_control.CreditBasedFlowControl.1.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        while (true) {
                            synchronized (CreditBasedFlowControl.this.workersToResume) {
                                if (CreditBasedFlowControl.this.shouldTerminate) {
                                    return null;
                                }
                                for (Integer num : CreditBasedFlowControl.this.workersToResume) {
                                    if (CreditBasedFlowControl.this.maxOpenRequestsPerWorker != 0) {
                                        CreditBasedFlowControl.this.sendResumeSignal(num.intValue());
                                    }
                                }
                                try {
                                    CreditBasedFlowControl.this.workersToResume.wait();
                                } catch (InterruptedException e) {
                                    throw new IllegalStateException("call: caught exception while waiting for resume-sender thread to be notified!", e);
                                }
                            }
                        }
                    }
                };
            }
        };
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("resume-sender"));
        this.resumeThreadResult = newSingleThreadExecutor.submit(new LogStacktraceCallable(callableFactory.newCallable(0)));
        newSingleThreadExecutor.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendResumeSignal(int i) {
        Long doSend = this.nettyClient.doSend(i, new SendResumeRequest(this.maxOpenRequestsPerWorker));
        Preconditions.checkState(doSend != null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("sendResumeSignal: sending signal to worker " + i + " with credit=" + ((int) this.maxOpenRequestsPerWorker) + ", ID=" + (doSend.longValue() & 65535));
        }
        this.resumeRequestsId.get(Integer.valueOf(i)).add(doSend);
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void sendRequest(int i, WritableRequest writableRequest) {
        Pair<AdjustableSemaphore, Integer> pair = this.perWorkerOpenRequestMap.get(Integer.valueOf(i));
        if (pair == null) {
            pair = new MutablePair(new AdjustableSemaphore(this.maxOpenRequestsPerWorker), -1);
            Pair<AdjustableSemaphore, Integer> putIfAbsent = this.perWorkerOpenRequestMap.putIfAbsent(Integer.valueOf(i), pair);
            this.perWorkerUnsentRequestMap.putIfAbsent(Integer.valueOf(i), new ArrayDeque());
            this.resumeRequestsId.putIfAbsent(Integer.valueOf(i), Sets.newConcurrentHashSet());
            if (putIfAbsent != null) {
                pair = putIfAbsent;
            }
        }
        AdjustableSemaphore left = pair.getLeft();
        boolean tryAcquire = left.tryAcquire();
        boolean z = false;
        while (!tryAcquire) {
            try {
                z = this.unsentRequestPermit.tryAcquire(this.unsentWaitMsecs, TimeUnit.MILLISECONDS);
                if (z) {
                    break;
                }
                tryAcquire = left.tryAcquire();
                if (tryAcquire) {
                    break;
                } else {
                    this.nettyClient.logAndSanityCheck();
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException("shouldSend: failed while waiting on the unsent request cache to have some more room for extra unsent requests!");
            }
        }
        if (z) {
            Deque<WritableRequest> deque = this.perWorkerUnsentRequestMap.get(Integer.valueOf(i));
            synchronized (deque) {
                if (!left.tryAcquire()) {
                    this.aggregateUnsentRequests.getAndIncrement();
                    deque.add(writableRequest);
                    return;
                }
                this.unsentRequestPermit.release();
            }
        }
        this.nettyClient.doSend(i, writableRequest);
    }

    private boolean shouldIgnoreCredit(int i) {
        return ((short) ((i >> 30) & 1)) == 1;
    }

    private short getCredit(int i) {
        return (short) ((i >> 16) & 16383);
    }

    private int getTimestamp(int i) {
        return i & 65535;
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public AckSignalFlag getAckSignalFlag(int i) {
        return AckSignalFlag.values()[(i >> 31) & 1];
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public int calculateResponse(AckSignalFlag ackSignalFlag, int i) {
        boolean masterInvolved = this.nettyClient.masterInvolved(i);
        if (!masterInvolved && this.maxOpenRequestsPerWorker == 0) {
            synchronized (this.workersToResume) {
                this.workersToResume.add(Integer.valueOf(i));
            }
        }
        return (ackSignalFlag.ordinal() << 31) | ((masterInvolved ? 1 : 0) << 30) | (this.maxOpenRequestsPerWorker << 16) | ((int) (this.nettyClient.getNextRequestId(i).longValue() & 65535));
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void shutdown() {
        synchronized (this.workersToResume) {
            this.shouldTerminate = true;
            this.workersToResume.notifyAll();
        }
        try {
            this.resumeThreadResult.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("shutdown: caught exception whilegetting result of resume-sender thread");
        }
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void logInfo() {
        if (LOG.isInfoEnabled()) {
            HashMap newHashMap = Maps.newHashMap();
            for (Map.Entry<Integer, Deque<WritableRequest>> entry : this.perWorkerUnsentRequestMap.entrySet()) {
                newHashMap.put(entry.getKey(), Integer.valueOf(entry.getValue().size()));
            }
            ArrayList newArrayList = Lists.newArrayList(newHashMap.entrySet());
            Collections.sort(newArrayList, new Comparator<Map.Entry<Integer, Integer>>() { // from class: org.apache.giraph.comm.flow_control.CreditBasedFlowControl.2
                @Override // java.util.Comparator
                public int compare(Map.Entry<Integer, Integer> entry2, Map.Entry<Integer, Integer> entry3) {
                    int intValue = entry2.getValue().intValue();
                    int intValue2 = entry3.getValue().intValue();
                    if (intValue < intValue2) {
                        return 1;
                    }
                    return intValue == intValue2 ? 0 : -1;
                }
            });
            StringBuilder sb = new StringBuilder();
            sb.append("logInfo: ").append(this.aggregateUnsentRequests.get()).append(" unsent requests in total. ");
            int min = Math.min(10, newArrayList.size());
            for (int i = 0; i < min; i++) {
                sb.append(((Map.Entry) newArrayList.get(i)).getValue()).append(" unsent requests for taskId=").append(((Map.Entry) newArrayList.get(i)).getKey()).append(" (credit=").append(this.perWorkerOpenRequestMap.get(((Map.Entry) newArrayList.get(i)).getKey()).getKey().getMaxPermits()).append("), ");
            }
            LOG.info(sb);
        }
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void waitAllRequests() {
        while (true) {
            synchronized (this.aggregateUnsentRequests) {
                if (this.aggregateUnsentRequests.get() == 0) {
                    return;
                }
                try {
                    this.aggregateUnsentRequests.wait(this.waitingRequestMsecs);
                    if (this.aggregateUnsentRequests.get() == 0) {
                        return;
                    } else {
                        this.nettyClient.logAndSanityCheck();
                    }
                } catch (InterruptedException e) {
                    throw new IllegalStateException("waitAllRequests: failed while waiting on open/cached requests");
                }
            }
        }
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public int getNumberOfUnsentRequests() {
        return this.aggregateUnsentRequests.get();
    }

    @Override // org.apache.giraph.comm.flow_control.FlowControl
    public void messageAckReceived(int i, long j, int i2) {
        boolean shouldIgnoreCredit = shouldIgnoreCredit(i2);
        short credit = getCredit(i2);
        int timestamp = getTimestamp(i2);
        MutablePair mutablePair = (MutablePair) this.perWorkerOpenRequestMap.get(Integer.valueOf(i));
        AdjustableSemaphore adjustableSemaphore = (AdjustableSemaphore) mutablePair.getLeft();
        if (!this.resumeRequestsId.get(Integer.valueOf(i)).remove(Long.valueOf(j))) {
            adjustableSemaphore.release();
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("messageAckReceived: ACK of resume received from " + i + " timestamp=" + timestamp);
        }
        if (!shouldIgnoreCredit) {
            synchronized (mutablePair) {
                if (compareTimestamps(timestamp, ((Integer) mutablePair.getRight()).intValue()) > 0) {
                    mutablePair.setRight(Integer.valueOf(timestamp));
                    adjustableSemaphore.setMaxPermits(credit);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("messageAckReceived: received out-of-order messages.Received timestamp=" + timestamp + " and current timestamp=" + mutablePair.getRight());
                }
            }
        }
        trySendCachedRequests(i);
    }

    private void trySendCachedRequests(int i) {
        WritableRequest pollFirst;
        Deque<WritableRequest> deque = this.perWorkerUnsentRequestMap.get(Integer.valueOf(i));
        AdjustableSemaphore left = this.perWorkerOpenRequestMap.get(Integer.valueOf(i)).getLeft();
        while (true) {
            synchronized (deque) {
                pollFirst = deque.pollFirst();
                if (pollFirst == null) {
                    return;
                }
                if (!left.tryAcquire()) {
                    deque.offerFirst(pollFirst);
                    return;
                }
            }
            this.unsentRequestPermit.release();
            this.nettyClient.doSend(i, pollFirst);
            if (this.aggregateUnsentRequests.decrementAndGet() == 0) {
                synchronized (this.aggregateUnsentRequests) {
                    this.aggregateUnsentRequests.notifyAll();
                }
            }
        }
    }

    public void updateCredit(short s) {
        short max = (short) Math.max(0, Math.min(16383, (int) s));
        if (this.maxOpenRequestsPerWorker != 0 || max == 0) {
            this.maxOpenRequestsPerWorker = max;
            return;
        }
        this.maxOpenRequestsPerWorker = max;
        synchronized (this.workersToResume) {
            this.workersToResume.notifyAll();
        }
    }

    private int compareTimestamps(int i, int i2) {
        int i3 = i - i2;
        return Math.abs(i3) < 32767 ? i3 : -i3;
    }

    public void processResumeSignal(int i, short s, long j) {
        int i2 = (int) (j & 65535);
        if (LOG.isDebugEnabled()) {
            LOG.debug("processResumeSignal: resume signal from " + i + " with timestamp=" + i2);
        }
        MutablePair mutablePair = (MutablePair) this.perWorkerOpenRequestMap.get(Integer.valueOf(i));
        synchronized (mutablePair) {
            if (compareTimestamps(i2, ((Integer) mutablePair.getRight()).intValue()) > 0) {
                mutablePair.setRight(Integer.valueOf(i2));
                ((AdjustableSemaphore) mutablePair.getLeft()).setMaxPermits(s);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("processResumeSignal: received out-of-order messages. Received timestamp=" + i2 + " and current timestamp=" + mutablePair.getRight());
            }
        }
        trySendCachedRequests(i);
    }
}
