package org.apache.giraph.comm.messages.out_of_core;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.class */
/**
 * Message store for a single partition that buffers serialized messages in
 * memory, keyed by destination vertex id, and can hand the buffered messages
 * over to {@link SequentialFileMessageStore} instances via {@link #flush()}.
 *
 * <p>Concurrency as implemented here: {@link #addVertexMessages} holds the
 * read lock, so several callers may append at the same time, while
 * {@link #flush()} holds the write lock only for as long as it takes to swap
 * in a fresh in-memory map. Appends to one vertex's buffer are serialized by
 * synchronizing on that buffer. The read paths ({@link #getVertexMessages},
 * {@link #write}) and {@link #clearAll()} take no lock.
 *
 * @param <I> vertex id type
 * @param <M> message type
 */
public class PartitionDiskBackedMessageStore<I extends WritableComparable, M extends Writable> implements Writable {
    /** Passed to every {@link MessagesIterable} and file store this store creates. */
    private final MessageValueFactory<M> messageValueFactory;
    /** Source of new vertex ids and new message buffers. */
    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
    /** Creates the file store that receives the in-memory messages on each flush. */
    private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
    /** Fair lock: appenders share the read lock, flush takes the write lock. */
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
    /** Current in-memory buffers, sorted by vertex id; replaced wholesale by flush(). */
    private volatile ConcurrentNavigableMap<I, DataInputOutput> inMemoryMessages = new ConcurrentSkipListMap<>();
    /** Number of messages appended since the in-memory map was last swapped or cleared. */
    private final AtomicInteger numberOfMessagesInMemory = new AtomicInteger(0);
    /** Every vertex id that has been passed to addVertexMessages or restored by readFields. */
    private final Set<I> destinationVertices = Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
    /** File stores produced by flush() or restored by readFields(); additions in flush() are synchronized on this collection. */
    private final Collection<SequentialFileMessageStore<I, M>> fileStores = Lists.newArrayList();

    /**
     * Factory producing {@link PartitionDiskBackedMessageStore} instances that
     * all share one configuration and one file store factory.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> {
        /** Configuration handed to every created store. */
        private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
        /** File store factory handed to every created store. */
        private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;

