package org.apache.giraph.comm.netty;

import java.io.IOException;
import java.lang.Thread;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyMasterClient.class */
/**
 * {@link MasterClient} implementation that ships global-communication values
 * (aggregators and the like) from the master to their owning workers through a
 * {@link NettyClient}.
 *
 * <p>Values are accumulated per destination task id in a
 * {@link SendGlobalCommCache}; a worker's cached data is sent as a
 * {@link SendAggregatorsToOwnerRequest} either when the size reported by the
 * cache reaches the configured limit or when {@link #finishSendingValues()} is
 * called.
 */
public class NettyMasterClient implements MasterClient {
    /**
     * Value handed to {@code getInt} as its second argument when reading
     * {@link AggregatorUtils#MAX_BYTES_PER_AGGREGATOR_REQUEST} (1024 * 1024).
     * NOTE(review): presumably the default used when the key is unset - confirm
     * against the configuration class.
     */
    private static final int DEFAULT_MAX_BYTES_PER_AGGREGATOR_REQUEST = 1024 * 1024;

    /** Netty client used for every outgoing request. */
    private final NettyClient nettyClient;
    /** Master service; supplies the worker list and the master's own task info. */
    private final CentralizedServiceMaster<?, ?, ?> service;
    /**
     * Per-worker cache of values waiting to be sent.
     * NOTE(review): the meaning of the {@code true} constructor flag is not
     * visible from this file - confirm in SendGlobalCommCache.
     */
    private final SendGlobalCommCache sendGlobalCommCache = new SendGlobalCommCache(true);
    /** Threshold compared against the value returned by the cache's {@code addValue}. */
    private final int maxBytesPerAggregatorRequest;
    /** Progress reporter, pinged once per worker in {@link #finishSendingValues()}. */
    private final Progressable progressable;

    /**
     * Creates the client and its underlying {@link NettyClient}.
     *
     * @param context mapper context; passed to the Netty client and also kept
     *        as the progress reporter
     * @param immutableClassesGiraphConfiguration configuration; passed to the
     *        Netty client and queried for the per-request byte limit
     * @param centralizedServiceMaster master service providing master/worker info
     * @param uncaughtExceptionHandler handler forwarded to the Netty client
     */
    public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, CentralizedServiceMaster<?, ?, ?> centralizedServiceMaster, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        nettyClient = new NettyClient(
                context,
                immutableClassesGiraphConfiguration,
                centralizedServiceMaster.getMasterInfo(),
                uncaughtExceptionHandler);
        maxBytesPerAggregatorRequest = immutableClassesGiraphConfiguration.getInt(
                AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
                DEFAULT_MAX_BYTES_PER_AGGREGATOR_REQUEST);
        progressable = context;
        service = centralizedServiceMaster;
    }

    /** Connects the Netty client to every worker currently listed by the master service. */
    @Override
    public void openConnections() {
        nettyClient.connectAllAddresses(service.getWorkerInfoList());
    }

    /**
     * Caches a value for the worker that owns {@code str} and flushes that
     * worker's cache once the size reported by the cache reaches the limit.
     *
     * @param str name used to look up the owning worker and to key the value
     * @param globalCommType type tag stored with the value
     * @param writable value to send
     * @throws IOException declared by the interface; propagated from the cache
     */
    @Override
    public void sendToOwner(String str, GlobalCommType globalCommType, Writable writable) throws IOException {
        WorkerInfo owner = AggregatorUtils.getOwner(str, service.getWorkerInfoList());
        Integer ownerTaskId = Integer.valueOf(owner.getTaskId());
        if (sendGlobalCommCache.addValue(ownerTaskId, str, globalCommType, writable) < maxBytesPerAggregatorRequest) {
            // Still under the limit: keep accumulating for this worker.
            return;
        }
        flushAggregatorsToWorker(owner);
    }

    /**
     * For every worker: records the special count in the cache, sends whatever
     * is cached for that worker, and reports progress. The cache is reset once
     * all workers have been handled.
     *
     * @throws IOException declared by the interface; propagated from the cache
     */
    @Override
    public void finishSendingValues() throws IOException {
        for (WorkerInfo worker : service.getWorkerInfoList()) {
            Integer workerTaskId = Integer.valueOf(worker.getTaskId());
            sendGlobalCommCache.addSpecialCount(workerTaskId);
            flushAggregatorsToWorker(worker);
            progressable.progress();
        }
        sendGlobalCommCache.reset();
    }

    /**
     * Removes the serialized data cached for the given worker and sends it to
     * that worker, tagged with the master's task id.
     *
     * @param workerInfo destination worker
     */
    private void flushAggregatorsToWorker(WorkerInfo workerInfo) {
        Integer destinationTaskId = Integer.valueOf(workerInfo.getTaskId());
        SendAggregatorsToOwnerRequest request = new SendAggregatorsToOwnerRequest(
                sendGlobalCommCache.removeSerialized(destinationTaskId),
                service.getMasterInfo().getTaskId());
        nettyClient.sendWritableRequest(destinationTaskId, request);
    }

    /**
     * Delegates to {@link NettyClient#waitAllRequests()}.
     * NOTE(review): presumably blocks until outstanding requests complete - confirm.
     */
    @Override
    public void flush() {
        nettyClient.waitAllRequests();
    }

    /** Stops the underlying Netty client. */
    @Override
    public void closeConnections() {
        nettyClient.stop();
    }
}
