package org.apache.giraph.ooc.persistence;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.KryoDataInput;
import com.esotericsoftware.kryo.io.KryoDataOutput;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.io.UnsafeInput;
import com.esotericsoftware.kryo.io.UnsafeOutput;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.class */
public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
    public static final IntConfOption OOC_DISK_BUFFER_SIZE = new IntConfOption("graph.oocDiskBufferSize", 4194304, "size of the buffer when (de)serializing data for reading/writing from/to disk");
    private static final Logger LOG = Logger.getLogger(LocalDiskDataAccessor.class);
    private final byte[][] perThreadBuffers;
    private final String[] basePaths;
    private final int numDisks;

    /* loaded from: input_file:org/apache/giraph/ooc/persistence/LocalDiskDataAccessor$LocalDiskDataInputWrapper.class */
    private static class LocalDiskDataInputWrapper implements OutOfCoreDataAccessor.DataInputWrapper {
        private final File file;
        private final Input input;

        @SuppressWarnings({"OBL_UNSATISFIED_OBLIGATION"})
        LocalDiskDataInputWrapper(String str, byte[] bArr) throws IOException {
            this.file = new File(str);
            if (LocalDiskDataAccessor.LOG.isDebugEnabled()) {
                LocalDiskDataAccessor.LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from local file " + this.file.getAbsolutePath());
            }
            this.input = new UnsafeInput(bArr);
            this.input.setInputStream(new FileInputStream(new RandomAccessFile(this.file, "r").getFD()));
        }

        @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor.DataInputWrapper
        public DataInput getDataInput() {
            return new KryoDataInput(this.input);
        }

        @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor.DataInputWrapper
        public long finalizeInput(boolean z) {
            this.input.close();
            long j = this.input.total();
            Preconditions.checkState(!z || this.file.delete(), "finalizeInput: failed to delete %s.", this.file.getAbsoluteFile());
            return j;
        }
    }

    /* loaded from: input_file:org/apache/giraph/ooc/persistence/LocalDiskDataAccessor$LocalDiskDataOutputWrapper.class */
    private static class LocalDiskDataOutputWrapper implements OutOfCoreDataAccessor.DataOutputWrapper {
        private final File file;
        private final Output output;

        @SuppressWarnings({"OBL_UNSATISFIED_OBLIGATION"})
        LocalDiskDataOutputWrapper(String str, boolean z, byte[] bArr) throws IOException {
            this.file = new File(str);
            if (LocalDiskDataAccessor.LOG.isDebugEnabled()) {
                LocalDiskDataAccessor.LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from local file " + this.file.getAbsolutePath());
                if (!z) {
                    Preconditions.checkState(!this.file.exists(), "LocalDiskDataOutputWrapper: file %s already exist", this.file.getAbsoluteFile());
                    Preconditions.checkState(this.file.createNewFile(), "LocalDiskDataOutputWrapper: cannot create file %s", this.file.getAbsolutePath());
                }
            }
            this.output = new UnsafeOutput(bArr);
            RandomAccessFile randomAccessFile = new RandomAccessFile(this.file, "rw");
            if (z) {
                randomAccessFile.seek(this.file.length());
            }
            this.output.setOutputStream(new FileOutputStream(randomAccessFile.getFD()));
        }

        @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor.DataOutputWrapper
        public DataOutput getDataOutput() {
            return new KryoDataOutput(this.output);
        }

        @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor.DataOutputWrapper
        public long finalizeOutput() {
            this.output.close();
            return this.output.total();
        }
    }

    public LocalDiskDataAccessor(ImmutableClassesGiraphConfiguration<?, ?, ?> immutableClassesGiraphConfiguration) {
        String[] array = GiraphConstants.PARTITIONS_DIRECTORY.getArray(immutableClassesGiraphConfiguration);
        this.numDisks = array.length;
        if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(immutableClassesGiraphConfiguration) || GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(immutableClassesGiraphConfiguration) != this.numDisks) {
            LOG.warn("LocalDiskDataAccessor: with this data accessor, number of out-of-core threads is only specified by the number of directories given by 'giraph.partitionsDirectory' flag! Now using " + this.numDisks + " IO threads!");
        }
        this.basePaths = new String[this.numDisks];
        int i = 0;
        String jobId = immutableClassesGiraphConfiguration.getJobId();
        for (String str : array) {
            String str2 = str + "/" + jobId;
            File file = new File(str2);
            Preconditions.checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create directory " + file.getAbsolutePath());
            this.basePaths[i] = str2 + "/";
            i++;
        }
        this.perThreadBuffers = new byte[this.numDisks][OOC_DISK_BUFFER_SIZE.get(immutableClassesGiraphConfiguration)];
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    public void initialize() {
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    @SuppressWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
    public void shutdown() {
        for (String str : this.basePaths) {
            File file = new File(str);
            for (String str2 : file.list()) {
                File file2 = new File(file.getPath(), str2);
                Preconditions.checkState(file2.delete(), "shutdown: cannot delete file %s", file2.getAbsoluteFile());
            }
            Preconditions.checkState(file.delete(), "shutdown: cannot delete directory %s", file.getAbsoluteFile());
        }
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    public int getNumAccessorThreads() {
        return this.numDisks;
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    public OutOfCoreDataAccessor.DataInputWrapper prepareInput(int i, DataIndex dataIndex) throws IOException {
        return new LocalDiskDataInputWrapper(this.basePaths[i] + dataIndex.toString(), this.perThreadBuffers[i]);
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    public OutOfCoreDataAccessor.DataOutputWrapper prepareOutput(int i, DataIndex dataIndex, boolean z) throws IOException {
        return new LocalDiskDataOutputWrapper(this.basePaths[i] + dataIndex.toString(), z, this.perThreadBuffers[i]);
    }

    @Override // org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor
    public boolean dataExist(int i, DataIndex dataIndex) {
        return new File(this.basePaths[i] + dataIndex.toString()).exists();
    }
}
