package org.apache.giraph.comm.netty;

import java.io.IOException;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.class */
/**
 * {@link WorkerAggregatorRequestProcessor} that sends every request through the
 * {@link WorkerClient} obtained from the service worker it was built with.
 *
 * <p>Reduced values destined for other workers are first accumulated in a
 * {@link SendGlobalCommCache}, keyed by the destination task id, and are sent
 * either when the cache reports that the configured threshold has been reached
 * or when {@link #flush()} is called.
 */
public class NettyWorkerAggregatorRequestProcessor implements WorkerAggregatorRequestProcessor {
    /**
     * Value handed to the configuration lookup as the default for
     * {@link AggregatorUtils#MAX_BYTES_PER_AGGREGATOR_REQUEST} (1048576).
     */
    private static final int DEFAULT_MAX_BYTES_PER_AGGREGATOR_REQUEST = 1024 * 1024;

    /** Notified after each per-worker send in {@link #flush()} and {@link #distributeReducedValues}. */
    private final Progressable progressable;
    /** Client used to send every request; taken from the service worker. */
    private final WorkerClient<?, ?, ?> workerClient;
    /** Source of this worker's info, the master's info and the list of all workers. */
    private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
    /** Per-destination cache of reduced values that have not been sent yet. */
    private final SendGlobalCommCache sendReducedValuesCache = new SendGlobalCommCache(false);
    /** Threshold compared against the value returned by the cache after each add. */
    private final int maxBytesPerAggregatorRequest;

    /**
     * Creates a processor bound to the given service worker.
     *
     * @param progressable object whose {@code progress()} is called while sending
     * @param immutableClassesGiraphConfiguration configuration from which the
     *        maximum request size is read
     * @param centralizedServiceWorker service worker supplying the worker client
     *        and the worker/master information
     */
    public NettyWorkerAggregatorRequestProcessor(Progressable progressable, ImmutableClassesGiraphConfiguration<?, ?, ?> immutableClassesGiraphConfiguration, CentralizedServiceWorker<?, ?, ?> centralizedServiceWorker) {
        this.serviceWorker = centralizedServiceWorker;
        this.workerClient = centralizedServiceWorker.getWorkerClient();
        this.progressable = progressable;
        this.maxBytesPerAggregatorRequest = immutableClassesGiraphConfiguration.getInt(
                AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
                DEFAULT_MAX_BYTES_PER_AGGREGATOR_REQUEST);
    }

    /**
     * Queues a reduced value for the worker that owns it.
     *
     * <p>The owner is resolved with {@link AggregatorUtils#getOwner}. When the
     * owner is this worker nothing is cached. Otherwise the value is added to
     * the cache under the owner's task id, and the owner's cached data is sent
     * right away if the value returned by the cache is not below the configured
     * maximum.
     *
     * @param str name passed to the owner lookup and stored with the value
     * @param writable the reduced value to cache
     * @return {@code false} if this worker is the owner, {@code true} otherwise
     * @throws IOException declared by the implemented interface
     */
    @Override
    public boolean sendReducedValue(String str, Writable writable) throws IOException {
        final WorkerInfo owner = AggregatorUtils.getOwner(str, this.serviceWorker.getWorkerInfoList());
        if (isThisWorker(owner)) {
            return false;
        }

        final Integer ownerTaskId = owner.getTaskId();
        // Presumably the size in bytes now cached for this owner - confirm against SendGlobalCommCache.
        final int cachedSize = this.sendReducedValuesCache.addValue(
                ownerTaskId, str, GlobalCommType.REDUCED_VALUE, writable);
        if (cachedSize >= this.maxBytesPerAggregatorRequest) {
            flushAggregatorsToWorker(owner);
        }
        return true;
    }

    /**
     * Sends whatever is cached to every worker except this one, then resets the
     * cache.
     *
     * <p>For each other worker a special count is added to the cache before its
     * data is sent, and progress is reported afterwards.
     *
     * @throws IOException declared by the implemented interface
     */
    @Override
    public void flush() throws IOException {
        for (WorkerInfo target : this.serviceWorker.getWorkerInfoList()) {
            if (isThisWorker(target)) {
                continue;
            }
            final Integer targetTaskId = target.getTaskId();
            this.sendReducedValuesCache.addSpecialCount(targetTaskId);
            flushAggregatorsToWorker(target);
            this.progressable.progress();
        }
        this.sendReducedValuesCache.reset();
    }

    /**
     * Removes the serialized data cached for the given worker and sends it to
     * that worker, tagged with this worker's task id.
     *
     * @param workerInfo the destination worker
     */
    private void flushAggregatorsToWorker(WorkerInfo workerInfo) {
        final Integer targetTaskId = workerInfo.getTaskId();
        final byte[] serialized = this.sendReducedValuesCache.removeSerialized(targetTaskId);
        final int senderTaskId = this.serviceWorker.getWorkerInfo().getTaskId();
        this.workerClient.sendWritableRequest(
                targetTaskId, new SendWorkerAggregatorsRequest(serialized, senderTaskId));
    }

    /**
     * Sends the given bytes to the master wrapped in a
     * {@link SendReducedToMasterRequest}.
     *
     * @param bArr payload handed to the request unchanged
     * @throws IOException declared by the implemented interface
     */
    @Override
    public void sendReducedValuesToMaster(byte[] bArr) throws IOException {
        final Integer masterTaskId = this.serviceWorker.getMasterInfo().getTaskId();
        this.workerClient.sendWritableRequest(masterTaskId, new SendReducedToMasterRequest(bArr));
    }

    /**
     * Sends each payload to every worker except this one.
     *
     * <p>Progress is reported once per (payload, worker) pair, including the
     * pair for this worker, for which nothing is sent.
     *
     * @param iterable payloads to distribute
     * @throws IOException declared by the implemented interface
     */
    @Override
    public void distributeReducedValues(Iterable<byte[]> iterable) throws IOException {
        for (byte[] payload : iterable) {
            for (WorkerInfo target : this.serviceWorker.getWorkerInfoList()) {
                if (!isThisWorker(target)) {
                    final Integer targetTaskId = target.getTaskId();
                    final SendAggregatorsToWorkerRequest request = new SendAggregatorsToWorkerRequest(
                            payload, this.serviceWorker.getWorkerInfo().getTaskId());
                    this.workerClient.sendWritableRequest(targetTaskId, request);
                }
                this.progressable.progress();
            }
        }
    }

    /**
     * Tells whether the given worker is the one this processor runs on, by
     * comparing task ids.
     *
     * @param workerInfo worker to test
     * @return {@code true} if its task id equals this worker's task id
     */
    private boolean isThisWorker(WorkerInfo workerInfo) {
        final int ownTaskId = this.serviceWorker.getWorkerInfo().getTaskId();
        return ownTaskId == workerInfo.getTaskId();
    }
}
