package org.apache.giraph.ooc.data;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.persistence.DataIndex;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/ooc/data/DiskBackedDataStore.class */
/**
 * Base class for a per-partition data store whose contents can be moved between memory and the
 * storage reached through the out-of-core engine's data accessor.
 *
 * <p>For every partition the store tracks whether its in-memory data has been offloaded. While a
 * partition is marked as offloaded, new entries are collected in a side buffer
 * ({@code dataBuffers}); that buffer can itself be written out once it grows past
 * {@link #MINIMUM_BUFFER_SIZE_TO_FLUSH}. Loading a partition brings back the in-memory data, then
 * the entries previously written out from the buffer, then the entries still buffered in memory.
 *
 * <p>Access to a partition is coordinated with one {@link ReadWriteLock} per partition id:
 * {@link #addEntry} takes the read lock, the load/offload proxies take the write lock.
 *
 * @param <T> type of a single entry kept by the store
 */
public abstract class DiskBackedDataStore<T> {
    /** Minimum size of a buffer (in bytes) to flush to disk; defaults to 8388608 (8 * 1024 * 1024). */
    public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = new IntConfOption("giraph.flushBufferSize", 8388608, "Minimum size of a buffer (in bytes) to flush to disk.");
    private static final Logger LOG = Logger.getLogger(DiskBackedDataStore.class);
    /** Out-of-core engine used to resolve partition owner threads and the data accessor. */
    protected final OutOfCoreEngine oocEngine;
    /** Value of {@link #MINIMUM_BUFFER_SIZE_TO_FLUSH} read from the configuration at construction. */
    private final int minBufferSizeToOffload;
    /** Not read or written by this class; exposed for subclasses. */
    protected final Set<Integer> hasPartitionDataOnFile = Sets.newConcurrentHashSet();
    /** Ids of partitions whose in-memory data has been offloaded and not yet loaded back. */
    private final Set<Integer> hasPartitionDataOnDisk = Sets.newConcurrentHashSet();
    /**
     * Per partition id: accumulated serialized size (left) and the entries (right) added while the
     * partition was marked as offloaded.
     */
    private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = Maps.newConcurrentMap();
    /** Per partition id: number of buffered entries written out by {@link #offloadBuffersProxy}. */
    private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = Maps.newConcurrentMap();
    /** Lazily created lock per partition id. */
    private final ConcurrentMap<Integer, ReadWriteLock> locks = Maps.newConcurrentMap();

    /**
     * Creates the store and reads the flush threshold from the configuration.
     *
     * @param immutableClassesGiraphConfiguration configuration supplying
     *     {@link #MINIMUM_BUFFER_SIZE_TO_FLUSH}
     * @param outOfCoreEngine engine stored in {@link #oocEngine}
     */
    public DiskBackedDataStore(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, OutOfCoreEngine outOfCoreEngine) {
        this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(immutableClassesGiraphConfiguration);
        this.oocEngine = outOfCoreEngine;
    }

    /**
     * Returns the lock for a partition, creating it on first use.
     *
     * @param partitionId id of the partition
     * @return the single lock instance registered for {@code partitionId}
     */
    private ReadWriteLock getPartitionLock(int partitionId) {
        ReadWriteLock lock = this.locks.get(partitionId);
        if (lock == null) {
            ReadWriteLock created = new ReentrantReadWriteLock();
            // If another thread registered a lock first, use theirs so all callers share one lock.
            ReadWriteLock raced = this.locks.putIfAbsent(partitionId, created);
            lock = raced != null ? raced : created;
        }
        return lock;
    }

    /**
     * Adds one entry to a partition. If the partition is currently marked as offloaded the entry
     * is appended to the partition's side buffer; otherwise it is handed to
     * {@link #addEntryToInMemoryPartitionData}.
     *
     * @param partitionId id of the partition receiving the entry
     * @param entry entry to add
     */
    public void addEntry(int partitionId, T entry) {
        ReadWriteLock partitionLock = getPartitionLock(partitionId);
        // Read lock: several threads may add concurrently, but not while a write-locked
        // load/offload section for the same partition is running.
        partitionLock.readLock().lock();
        try {
            if (!this.hasPartitionDataOnDisk.contains(partitionId)) {
                addEntryToInMemoryPartitionData(partitionId, entry);
                return;
            }
            int entrySize = entrySerializedSize(entry);
            List<T> entries = new ArrayList<T>();
            entries.add(entry);
            Pair<Integer, List<T>> existing = this.dataBuffers.putIfAbsent(partitionId, new MutablePair<Integer, List<T>>(entrySize, entries));
            if (existing != null) {
                // A buffer already exists; the read lock does not exclude other adders, so the
                // size/list update is guarded by the buffer's own monitor.
                synchronized (existing) {
                    // Every value stored in dataBuffers is created above as a MutablePair.
                    MutablePair<Integer, List<T>> buffer = (MutablePair<Integer, List<T>>) existing;
                    buffer.setLeft(buffer.getLeft() + entrySize);
                    buffer.getRight().add(entry);
                }
            }
        } finally {
            // Release even if a subclass hook throws, so the partition cannot stay locked.
            partitionLock.readLock().unlock();
        }
    }

