package org.apache.giraph.comm.netty;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyWorkerServer.class */
public class NettyWorkerServer<I extends WritableComparable, V extends Writable, E extends Writable> implements WorkerServer<I, V, E> {
    private static final Logger LOG = Logger.getLogger(NettyWorkerServer.class);
    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
    private final CentralizedServiceWorker<I, V, E> service;
    private final NettyServer nettyServer;
    private final ServerData<I, V, E> serverData;
    private final Mapper<?, ?, ?, ?>.Context context;

    public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, CentralizedServiceWorker<I, V, E> centralizedServiceWorker, Mapper<?, ?, ?, ?>.Context context, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.conf = immutableClassesGiraphConfiguration;
        this.service = centralizedServiceWorker;
        this.context = context;
        this.serverData = new ServerData<>(centralizedServiceWorker, immutableClassesGiraphConfiguration, createMessageStoreFactory(), context);
        this.nettyServer = new NettyServer(immutableClassesGiraphConfiguration, new WorkerRequestServerHandler.Factory(this.serverData), centralizedServiceWorker.getWorkerInfo(), context, uncaughtExceptionHandler);
        this.nettyServer.start();
    }

    private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> createMessageStoreFactory() {
        MessageStoreFactory<I, Writable, MessageStore<I, Writable>> messageStoreFactory = (MessageStoreFactory) ReflectionUtils.newInstance(GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.get(this.conf));
        messageStoreFactory.initialize(this.service, this.conf);
        return messageStoreFactory;
    }

    @Override // org.apache.giraph.comm.WorkerServer
    public InetSocketAddress getMyAddress() {
        return this.nettyServer.getMyAddress();
    }

    @Override // org.apache.giraph.comm.WorkerServer
    public void prepareSuperstep() {
        this.serverData.prepareSuperstep();
        resolveMutations();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void resolveMutations() {
        HashMultimap create = HashMultimap.create(this.service.getPartitionStore().getNumPartitions(), 100);
        Iterator<Map.Entry<I, VertexMutations<I, V, E>>> it2 = this.serverData.getVertexMutations().entrySet().iterator();
        while (it2.hasNext()) {
            I key = it2.next().getKey();
            if (!create.put(Integer.valueOf(this.service.getPartitionId(key)), key)) {
                throw new IllegalStateException("resolveMutations: Already has missing vertex on this worker for " + key);
            }
        }
        for (Integer num : this.service.getPartitionStore().getPartitionIds()) {
            Iterable<I> partitionDestinationVertices = this.serverData.getCurrentMessageStore().getPartitionDestinationVertices(num.intValue());
            if (!Iterables.isEmpty(partitionDestinationVertices)) {
                Partition<I, V, E> orCreatePartition = this.service.getPartitionStore().getOrCreatePartition(num);
                for (I i : partitionDestinationVertices) {
                    if (orCreatePartition.getVertex(i) == null && !create.put(num, i)) {
                        throw new IllegalStateException("resolveMutations: Already has missing vertex on this worker for " + i);
                    }
                }
                this.service.getPartitionStore().putPartition(orCreatePartition);
            }
        }
        VertexResolver<I, V, E> createVertexResolver = this.conf.createVertexResolver();
        for (Map.Entry entry : create.asMap().entrySet()) {
            Partition<I, V, E> orCreatePartition2 = this.service.getPartitionStore().getOrCreatePartition((Integer) entry.getKey());
            for (WritableComparable writableComparable : (Collection) entry.getValue()) {
                Vertex vertex = orCreatePartition2.getVertex(writableComparable);
                VertexMutations<I, V, E> vertexMutations = null;
                VertexMutations<I, V, E> vertexMutations2 = this.serverData.getVertexMutations().get(writableComparable);
                if (vertexMutations2 != null) {
                    synchronized (vertexMutations2) {
                        vertexMutations = vertexMutations2.copy();
                    }
                    this.serverData.getVertexMutations().remove(writableComparable);
                }
                Vertex resolve = createVertexResolver.resolve(writableComparable, vertex, vertexMutations, this.serverData.getCurrentMessageStore().hasMessagesForVertex(writableComparable));
                this.context.progress();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("resolveMutations: Resolved vertex index " + writableComparable + " with original vertex " + vertex + ", returned vertex " + resolve + " on superstep " + this.service.getSuperstep() + " with mutations " + vertexMutations);
                }
                if (resolve != null) {
                    orCreatePartition2.putVertex(resolve);
                } else if (vertex != null) {
                    orCreatePartition2.removeVertex(vertex.getId());
                }
            }
            this.service.getPartitionStore().putPartition(orCreatePartition2);
        }
        if (!this.serverData.getVertexMutations().isEmpty()) {
            throw new IllegalStateException("resolveMutations: Illegally still has " + this.serverData.getVertexMutations().size() + " mutations left.");
        }
    }

    @Override // org.apache.giraph.comm.WorkerServer
    public ServerData<I, V, E> getServerData() {
        return this.serverData;
    }

    @Override // org.apache.giraph.comm.WorkerServer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.nettyServer.stop();
    }
}
