package org.apache.giraph.comm;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeStore;
import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.SimplePartitionStore;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/ServerData.class */
/**
 * Container for the data a worker's server side holds: the partition store,
 * the edge store, the message stores of the current and the next superstep,
 * aggregator data, worker-to-worker messages and pending vertex mutations.
 *
 * <p>When {@link GiraphConstants#USE_OUT_OF_CORE_GRAPH} is enabled, the
 * partition, edge and message stores are wrapped in their disk-backed
 * variants and share a single {@link OutOfCoreEngine}.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
public class ServerData<I extends WritableComparable, V extends Writable, E extends Writable> {
    private static final Logger LOG = Logger.getLogger(ServerData.class);
    /** Configuration */
    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
    /** Partition store for this worker (disk-backed when out-of-core is enabled) */
    private volatile PartitionStore<I, V, E> partitionStore;
    /** Edge store for this worker (disk-backed when out-of-core is enabled) */
    private final EdgeStore<I, V, E> edgeStore;
    /** Factory used to create message stores; needs conf and serviceWorker */
    private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>> messageStoreFactory;
    /** Store receiving messages that become "current" at the next prepareSuperstep() */
    private volatile MessageStore<I, Writable> incomingMessageStore;
    /** Store holding the messages installed by the last prepareSuperstep() */
    private volatile MessageStore<I, Writable> currentMessageStore;
    /** Aggregator data for aggregators owned by this worker */
    private final OwnerAggregatorServerData ownerAggregatorData;
    /** Aggregator data for all aggregators */
    private final AllAggregatorServerData allAggregatorData;
    /** Service worker */
    private final CentralizedServiceWorker<I, V, E> serviceWorker;
    /** Mapper context, used to report progress */
    private final Mapper<?, ?, ?, ?>.Context context;
    /** Out-of-core engine; {@code null} when out-of-core is disabled */
    private final OutOfCoreEngine oocEngine;
    /** Mutations swapped out by prepareResolveMutations(), keyed by partition id */
    private ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>> oldPartitionMutations = Maps.newConcurrentMap();
    /** Mutations being collected, keyed by partition id */
    private ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>> partitionMutations = Maps.newConcurrentMap();
    /** Worker-to-worker messages installed by the last prepareSuperstep() */
    private volatile List<Writable> currentWorkerToWorkerMessages = Collections.synchronizedList(new ArrayList<Writable>());
    /** Worker-to-worker messages that become "current" at the next prepareSuperstep() */
    private volatile List<Writable> incomingWorkerToWorkerMessages = Collections.synchronizedList(new ArrayList<Writable>());

    /**
     * Constructor.
     *
     * @param centralizedServiceWorker Service worker
     * @param workerServer Worker server, handed to the out-of-core engine
     * @param immutableClassesGiraphConfiguration Configuration
     * @param context Mapper context
     */
    public ServerData(CentralizedServiceWorker<I, V, E> centralizedServiceWorker, WorkerServer workerServer, ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, Mapper<?, ?, ?, ?>.Context context) {
        this.serviceWorker = centralizedServiceWorker;
        this.conf = immutableClassesGiraphConfiguration;
        // Must run after conf and serviceWorker are assigned: the factory is
        // looked up in, and initialized with, both of them. Doing this in a
        // field initializer would run it while both fields are still null.
        this.messageStoreFactory = createMessageStoreFactory();

        EdgeStoreFactory<I, V, E> edgeStoreFactory = immutableClassesGiraphConfiguration.createEdgeStoreFactory();
        edgeStoreFactory.initialize(centralizedServiceWorker, immutableClassesGiraphConfiguration, context);
        EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
        PartitionStore<I, V, E> inMemoryPartitionStore = new SimplePartitionStore<I, V, E>(immutableClassesGiraphConfiguration, context);

        if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(immutableClassesGiraphConfiguration)) {
            // Out-of-core: wrap the in-memory stores in their disk-backed variants.
            this.oocEngine = new OutOfCoreEngine(immutableClassesGiraphConfiguration, centralizedServiceWorker, workerServer);
            this.partitionStore = new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore, immutableClassesGiraphConfiguration, context, this.oocEngine);
            this.edgeStore = new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, immutableClassesGiraphConfiguration, this.oocEngine);
        } else {
            this.partitionStore = inMemoryPartitionStore;
            this.edgeStore = inMemoryEdgeStore;
            this.oocEngine = null;
        }
        this.ownerAggregatorData = new OwnerAggregatorServerData(context);
        this.allAggregatorData = new AllAggregatorServerData(context, immutableClassesGiraphConfiguration);
        this.context = context;
    }

    /**
     * Instantiates the message store factory class named by
     * {@link GiraphConstants#MESSAGE_STORE_FACTORY_CLASS} and initializes it
     * with the service worker and the configuration. Requires {@code conf}
     * and {@code serviceWorker} to be set already.
     *
     * @return The initialized message store factory
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> createMessageStoreFactory() {
        // The factory class is only known at runtime, hence the raw cast.
        MessageStoreFactory<I, Writable, MessageStore<I, Writable>> factory = (MessageStoreFactory) ReflectionUtils.newInstance(GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.get(this.conf));
        factory.initialize(this.serviceWorker, this.conf);
        return factory;
    }

    /**
     * Get the out-of-core engine.
     *
     * @return The out-of-core engine, or {@code null} if out-of-core is disabled
     */
    public OutOfCoreEngine getOocEngine() {
        return this.oocEngine;
    }

    /**
     * Get the edge store of this worker.
     *
     * @return The edge store
     */
    public EdgeStore<I, V, E> getEdgeStore() {
        return this.edgeStore;
    }

    /**
     * Get the partition store of this worker.
     *
     * @return The partition store
     */
    public PartitionStore<I, V, E> getPartitionStore() {
        return this.partitionStore;
    }

    /**
     * Get the message store that is currently receiving messages; it becomes
     * the current store at the next {@link #prepareSuperstep()}.
     *
     * @param <M> Message data
     * @return The incoming message store, or {@code null} if none has been created yet
     */
    @SuppressWarnings("unchecked")
    public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
        // Stores are kept as MessageStore<I, Writable>; the caller picks the message type.
        return (MessageStore<I, M>) this.incomingMessageStore;
    }

    /**
     * Get the message store installed by the last {@link #prepareSuperstep()}.
     *
     * @param <M> Message data
     * @return The current message store, or {@code null} if none has been created yet
     */
    @SuppressWarnings("unchecked")
    public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
        // Stores are kept as MessageStore<I, Writable>; the caller picks the message type.
        return (MessageStore<I, M>) this.currentMessageStore;
    }

    /**
     * Clears and drops both message stores, then calls
     * {@link #prepareSuperstep()} so that fresh stores are created.
     */
    public void resetMessageStores() {
        if (this.currentMessageStore != null) {
            this.currentMessageStore.clearAll();
            this.currentMessageStore = null;
        }
        if (this.incomingMessageStore != null) {
            this.incomingMessageStore.clearAll();
            this.incomingMessageStore = null;
        }
        prepareSuperstep();
    }

    /**
     * Rotates the message state for a new superstep: the previous current
     * store is cleared, the incoming store (or a newly created one if there
     * is none) becomes the current store, and a new incoming store is
     * created. The worker-to-worker message lists are rotated the same way.
     */
    public void prepareSuperstep() {
        if (this.currentMessageStore != null) {
            this.currentMessageStore.clearAll();
        }

        MessageStore<I, Writable> nextCurrentMessageStore = this.incomingMessageStore;
        if (nextCurrentMessageStore == null) {
            // No incoming store to promote: build one for the incoming message classes.
            nextCurrentMessageStore = this.messageStoreFactory.newStore(this.conf.getIncomingMessageClasses());
            if (this.oocEngine != null) {
                nextCurrentMessageStore = new DiskBackedMessageStore<I, Writable>(this.conf, this.oocEngine, nextCurrentMessageStore, this.conf.getIncomingMessageClasses().useMessageCombiner(), this.serviceWorker.getSuperstep());
            }
        }

        MessageStore<I, Writable> nextIncomingMessageStore = this.messageStoreFactory.newStore(this.conf.getOutgoingMessageClasses());
        if (this.oocEngine != null) {
            nextIncomingMessageStore = new DiskBackedMessageStore<I, Writable>(this.conf, this.oocEngine, nextIncomingMessageStore, this.conf.getOutgoingMessageClasses().useMessageCombiner(), this.serviceWorker.getSuperstep() + 1);
        }

        if (this.oocEngine != null) {
            // Swap the stores and reset the engine under the superstep write
            // lock; try/finally guarantees the lock is released even if
            // reset() throws.
            this.oocEngine.getSuperstepLock().writeLock().lock();
            try {
                this.currentMessageStore = nextCurrentMessageStore;
                this.incomingMessageStore = nextIncomingMessageStore;
                this.oocEngine.reset();
            } finally {
                this.oocEngine.getSuperstepLock().writeLock().unlock();
            }
        } else {
            this.currentMessageStore = nextCurrentMessageStore;
            this.incomingMessageStore = nextIncomingMessageStore;
        }

        this.currentMessageStore.finalizeStore();
        this.currentWorkerToWorkerMessages = this.incomingWorkerToWorkerMessages;
        this.incomingWorkerToWorkerMessages = Collections.synchronizedList(new ArrayList<Writable>());
    }

    /**
     * Get the vertex mutations being collected, keyed by partition id and
     * then by vertex id.
     *
     * @return The partition mutations map
     */
    public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>> getPartitionMutations() {
        return this.partitionMutations;
    }

    /**
     * Get the data of aggregators owned by this worker.
     *
     * @return The owner aggregator data
     */
    public OwnerAggregatorServerData getOwnerAggregatorData() {
        return this.ownerAggregatorData;
    }

    /**
     * Get the data of all aggregators.
     *
     * @return The all-aggregator data
     */
    public AllAggregatorServerData getAllAggregatorData() {
        return this.allAggregatorData;
    }

    /**
     * Get the service worker.
     *
     * @return The service worker
     */
    public CentralizedServiceWorker<I, V, E> getServiceWorker() {
        return this.serviceWorker;
    }

    /**
     * Returns the current worker-to-worker messages and sets the internal
     * reference to {@code null}. Until the next {@link #prepareSuperstep()},
     * {@link #getCurrentWorkerToWorkerMessages()} returns {@code null}.
     *
     * @return The list that was current before the call (may be {@code null})
     */
    public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
        List<Writable> messages = this.currentWorkerToWorkerMessages;
        this.currentWorkerToWorkerMessages = null;
        return messages;
    }

    /**
     * Adds a worker-to-worker message to the incoming list.
     *
     * @param writable Message to add
     */
    public void addIncomingWorkerToWorkerMessage(Writable writable) {
        this.incomingWorkerToWorkerMessages.add(writable);
    }

    /**
     * Get the current worker-to-worker messages.
     *
     * @return The current list, or {@code null} after
     *         {@link #getAndClearCurrentWorkerToWorkerMessages()} has been called
     */
    public List<Writable> getCurrentWorkerToWorkerMessages() {
        return this.currentWorkerToWorkerMessages;
    }

    /**
     * Moves the collected mutations aside so that
     * {@link #resolvePartitionMutation(Partition)} can process them, and
     * starts collecting into a new, empty map.
     */
    public void prepareResolveMutations() {
        this.oldPartitionMutations = this.partitionMutations;
        this.partitionMutations = Maps.newConcurrentMap();
    }

    /**
     * Resolves the mutations that were set aside for the given partition by
     * {@link #prepareResolveMutations()}. Each mutated vertex is passed
     * through the configured {@link VertexResolver}; the result is put into
     * the partition, or the vertex is removed if the resolver returns
     * {@code null}. Unless the incoming message classes ignore existing
     * vertices, every message destination in this partition that has no
     * vertex is then resolved as well.
     *
     * @param partition Partition whose mutations are resolved
     */
    public void resolvePartitionMutation(Partition<I, V, E> partition) {
        int partitionId = partition.getId();
        VertexResolver<I, V, E> vertexResolver = this.conf.createVertexResolver();
        ConcurrentMap<I, VertexMutations<I, V, E>> mutationsByVertex = this.oldPartitionMutations.get(partitionId);
        boolean ignoreExistingVertices = this.conf.getIncomingMessageClasses().ignoreExistingVertices();

        if (mutationsByVertex != null) {
            for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutationsByVertex.entrySet()) {
                I vertexId = entry.getKey();
                Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
                VertexMutations<I, V, E> mutations = entry.getValue();
                // The message store is only consulted when existing vertices are not ignored.
                boolean hasMessages = !ignoreExistingVertices && getCurrentMessageStore().hasMessagesForVertex(vertexId);
                Vertex<I, V, E> resolvedVertex = vertexResolver.resolve(vertexId, originalVertex, mutations, hasMessages);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("resolvePartitionMutations: Resolved vertex index " + vertexId + " in partition index " + partitionId + " with original vertex " + originalVertex + ", returned vertex " + resolvedVertex + " on superstep " + this.serviceWorker.getSuperstep() + " with mutations " + mutations);
                }
                if (resolvedVertex != null) {
                    partition.putVertex(resolvedVertex);
                } else if (originalVertex != null) {
                    // The resolver dropped an existing vertex: remove it and its messages.
                    partition.removeVertex(vertexId);
                    if (!ignoreExistingVertices) {
                        getCurrentMessageStore().clearVertexMessages(vertexId);
                    }
                }
                this.context.progress();
            }
        }

        if (ignoreExistingVertices) {
            return;
        }

        // Resolve message destinations in this partition that have no vertex yet.
        Iterable<I> destinations = getCurrentMessageStore().getPartitionDestinationVertices(partitionId);
        if (Iterables.isEmpty(destinations)) {
            return;
        }
        for (I vertexId : destinations) {
            if (partition.getVertex(vertexId) == null) {
                Vertex<I, V, E> resolvedVertex = vertexResolver.resolve(vertexId, null, null, true);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("resolvePartitionMutations: A non-existing vertex has message(s). Added vertex index " + vertexId + " in partition index " + partitionId + ", vertex = " + resolvedVertex + ", on superstep " + this.serviceWorker.getSuperstep());
                }
                if (resolvedVertex != null) {
                    partition.putVertex(resolvedVertex);
                }
                this.context.progress();
            }
        }
    }

    /**
     * If the incoming message store is an {@link AsyncMessageStoreWrapper},
     * calls its {@code waitToComplete()}; otherwise does nothing.
     */
    public void waitForComplete() {
        // Read the volatile field once so the instanceof check and the cast
        // are applied to the same object.
        MessageStore<I, Writable> store = this.incomingMessageStore;
        if (store instanceof AsyncMessageStoreWrapper) {
            ((AsyncMessageStoreWrapper) store).waitToComplete();
        }
    }
}