    /**
     * Loads the data of a partition back into memory.
     *
     * @param i id of the partition
     * @return amount reported by the load (presumably bytes read; confirm against implementations)
     * @throws IOException if reading fails
     */
    public abstract long loadPartitionData(int i) throws IOException;

    /**
     * Brings a partition fully back into memory if it is marked as offloaded: first the in-memory
     * data via {@link #loadInMemoryPartitionData}, then the entries previously written out by
     * {@link #offloadBuffersProxy}, then the entries still held in the side buffer. Does nothing
     * when the partition is not marked as offloaded.
     *
     * @param partitionId id of the partition
     * @param dataIndex index to extend with the partition entry; restored to its incoming state on
     *     normal return
     * @return sum of the values returned by {@link #loadInMemoryPartitionData} and by finalizing
     *     the buffer input, or 0 if nothing was loaded
     * @throws IOException if reading fails
     */
    public long loadPartitionDataProxy(int partitionId, DataIndex dataIndex) throws IOException {
        long loaded = 0;
        ReadWriteLock partitionLock = getPartitionLock(partitionId);
        partitionLock.writeLock().lock();
        try {
            if (this.hasPartitionDataOnDisk.contains(partitionId)) {
                int threadId = this.oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
                loaded = loadInMemoryPartitionData(partitionId, threadId, dataIndex.addIndex(DataIndex.NumericIndexEntry.createPartitionEntry(partitionId)));
                this.hasPartitionDataOnDisk.remove(partitionId);

                // Replay entries that were flushed from the side buffer.
                Integer numEntriesOnDisk = this.numDataBuffersOnDisk.remove(partitionId);
                if (numEntriesOnDisk != null) {
                    Preconditions.checkState(numEntriesOnDisk > 0);
                    dataIndex.addIndex(DataIndex.TypeIndexEntry.BUFFER);
                    OutOfCoreDataAccessor.DataInputWrapper inputWrapper = this.oocEngine.getDataAccessor().prepareInput(threadId, dataIndex.copy());
                    DataInput input = inputWrapper.getDataInput();
                    for (int read = 0; read < numEntriesOnDisk; read++) {
                        addEntryToInMemoryPartitionData(partitionId, readNextEntry(input));
                    }
                    // NOTE(review): the meaning of the 'true' flag is defined by DataInputWrapper,
                    // which is not visible here.
                    loaded += inputWrapper.finalizeInput(true);
                    dataIndex.removeLastIndex(); // drop the BUFFER entry
                }
                dataIndex.removeLastIndex(); // drop the partition entry

                // Finally move over entries that are still buffered in memory.
                Pair<Integer, List<T>> buffered = this.dataBuffers.remove(partitionId);
                if (buffered != null) {
                    for (T entry : buffered.getValue()) {
                        addEntryToInMemoryPartitionData(partitionId, entry);
                    }
                }
            }
        } finally {
            // Release even when I/O fails; otherwise the partition stays write-locked forever.
            partitionLock.writeLock().unlock();
        }
        return loaded;
    }

    /**
     * Offloads the in-memory data of a partition.
     *
     * @param i id of the partition
     * @return amount reported by the offload (presumably bytes written; confirm against
     *     implementations)
     * @throws IOException if writing fails
     */
    public abstract long offloadPartitionData(int i) throws IOException;

    /**
     * Marks a partition as offloaded and then writes its in-memory data out through
     * {@link #offloadInMemoryPartitionData}. The write lock is held only while the partition is
     * being marked, not during the write itself.
     *
     * @param partitionId id of the partition
     * @param dataIndex index to extend with the partition entry; restored to its incoming state on
     *     normal return
     * @return value returned by {@link #offloadInMemoryPartitionData}
     * @throws IOException if writing fails
     */
    @SuppressWarnings({"UL_UNRELEASED_LOCK_EXCEPTION_PATH"})
    public long offloadPartitionDataProxy(int partitionId, DataIndex dataIndex) throws IOException {
        ReadWriteLock partitionLock = getPartitionLock(partitionId);
        partitionLock.writeLock().lock();
        try {
            // From here on addEntry() routes new entries for this partition to the side buffer.
            this.hasPartitionDataOnDisk.add(partitionId);
        } finally {
            partitionLock.writeLock().unlock();
        }
        int threadId = this.oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
        long offloaded = offloadInMemoryPartitionData(partitionId, threadId, dataIndex.addIndex(DataIndex.NumericIndexEntry.createPartitionEntry(partitionId)));
        dataIndex.removeLastIndex();
        return offloaded;
    }

