package org.apache.giraph.comm.requests;

import com.google.common.collect.Maps;
import com.yammer.metrics.core.Histogram;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/requests/SendPartitionMutationsRequest.class */
public class SendPartitionMutationsRequest<I extends WritableComparable, V extends Writable, E extends Writable> extends WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
    private static final Logger LOG = Logger.getLogger(SendPartitionMutationsRequest.class);
    private int partitionId;
    private Map<I, VertexMutations<I, V, E>> vertexIdMutations;

    public SendPartitionMutationsRequest() {
    }

    public SendPartitionMutationsRequest(int i, Map<I, VertexMutations<I, V, E>> map) {
        this.partitionId = i;
        this.vertexIdMutations = map;
    }

    @Override // org.apache.giraph.comm.requests.WritableRequest
    public void readFieldsRequest(DataInput dataInput) throws IOException {
        this.partitionId = dataInput.readInt();
        int readInt = dataInput.readInt();
        this.vertexIdMutations = Maps.newConcurrentMap();
        for (int i = 0; i < readInt; i++) {
            I createVertexId = getConf().createVertexId();
            createVertexId.readFields(dataInput);
            VertexMutations<I, V, E> vertexMutations = new VertexMutations<>();
            vertexMutations.setConf(getConf());
            vertexMutations.readFields(dataInput);
            if (this.vertexIdMutations.put(createVertexId, vertexMutations) != null) {
                throw new IllegalStateException("readFields: Already has vertex id " + createVertexId);
            }
        }
    }

    @Override // org.apache.giraph.comm.requests.WritableRequest
    public void writeRequest(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.partitionId);
        dataOutput.writeInt(this.vertexIdMutations.size());
        for (Map.Entry<I, VertexMutations<I, V, E>> entry : this.vertexIdMutations.entrySet()) {
            entry.getKey().write(dataOutput);
            entry.getValue().write(dataOutput);
        }
    }

    @Override // org.apache.giraph.comm.requests.WritableRequest
    public RequestType getType() {
        return RequestType.SEND_PARTITION_MUTATIONS_REQUEST;
    }

    @Override // org.apache.giraph.comm.requests.WorkerRequest
    public void doRequest(ServerData<I, V, E> serverData) {
        ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>> partitionMutations = serverData.getPartitionMutations();
        Histogram uniformHistogram = GiraphMetrics.get().perSuperstep().getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
        int i = 0;
        Iterator<ConcurrentMap<I, VertexMutations<I, V, E>>> it = partitionMutations.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        uniformHistogram.update(i);
        if (!(this.vertexIdMutations instanceof ConcurrentMap)) {
            this.vertexIdMutations = new ConcurrentHashMap(this.vertexIdMutations);
        }
        ConcurrentMap putIfAbsent = partitionMutations.putIfAbsent(Integer.valueOf(this.partitionId), (ConcurrentMap) this.vertexIdMutations);
        if (putIfAbsent != null) {
            for (Map.Entry<I, VertexMutations<I, V, E>> entry : this.vertexIdMutations.entrySet()) {
                VertexMutations vertexMutations = (VertexMutations) putIfAbsent.putIfAbsent(entry.getKey(), entry.getValue());
                if (vertexMutations != null) {
                    synchronized (vertexMutations) {
                        vertexMutations.addVertexMutations(entry.getValue());
                    }
                }
            }
        }
    }

    @Override // org.apache.giraph.comm.requests.WritableRequest
    public int getSerializedSize() {
        return -1;
    }
}
