package org.apache.giraph.comm.messages;

import com.google.common.collect.Iterators;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.RepresentativeByteStructIterator;
import org.apache.giraph.utils.VerboseByteStructMessageWrite;
import org.apache.giraph.utils.VertexIdIterator;
import org.apache.giraph.utils.VertexIdMessageBytesIterator;
import org.apache.giraph.utils.VertexIdMessageIterator;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.class */
/**
 * Message store that keeps, for every vertex id, one {@link DataInputOutput}
 * buffer into which all messages addressed to that vertex are appended.
 *
 * <p>Buffers are created on demand through
 * {@code config.createMessagesInputOutput()} and are stored in the
 * per-partition {@link ConcurrentMap} obtained from the superclass.
 *
 * @param <I> vertex id type
 * @param <M> message type
 */
public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {

    /**
     * Factory producing {@link ByteArrayMessagesPerVertexStore} instances bound
     * to the service and configuration it currently holds.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStore<I, M>> {
        /** Service handed to every store created by this factory. */
        private CentralizedServiceWorker<I, ?, ?> service;
        /** Configuration handed to every store created by this factory. */
        private ImmutableClassesGiraphConfiguration<I, ?, ?> config;

        /**
         * @param centralizedServiceWorker service to pass to created stores
         * @param immutableClassesGiraphConfiguration configuration to pass to created stores
         */
        public Factory(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
            this.service = centralizedServiceWorker;
            this.config = immutableClassesGiraphConfiguration;
        }

        /**
         * Creates a new store using the currently held service and configuration.
         *
         * @param messageValueFactory factory used by the store to instantiate messages
         * @return a freshly constructed {@link ByteArrayMessagesPerVertexStore}
         */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public MessageStore<I, M> newStore(MessageValueFactory<M> messageValueFactory) {
            // Explicit type arguments instead of a raw type so the constructor
            // arguments are checked against <I, M>.
            return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory, this.service, this.config);
        }

        /**
         * Replaces the service and configuration used for subsequently created stores.
         *
         * @param centralizedServiceWorker new service
         * @param immutableClassesGiraphConfiguration new configuration
         */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public void initialize(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
            this.service = centralizedServiceWorker;
            this.config = immutableClassesGiraphConfiguration;
        }

        /**
         * @return always {@code false}
         */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public boolean shouldTraverseMessagesInOrder() {
            return false;
        }
    }

    /**
     * Forwards all arguments to the {@link SimpleMessageStore} constructor.
     *
     * @param messageValueFactory factory used to instantiate messages
     * @param centralizedServiceWorker service worker
     * @param immutableClassesGiraphConfiguration configuration
     */
    public ByteArrayMessagesPerVertexStore(MessageValueFactory<M> messageValueFactory, CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        super(messageValueFactory, centralizedServiceWorker, immutableClassesGiraphConfiguration);
    }

    /**
     * @return always {@code false}
     */
    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean isPointerListEncoding() {
        return false;
    }

    /**
     * Returns the buffer mapped to the iterator's current vertex id, inserting
     * a newly created one when the map has no entry for that id.
     *
     * <p>The vertex id is released from the iterator only on the insertion
     * path; when an entry already exists the iterator keeps its current id.
     *
     * @param concurrentMap per-partition map from vertex id to message buffer
     * @param vertexIdIterator iterator positioned on the vertex of interest
     * @return the buffer that is in the map for the current vertex id
     */
    private DataInputOutput getDataInputOutput(ConcurrentMap<I, DataInputOutput> concurrentMap, VertexIdIterator<I> vertexIdIterator) {
        DataInputOutput existing = concurrentMap.get(vertexIdIterator.getCurrentVertexId());
        if (existing != null) {
            return existing;
        }
        DataInputOutput fresh = this.config.createMessagesInputOutput();
        // putIfAbsent returns the previously mapped buffer if some other caller
        // inserted one between the get() above and this call; prefer that one.
        DataInputOutput previous = concurrentMap.putIfAbsent(vertexIdIterator.releaseCurrentVertexId(), fresh);
        return previous != null ? previous : fresh;
    }

    /**
     * Appends every message in {@code vertexIdMessages} to the buffer of its
     * destination vertex within partition {@code i}.
     *
     * <p>If the container provides a byte-level iterator (non-null), message
     * bytes are written directly; otherwise each message is written through
     * {@link VerboseByteStructMessageWrite#verboseWriteCurrentMessage}.
     *
     * @param i partition id
     * @param vertexIdMessages messages to add, keyed by destination vertex id
     * @throws IOException if writing to a buffer fails
     */
    @Override // org.apache.giraph.comm.messages.MessageStore
    public void addPartitionMessages(int i, VertexIdMessages<I, M> vertexIdMessages) throws IOException {
        ConcurrentMap<I, DataInputOutput> partitionMap = getOrCreatePartitionMap(i);
        VertexIdMessageBytesIterator<I, M> bytesIterator = vertexIdMessages.getVertexIdMessageBytesIterator();
        if (bytesIterator != null) {
            while (bytesIterator.hasNext()) {
                bytesIterator.next();
                DataInputOutput buffer = getDataInputOutput(partitionMap, bytesIterator);
                // Each append holds the monitor of the target vertex's buffer.
                synchronized (buffer) {
                    bytesIterator.writeCurrentMessageBytes(buffer.getDataOutput());
                }
            }
            return;
        }
        VertexIdMessageIterator<I, M> messageIterator = vertexIdMessages.getVertexIdMessageIterator();
        while (messageIterator.hasNext()) {
            messageIterator.next();
            DataInputOutput buffer = getDataInputOutput(partitionMap, messageIterator);
            // Each append holds the monitor of the target vertex's buffer.
            synchronized (buffer) {
                VerboseByteStructMessageWrite.verboseWriteCurrentMessage(messageIterator, buffer.getDataOutput());
            }
        }
    }

    /**
     * Wraps a buffer in a {@code MessagesIterable} built with this store's
     * message value factory.
     *
     * @param dataInputOutput buffer holding the messages of one vertex
     * @return iterable over the messages in the buffer
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public Iterable<M> getMessagesAsIterable(DataInputOutput dataInputOutput) {
        return new MessagesIterable(dataInputOutput, this.messageValueFactory);
    }

    /**
     * Counts the messages held by all buffers of a partition map by iterating
     * over each buffer's content.
     *
     * @param concurrentMap per-partition map from vertex id to message buffer
     * @return sum of the message counts of all buffers in the map
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    protected int getNumberOfMessagesIn(ConcurrentMap<I, DataInputOutput> concurrentMap) {
        int total = 0;
        for (DataInputOutput buffer : concurrentMap.values()) {
            total += Iterators.size(new RepresentativeByteStructIterator<M>(buffer.createDataInput()) { // from class: org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore.1
                @Override // org.apache.giraph.utils.ByteStructIterator
                public M createWritable() {
                    return (M) ByteArrayMessagesPerVertexStore.this.messageValueFactory.mo2128newInstance();
                }
            });
        }
        return total;
    }

    /**
     * Serializes one vertex's message buffer to the given output.
     *
     * @param dataInputOutput buffer to write
     * @param dataOutput destination
     * @throws IOException if the write fails
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public void writeMessages(DataInputOutput dataInputOutput, DataOutput dataOutput) throws IOException {
        dataInputOutput.write(dataOutput);
    }

    /**
     * Creates a new buffer from the configuration and fills it from the given input.
     *
     * @param dataInput source to read from
     * @return the newly created and populated buffer
     * @throws IOException if the read fails
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public DataInputOutput readFieldsForMessages(DataInput dataInput) throws IOException {
        DataInputOutput buffer = this.config.createMessagesInputOutput();
        buffer.readFields(dataInput);
        return buffer;
    }

    /**
     * Creates a factory that builds {@link ByteArrayMessagesPerVertexStore} instances.
     *
     * @param centralizedServiceWorker service to pass to created stores
     * @param immutableClassesGiraphConfiguration configuration to pass to created stores
     * @param <I> vertex id type
     * @param <M> message type
     * @return a new factory
     */
    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        // Explicit type arguments instead of a raw type.
        return new Factory<I, M>(centralizedServiceWorker, immutableClassesGiraphConfiguration);
    }
}