        /**
         * @param immutableClassesGiraphConfiguration configuration for created stores
         * @param messageStoreFactory file store factory for created stores
         */
        public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> messageStoreFactory) {
            this.config = immutableClassesGiraphConfiguration;
            this.fileStoreFactory = messageStoreFactory;
        }

        /**
         * Creates a new, empty partition store.
         *
         * @param messageValueFactory message factory for the new store
         * @return the new store
         */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public PartitionDiskBackedMessageStore<I, M> newStore(MessageValueFactory<M> messageValueFactory) {
            return new PartitionDiskBackedMessageStore<>(messageValueFactory, config, fileStoreFactory);
        }

        /** Intentionally a no-op: everything this factory needs is supplied to its constructor. */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public void initialize(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        }

        /** @return always {@code true} */
        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public boolean shouldTraverseMessagesInOrder() {
            return true;
        }
    }

    /**
     * Creates an empty store.
     *
     * @param messageValueFactory message factory passed on to iterables and file stores
     * @param immutableClassesGiraphConfiguration source of vertex ids and message buffers
     * @param messageStoreFactory factory for the file stores created by {@link #flush()}
     */
    public PartitionDiskBackedMessageStore(MessageValueFactory<M> messageValueFactory, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> messageStoreFactory) {
        this.messageValueFactory = messageValueFactory;
        this.config = immutableClassesGiraphConfiguration;
        this.fileStoreFactory = messageStoreFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Appends messages to the in-memory buffer of a vertex, creating the
     * buffer if the vertex has none yet.
     *
     * @param i destination vertex id
     * @param iterable messages to append
     * @return {@code true} if this call inserted a new in-memory buffer for
     *         the vertex, {@code false} if a buffer was already present
     * @throws IOException if serializing a message fails
     */
    public boolean addVertexMessages(I i, Iterable<M> iterable) throws IOException {
        boolean createdBuffer = false;
        destinationVertices.add(i);
        // Shared lock: concurrent appends are allowed, but flush() cannot swap
        // the map while an append is in progress.
        rwLock.readLock().lock();
        try {
            DataInputOutput buffer = inMemoryMessages.get(i);
            if (buffer == null) {
                DataInputOutput candidate = config.createMessagesInputOutput();
                buffer = inMemoryMessages.putIfAbsent(i, candidate);
                if (buffer == null) {
                    // No other thread installed a buffer first; ours is now the one in the map.
                    createdBuffer = true;
                    buffer = candidate;
                }
            }
            // Writers to the same vertex buffer must not interleave.
            synchronized (buffer) {
                for (M message : iterable) {
                    message.write(buffer.getDataOutput());
                    numberOfMessagesInMemory.getAndIncrement();
                }
            }
            return createdBuffer;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    /**
     * Returns the messages for a vertex: first those in the in-memory buffer,
     * then those of every file store, in the order the file stores are held.
     * If the vertex has no in-memory buffer, a newly created buffer is used
     * in its place.
     *
     * @param i destination vertex id
     * @return concatenated view over the in-memory and file store messages
     * @throws IOException if a file store fails to provide its messages
     */
    public Iterable<M> getVertexMessages(I i) throws IOException {
        DataInputOutput buffer = inMemoryMessages.get(i);
        if (buffer == null) {
            buffer = config.createMessagesInputOutput();
        }
        // Typed as Iterable<M>: Iterables.concat returns a plain Iterable, not a MessagesIterable.
        Iterable<M> combined = new MessagesIterable<M>(buffer, messageValueFactory);
        for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
            combined = Iterables.concat(combined, fileStore.getVertexMessages(i));
        }
        return combined;
    }

    /**
     * @return the number of messages counted as currently held in memory
     */
    public int getNumberOfMessages() {
        return numberOfMessagesInMemory.get();
    }

    /**
     * @param i vertex id
     * @return {@code true} if the vertex id is in the set of destination vertices
     */
    public boolean hasMessagesForVertex(I i) {
        return destinationVertices.contains(i);
    }

    /**
     * @return the destination vertex set itself (not a copy)
     */
    public Iterable<I> getDestinationVertices() {
        return destinationVertices;
    }

    /**
     * Removes the in-memory buffer of a vertex. The destination vertex set,
     * the in-memory message counter and the file stores are left untouched.
     *
     * @param i vertex id
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    public void clearVertexMessages(I i) throws IOException {
        inMemoryMessages.remove(i);
    }

    /**
     * Drops all in-memory messages and destination vertices, clears every
     * file store and forgets them.
     *
     * @throws IOException if clearing a file store fails
     */
    public void clearAll() throws IOException {
        inMemoryMessages.clear();
        // Keep the counter in step with the now-empty in-memory map, as flush() does.
        numberOfMessagesInMemory.set(0);
        destinationVertices.clear();
        for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
            fileStore.clearAll();
        }
        fileStores.clear();
    }

    /**
     * Moves all in-memory messages into a newly created file store.
     *
     * <p>The write lock is held only while the in-memory map is replaced and
     * the counter is reset; handing the old map to the file store happens
     * after the lock is released.
     *
     * @throws IOException if creating or filling the file store fails
     */
    public void flush() throws IOException {
        ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush;
        rwLock.writeLock().lock();
        try {
            messagesToFlush = inMemoryMessages;
            inMemoryMessages = new ConcurrentSkipListMap<>();
            numberOfMessagesInMemory.set(0);
        } finally {
            // Released exactly once here, so a later failure cannot trigger a second unlock.
            rwLock.writeLock().unlock();
        }
        SequentialFileMessageStore<I, M> fileStore = fileStoreFactory.newStore(messageValueFactory);
        fileStore.addMessages(messagesToFlush);
        synchronized (fileStores) {
            fileStores.add(fileStore);
        }
    }

    /**
     * Serializes this store. Layout, in order: destination vertex count and
     * the vertex ids; in-memory message counter; in-memory entry count and,
     * per entry, vertex id followed by its buffer; file store count and the
     * file stores.
     *
     * @param dataOutput target to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(destinationVertices.size());
        for (I vertexId : destinationVertices) {
            vertexId.write(dataOutput);
        }
        dataOutput.writeInt(numberOfMessagesInMemory.get());
        // Read the volatile field once so the count and the entries come from the same map.
        ConcurrentNavigableMap<I, DataInputOutput> messages = inMemoryMessages;
        dataOutput.writeInt(messages.size());
        for (Map.Entry<I, DataInputOutput> entry : messages.entrySet()) {
            entry.getKey().write(dataOutput);
            entry.getValue().write(dataOutput);
        }
        dataOutput.writeInt(fileStores.size());
        for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
            fileStore.write(dataOutput);
        }
    }

    /**
     * Reads state in the layout produced by {@link #write(DataOutput)}.
     * Vertices, buffers and file stores are added to whatever this store
     * already holds; the in-memory message counter is overwritten.
     *
     * @param dataInput source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        int destinationCount = dataInput.readInt();
        for (int n = 0; n < destinationCount; n++) {
            I vertexId = config.createVertexId();
            vertexId.readFields(dataInput);
            destinationVertices.add(vertexId);
        }
        numberOfMessagesInMemory.set(dataInput.readInt());
        int bufferCount = dataInput.readInt();
        for (int n = 0; n < bufferCount; n++) {
            I vertexId = config.createVertexId();
            vertexId.readFields(dataInput);
            DataInputOutput buffer = config.createMessagesInputOutput();
            buffer.readFields(dataInput);
            inMemoryMessages.put(vertexId, buffer);
        }
        int fileStoreCount = dataInput.readInt();
        for (int n = 0; n < fileStoreCount; n++) {
            SequentialFileMessageStore<I, M> fileStore = fileStoreFactory.newStore(messageValueFactory);
            fileStore.readFields(dataInput);
            fileStores.add(fileStore);
        }
    }

    /**
     * Creates a factory for {@link PartitionDiskBackedMessageStore} instances.
     *
     * @param immutableClassesGiraphConfiguration configuration for created stores
     * @param messageStoreFactory file store factory for created stores
     * @param <I> vertex id type
     * @param <M> message type
     * @return the factory
     */
    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> messageStoreFactory) {
        return new Factory<I, M>(immutableClassesGiraphConfiguration, messageStoreFactory);
    }
}
