package org.apache.giraph.comm.netty;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yammer.metrics.core.Counter;
import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Map;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyWorkerClient.class */
/**
 * {@link WorkerClient} implementation that forwards all network operations to a
 * {@link NettyClient} and keeps a per-superstep {@link Counter} for selected
 * {@link RequestType}s, incremented each time a request of that type is sent.
 *
 * <p>The instance registers itself with {@link GiraphMetrics} as a superstep-reset
 * observer, so {@link #newSuperstep(SuperstepMetricsRegistry)} is the place where
 * the counters are swapped for fresh ones.
 *
 * <p>NOTE(review): the counter map is a plain hash map without synchronization;
 * confirm that callers never send requests concurrently with a superstep reset.
 *
 * @param <I> type accepted by {@link #getVertexPartitionOwner} (presumably the vertex id)
 * @param <V> presumably the vertex value type - TODO confirm
 * @param <E> presumably the edge value type - TODO confirm
 */
public class NettyWorkerClient<I extends WritableComparable, V extends Writable, E extends Writable> implements WorkerClient<I, V, E>, ResetSuperstepMetricsObserver {
    private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
    /** Configuration handed to the constructor; stored but not read in this class. */
    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
    /** Underlying client that performs every network operation. */
    private final NettyClient nettyClient;
    /** Worker service used to look up workers, the master and partition owners. */
    private final CentralizedServiceWorker<I, V, E> service;
    /** Counters for the current superstep, keyed by the request type they track. */
    private final Map<RequestType, Counter> superstepRequestCounters = Maps.newHashMap();

    /**
     * Creates the underlying {@link NettyClient} and subscribes this object to
     * superstep metric resets.
     *
     * @param context mapper context passed through to the Netty client
     * @param immutableClassesGiraphConfiguration configuration passed through to the Netty client
     * @param centralizedServiceWorker worker service; its worker info identifies this client
     * @param uncaughtExceptionHandler handler passed through to the Netty client
     */
    public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, CentralizedServiceWorker<I, V, E> centralizedServiceWorker, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        conf = immutableClassesGiraphConfiguration;
        service = centralizedServiceWorker;
        nettyClient = new NettyClient(context, immutableClassesGiraphConfiguration, centralizedServiceWorker.getWorkerInfo(), uncaughtExceptionHandler);
        // Subscribe last, once every field has been assigned.
        GiraphMetrics.get().addSuperstepResetObserver(this);
    }

    /**
     * Drops the counters of the previous superstep and fetches a fresh counter
     * from the given registry for every tracked request type.
     *
     * @param superstepMetricsRegistry registry of the superstep that is starting
     */
    @Override
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        superstepRequestCounters.clear();
        trackRequests(RequestType.SEND_WORKER_VERTICES_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
        trackRequests(RequestType.SEND_WORKER_MESSAGES_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));
        trackRequests(RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_PARTITION_CURRENT_MESSAGES_REQUESTS));
        trackRequests(RequestType.SEND_PARTITION_MUTATIONS_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_PARTITION_MUTATIONS_REQUESTS));
        trackRequests(RequestType.SEND_WORKER_AGGREGATORS_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_WORKER_AGGREGATORS_REQUESTS));
        trackRequests(RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_AGGREGATORS_TO_MASTER_REQUESTS));
        trackRequests(RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_AGGREGATORS_TO_OWNER_REQUESTS));
        trackRequests(RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST,
                superstepMetricsRegistry.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
    }

    /** Associates the given counter with a request type for the current superstep. */
    private void trackRequests(RequestType requestType, Counter requestCounter) {
        superstepRequestCounters.put(requestType, requestCounter);
    }

    /**
     * @return the worker service this client was constructed with
     */
    public CentralizedServiceWorker<I, V, E> getService() {
        return service;
    }

    /**
     * Asks the Netty client to connect to every worker in the service's worker
     * list whose task id differs from this worker's, plus the master.
     */
    @Override
    public void openConnections() {
        // Raw list: the common element type of worker and master info is not
        // imported in this file. TODO(review): parameterize once it is known.
        ArrayList peers = Lists.newArrayListWithCapacity(service.getWorkerInfoList().size());
        for (WorkerInfo candidate : service.getWorkerInfoList()) {
            if (service.getWorkerInfo().getTaskId() == candidate.getTaskId()) {
                // Same task id as this worker: nothing to connect to.
                continue;
            }
            peers.add(candidate);
        }
        peers.add(service.getMasterInfo());
        nettyClient.connectAllAddresses(peers);
    }

    /**
     * Looks up the partition owner through the worker service.
     *
     * @param i key forwarded to the service (presumably a vertex id)
     * @return whatever the service reports as the owner for {@code i}
     */
    @Override
    public PartitionOwner getVertexPartitionOwner(I i) {
        return service.getVertexPartitionOwner(i);
    }

    /**
     * Counts the request against its type's superstep counter, when one is
     * registered, and then passes it to the Netty client.
     *
     * @param i forwarded unchanged to the Netty client (presumably the destination task id)
     * @param writableRequest request to count and send
     */
    @Override
    public void sendWritableRequest(int i, WritableRequest writableRequest) {
        final Counter requestCounter = superstepRequestCounters.get(writableRequest.getType());
        // Request types without a registered counter are sent uncounted.
        if (requestCounter != null) {
            requestCounter.inc();
        }
        nettyClient.sendWritableRequest(i, writableRequest);
    }

    /** Delegates to {@link NettyClient#waitAllRequests()}. */
    @Override
    public void waitAllRequests() {
        nettyClient.waitAllRequests();
    }

    /**
     * Stops the underlying Netty client.
     *
     * @throws IOException declared by the {@link WorkerClient} contract
     */
    @Override
    public void closeConnections() throws IOException {
        nettyClient.stop();
    }

    /**
     * Opens the connections and, if requested, authenticates afterwards.
     *
     * @param z {@code true} to call {@link #authenticate()} once connections are open
     */
    @Override
    public void setup(boolean z) {
        openConnections();
        if (z) {
            authenticate();
        }
    }

    /** Delegates to {@link NettyClient#authenticate()}. */
    @Override
    public void authenticate() {
        nettyClient.authenticate();
    }

    /**
     * @return the flow control object exposed by the underlying Netty client
     */
    @Override
    public FlowControl getFlowControl() {
        return nettyClient.getFlowControl();
    }
}
