package org.apache.giraph.edge;

import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressCounter;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadLocalProgressCounter;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/edge/AbstractEdgeStore.class */
public abstract class AbstractEdgeStore<I extends WritableComparable, V extends Writable, E extends Writable, K, Et> extends DefaultImmutableClassesGiraphConfigurable<I, V, E> implements EdgeStore<I, V, E> {
    public static final ThreadLocalProgressCounter PROGRESS_COUNTER = new ThreadLocalProgressCounter();
    private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
    protected CentralizedServiceWorker<I, V, E> service;
    protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
    protected Progressable progressable;
    protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
    protected boolean reuseEdgeObjects;
    protected boolean useInputOutEdges;
    private volatile boolean hasEdgesOnDisk = false;
    private CreateSourceVertexCallback<I> createSourceVertexCallback;

    public AbstractEdgeStore(CentralizedServiceWorker<I, V, E> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, Progressable progressable) {
        this.service = centralizedServiceWorker;
        this.configuration = immutableClassesGiraphConfiguration;
        this.progressable = progressable;
        this.transientEdges = new MapMaker().concurrencyLevel(immutableClassesGiraphConfiguration.getNettyServerExecutionConcurrency()).makeMap();
        this.reuseEdgeObjects = immutableClassesGiraphConfiguration.reuseEdgeObjects();
        this.useInputOutEdges = immutableClassesGiraphConfiguration.useInputOutEdges();
        this.createSourceVertexCallback = GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK.newInstance(immutableClassesGiraphConfiguration);
    }

    protected abstract I getVertexId(Et et, I i);

    protected abstract I createVertexId(Et et);

    /* renamed from: getPartitionEdges */
    protected abstract Map<K, OutEdges<I, E>> mo93getPartitionEdges(int i);

    protected abstract OutEdges<I, E> getPartitionEdges(Et et);

    protected abstract void writeVertexKey(K k, DataOutput dataOutput) throws IOException;

    protected abstract K readVertexKey(DataInput dataInput) throws IOException;

    protected abstract Iterator<Et> getPartitionEdgesIterator(Map<K, OutEdges<I, E>> map);

    @Override // org.apache.giraph.edge.EdgeStore
    public boolean hasEdgesForPartition(int i) {
        return this.transientEdges.containsKey(Integer.valueOf(i));
    }

    @Override // org.apache.giraph.edge.EdgeStore
    public void writePartitionEdgeStore(int i, DataOutput dataOutput) throws IOException {
        Map<K, OutEdges<I, E>> remove = this.transientEdges.remove(Integer.valueOf(i));
        if (remove != null) {
            dataOutput.writeInt(remove.size());
            if (remove.size() > 0) {
                this.hasEdgesOnDisk = true;
            }
            for (Map.Entry<K, OutEdges<I, E>> entry : remove.entrySet()) {
                writeVertexKey(entry.getKey(), dataOutput);
                entry.getValue().write(dataOutput);
            }
        }
    }

    @Override // org.apache.giraph.edge.EdgeStore
    public void readPartitionEdgeStore(int i, DataInput dataInput) throws IOException {
        Preconditions.checkState(!this.transientEdges.containsKey(Integer.valueOf(i)), "readPartitionEdgeStore: reading a partition that is already there in the partition store (impossible)");
        Map<K, OutEdges<I, E>> mo93getPartitionEdges = mo93getPartitionEdges(i);
        int readInt = dataInput.readInt();
        for (int i2 = 0; i2 < readInt; i2++) {
            K readVertexKey = readVertexKey(dataInput);
            OutEdges<I, E> createAndInitializeInputOutEdges = this.configuration.createAndInitializeInputOutEdges();
            createAndInitializeInputOutEdges.readFields(dataInput);
            mo93getPartitionEdges.put(readVertexKey, createAndInitializeInputOutEdges);
        }
    }

    protected abstract OutEdges<I, E> getVertexOutEdges(VertexIdEdgeIterator<I, E> vertexIdEdgeIterator, Map<K, OutEdges<I, E>> map);