    /**
     * Offloads the side buffer of a partition.
     *
     * @param i id of the partition
     * @return amount reported by the offload (presumably bytes written; confirm against
     *     implementations)
     * @throws IOException if writing fails
     */
    public abstract long offloadBuffers(int i) throws IOException;

    /**
     * Writes the side buffer of a partition out through the data accessor, provided the buffer
     * exists and its accumulated serialized size is at least the configured minimum. The number of
     * written entries is added to the partition's on-disk entry count.
     *
     * @param partitionId id of the partition
     * @param dataIndex index to extend with the partition and BUFFER entries; restored to its
     *     incoming state on normal return
     * @return value returned by finalizing the output, or 0 if the buffer was missing or too small
     * @throws IOException if writing fails
     */
    public long offloadBuffersProxy(int partitionId, DataIndex dataIndex) throws IOException {
        Pair<Integer, List<T>> candidate = this.dataBuffers.get(partitionId);
        if (candidate == null || candidate.getLeft() < this.minBufferSizeToOffload) {
            return 0L;
        }
        ReadWriteLock partitionLock = getPartitionLock(partitionId);
        Pair<Integer, List<T>> detached;
        // Write lock: wait for in-flight addEntry() calls so the detached list is no longer mutated.
        partitionLock.writeLock().lock();
        try {
            detached = this.dataBuffers.remove(partitionId);
        } finally {
            partitionLock.writeLock().unlock();
        }
        Preconditions.checkNotNull(detached);
        List<T> entries = detached.getRight();
        Preconditions.checkState(!entries.isEmpty());

        int threadId = this.oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
        dataIndex.addIndex(DataIndex.NumericIndexEntry.createPartitionEntry(partitionId)).addIndex(DataIndex.TypeIndexEntry.BUFFER);
        // NOTE(review): the meaning of the 'true' flag is defined by the data accessor, which is
        // not visible here (presumably append mode, since repeated flushes are read back together).
        OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = this.oocEngine.getDataAccessor().prepareOutput(threadId, dataIndex.copy(), true);
        for (T entry : entries) {
            writeEntry(entry, outputWrapper.getDataOutput());
        }
        long written = outputWrapper.finalizeOutput();
        dataIndex.removeLastIndex().removeLastIndex();

        int numEntries = entries.size();
        Integer previousCount = this.numDataBuffersOnDisk.putIfAbsent(partitionId, numEntries);
        if (previousCount != null) {
            this.numDataBuffersOnDisk.replace(partitionId, previousCount + numEntries);
        }
        return written;
    }

    /**
     * Collects the partitions owned by a thread whose side buffer has grown strictly past the
     * configured minimum size.
     *
     * @param threadId id of the owner thread to filter on
     * @return a new set of matching partition ids; empty if none match
     */
    public Set<Integer> getCandidateBuffersToOffload(int threadId) {
        Set<Integer> candidates = new HashSet<Integer>();
        for (Map.Entry<Integer, Pair<Integer, List<T>>> buffer : this.dataBuffers.entrySet()) {
            int partitionId = buffer.getKey();
            if (buffer.getValue().getLeft() > this.minBufferSizeToOffload && this.oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) == threadId) {
                candidates.add(partitionId);
            }
        }
        return candidates;
    }

    /**
     * Serializes one entry.
     *
     * @param t entry to write
     * @param dataOutput destination
     * @throws IOException if writing fails
     */
    protected abstract void writeEntry(T t, DataOutput dataOutput) throws IOException;

    /**
     * Deserializes the next entry.
     *
     * @param dataInput source
     * @return the entry read
     * @throws IOException if reading fails
     */
    protected abstract T readNextEntry(DataInput dataInput) throws IOException;

    /**
     * Loads the in-memory portion of a partition's data.
     *
     * @param i id of the partition
     * @param i2 id of the thread owning the partition
     * @param dataIndex index identifying the partition's data
     * @return amount reported by the load (added to the result of {@link #loadPartitionDataProxy})
     * @throws IOException if reading fails
     */
    protected abstract long loadInMemoryPartitionData(int i, int i2, DataIndex dataIndex) throws IOException;

    /**
     * Offloads the in-memory portion of a partition's data.
     *
     * @param i id of the partition
     * @param i2 id of the thread owning the partition
     * @param dataIndex index identifying the partition's data
     * @return amount reported by the offload (returned by {@link #offloadPartitionDataProxy})
     * @throws IOException if writing fails
     */
    protected abstract long offloadInMemoryPartitionData(int i, int i2, DataIndex dataIndex) throws IOException;

    /**
     * Returns the serialized size of an entry, used to track the side buffer's size.
     *
     * @param t entry to measure
     * @return size of the entry in the same unit as {@link #MINIMUM_BUFFER_SIZE_TO_FLUSH}
     */
    protected abstract int entrySerializedSize(T t);

    /**
     * Adds an entry to the in-memory data of a partition.
     *
     * @param i id of the partition
     * @param t entry to add
     */
    protected abstract void addEntryToInMemoryPartitionData(int i, T t);
}