    @Override // org.apache.giraph.edge.EdgeStore
    public void addPartitionEdges(int i, VertexIdEdges<I, E> vertexIdEdges) {
        Map<K, OutEdges<I, E>> mo93getPartitionEdges = mo93getPartitionEdges(i);
        VertexIdEdgeIterator<I, E> vertexIdEdgeIterator = vertexIdEdges.getVertexIdEdgeIterator();
        while (vertexIdEdgeIterator.hasNext()) {
            vertexIdEdgeIterator.next();
            Edge<I, E> currentEdge = this.reuseEdgeObjects ? vertexIdEdgeIterator.getCurrentEdge() : vertexIdEdgeIterator.releaseCurrentEdge();
            OutEdges<I, E> vertexOutEdges = getVertexOutEdges(vertexIdEdgeIterator, mo93getPartitionEdges);
            synchronized (vertexOutEdges) {
                vertexOutEdges.add(currentEdge);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OutEdges<I, E> convertInputToComputeEdges(OutEdges<I, E> outEdges) {
        return !this.useInputOutEdges ? outEdges : this.configuration.createAndInitializeOutEdges(outEdges);
    }

    @Override // org.apache.giraph.edge.EdgeStore
    public void moveEdgesToVertices() {
        if (this.transientEdges.isEmpty() && !this.hasEdgesOnDisk) {
            if (LOG.isInfoEnabled()) {
                LOG.info("moveEdgesToVertices: No edges to move");
                return;
            }
            return;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("moveEdgesToVertices: Moving incoming edges to vertices. Using " + this.createSourceVertexCallback);
        }
        this.service.getPartitionStore().startIteration();
        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.edge.AbstractEdgeStore.1
            @Override // org.apache.giraph.utils.CallableFactory
            /* renamed from: newCallable */
            public Callable<Void> newCallable2(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.edge.AbstractEdgeStore.1.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        I createVertexId = AbstractEdgeStore.this.configuration.createVertexId();
                        OutOfCoreEngine oocEngine = AbstractEdgeStore.this.service.getServerData().getOocEngine();
                        if (oocEngine != null) {
                            oocEngine.processingThreadStart();
                        }
                        ProgressCounter progressCounter = AbstractEdgeStore.PROGRESS_COUNTER.get();
                        while (true) {
                            Partition<I, V, E> nextPartition = AbstractEdgeStore.this.service.getPartitionStore().getNextPartition();
                            if (nextPartition == 0) {
                                break;
                            }
                            Map<K, OutEdges<I, E>> remove = AbstractEdgeStore.this.transientEdges.remove(Integer.valueOf(nextPartition.getId()));
                            if (remove == null) {
                                AbstractEdgeStore.this.service.getPartitionStore().putPartition(nextPartition);
                            } else {
                                Iterator<Et> partitionEdgesIterator = AbstractEdgeStore.this.getPartitionEdgesIterator(remove);
                                int i2 = 0;
                                while (partitionEdgesIterator.hasNext()) {
                                    if (oocEngine != null) {
                                        i2++;
                                        if ((i2 & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
                                            oocEngine.activeThreadCheckIn();
                                        }
                                    }
                                    Et next = partitionEdgesIterator.next();
                                    WritableComparable vertexId = AbstractEdgeStore.this.getVertexId(next, createVertexId);
                                    OutEdges convertInputToComputeEdges = AbstractEdgeStore.this.convertInputToComputeEdges(AbstractEdgeStore.this.getPartitionEdges((AbstractEdgeStore) next));
                                    Vertex vertex = nextPartition.getVertex(vertexId);
                                    if (vertex != null) {
                                        if (vertex.getNumEdges() == 0) {
                                            vertex.setEdges(convertInputToComputeEdges);
                                        } else {
                                            Iterator<Edge<I, E>> it = convertInputToComputeEdges.iterator();
                                            while (it.hasNext()) {
                                                vertex.addEdge((Edge) it.next());
                                            }
                                        }
                                        if (vertex instanceof Trimmable) {
                                            ((Trimmable) vertex).trim();
                                        }
                                        nextPartition.saveVertex(vertex);
                                    } else if (AbstractEdgeStore.this.createSourceVertexCallback.shouldCreateSourceVertex(vertexId)) {
                                        Vertex<I, V, E> createVertex = AbstractEdgeStore.this.configuration.createVertex();
                                        createVertex.initialize(AbstractEdgeStore.this.createVertexId(next), AbstractEdgeStore.this.configuration.createVertexValue(), convertInputToComputeEdges);
                                        nextPartition.putVertex(createVertex);
                                    }
                                    progressCounter.inc();
                                    partitionEdgesIterator.remove();
                                }
                                AbstractEdgeStore.this.service.getPartitionStore().putPartition(nextPartition);
                            }
                        }
                        if (oocEngine == null) {
                            return null;
                        }
                        oocEngine.processingThreadFinish();
                        return null;
                    }
                };
            }
        }, this.configuration.getNumInputSplitsThreads(), "move-edges-%d", this.progressable);
        this.transientEdges.clear();
        if (LOG.isInfoEnabled()) {
            LOG.info("moveEdgesToVertices: Finished moving incoming edges to vertices.");
        }
    }
}
