package org.apache.giraph.comm.messages.out_of_core;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.class */
public class SequentialFileMessageStore<I extends WritableComparable, M extends Writable> implements Writable {
    private static final Logger LOG = Logger.getLogger(SequentialFileMessageStore.class);
    private final MessageValueFactory<M> messageValueFactory;
    private final File file;
    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
    private final int bufferSize;
    private DataInputStream in;
    private int verticesLeft;
    private I currentVertexId;

    /* loaded from: input_file:org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore$Factory.class */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
        private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
        private final String[] directories;
        private final int bufferSize;
        private final AtomicInteger storeCounter;

        public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
            this.config = immutableClassesGiraphConfiguration;
            String str = immutableClassesGiraphConfiguration.get("mapred.job.id", "Unknown Job");
            int taskPartition = immutableClassesGiraphConfiguration.getTaskPartition();
            List<String> list = GiraphConstants.MESSAGES_DIRECTORY.getList(immutableClassesGiraphConfiguration);
            Collections.shuffle(list);
            this.directories = new String[list.size()];
            int i = 0;
            Iterator<String> it2 = list.iterator();
            while (it2.hasNext()) {
                String str2 = it2.next() + File.separator + str + File.separator + taskPartition + File.separator;
                int i2 = i;
                i++;
                this.directories[i2] = str2;
                if (!new File(str2).mkdirs()) {
                    SequentialFileMessageStore.LOG.error("SequentialFileMessageStore$Factory: Failed to create " + str2);
                }
            }
            this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(immutableClassesGiraphConfiguration);
            this.storeCounter = new AtomicInteger();
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public SequentialFileMessageStore<I, M> newStore(MessageValueFactory<M> messageValueFactory) {
            int abs = Math.abs(this.storeCounter.getAndIncrement());
            return new SequentialFileMessageStore<>(messageValueFactory, this.config, this.bufferSize, this.directories[abs % this.directories.length] + "messages-" + abs);
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public void initialize(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public boolean shouldTraverseMessagesInOrder() {
            return true;
        }
    }

    public SequentialFileMessageStore(MessageValueFactory<M> messageValueFactory, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, int i, String str) {
        this.messageValueFactory = messageValueFactory;
        this.config = immutableClassesGiraphConfiguration;
        this.bufferSize = i;
        this.file = new File(str);
    }

    public void addMessages(NavigableMap<I, DataInputOutput> navigableMap) throws IOException {
        if (this.file.exists()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("addMessages: Deleting " + this.file);
            }
            if (!this.file.delete()) {
                throw new IOException("Failed to delete existing file " + this.file);
            }
        }
        if (!this.file.createNewFile()) {
            throw new IOException("Failed to create file " + this.file);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("addMessages: Creating " + this.file);
        }
        DataOutputStream dataOutputStream = null;
        try {
            dataOutputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(this.file), this.bufferSize));
            dataOutputStream.writeInt(navigableMap.size());
            for (Map.Entry<I, DataInputOutput> entry : navigableMap.entrySet()) {
                I key = entry.getKey();
                key.write(dataOutputStream);
                MessagesIterable<Writable> messagesIterable = new MessagesIterable(entry.getValue(), this.messageValueFactory);
                int size = Iterables.size(messagesIterable);
                dataOutputStream.writeInt(size);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("addMessages: For vertex id " + key + ", messages = " + size + " to file " + this.file);
                }
                for (Writable writable : messagesIterable) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("addMessages: Wrote " + writable + " to " + this.file);
                    }
                    writable.write(dataOutputStream);
                }
            }
            if (dataOutputStream != null) {
                dataOutputStream.close();
            }
        } catch (Throwable th) {
            if (dataOutputStream != null) {
                dataOutputStream.close();
            }
            throw th;
        }
    }

    public Iterable<M> getVertexMessages(I i) throws IOException {
        I i2;
        if (LOG.isDebugEnabled()) {
            LOG.debug("getVertexMessages: Reading for vertex id " + i + " (currently " + this.currentVertexId + ") from " + this.file);
        }
        if (this.in == null) {
            startReading();
        }
        I currentVertexId = getCurrentVertexId();
        while (true) {
            i2 = currentVertexId;
            if (i2 == null || i.compareTo(i2) <= 0) {
                break;
            }
            currentVertexId = getNextVertexId();
        }
        return (i2 == null || i.compareTo(i2) < 0) ? EmptyIterable.get() : readMessagesForCurrentVertex();
    }

    public void clearAll() throws IOException {
        endReading();
        if (this.file.delete()) {
            return;
        }
        LOG.error("clearAll: Failed to delete file " + this.file);
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(this.file.length());
        FileInputStream fileInputStream = new FileInputStream(this.file);
        try {
            byte[] bArr = new byte[this.bufferSize];
            while (true) {
                int read = fileInputStream.read(bArr);
                if (read < 0) {
                    return;
                } else {
                    dataOutput.write(bArr, 0, read);
                }
            }
        } finally {
            fileInputStream.close();
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(this.file);
        try {
            long readLong = dataInput.readLong();
            byte[] bArr = new byte[this.bufferSize];
            long j = 0;
            while (j < readLong) {
                dataInput.readFully(bArr, 0, (int) Math.min(this.bufferSize, readLong - j));
                fileOutputStream.write(bArr);
                j += this.bufferSize;
            }
        } finally {
            fileOutputStream.close();
        }
    }

    private void startReading() throws IOException {
        this.currentVertexId = null;
        this.in = new DataInputStream(new BufferedInputStream(new FileInputStream(this.file), this.bufferSize));
        this.verticesLeft = this.in.readInt();
        if (LOG.isDebugEnabled()) {
            LOG.debug("startReading: File " + this.file + " with " + this.verticesLeft + " vertices left");
        }
    }

    private I getCurrentVertexId() throws IOException {
        return this.currentVertexId != null ? this.currentVertexId : getNextVertexId();
    }

    private I getNextVertexId() throws IOException {
        if (this.currentVertexId != null) {
            readMessagesForCurrentVertex();
        }
        if (this.verticesLeft == 0) {
            return null;
        }
        this.currentVertexId = this.config.createVertexId();
        this.currentVertexId.readFields(this.in);
        return this.currentVertexId;
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.hadoop.io.Writable, java.lang.Object] */
    private Collection<M> readMessagesForCurrentVertex() throws IOException {
        int readInt = this.in.readInt();
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(readInt);
        for (int i = 0; i < readInt; i++) {
            ?? newInstance = this.messageValueFactory.mo2218newInstance();
            try {
                newInstance.readFields(this.in);
                newArrayListWithCapacity.add(newInstance);
            } catch (IOException e) {
                throw new IllegalStateException("readMessagesForCurrentVertex: Failed to read message from " + i + " of " + readInt + " for vertex id " + this.currentVertexId + " from " + this.file, e);
            }
        }
        currentVertexDone();
        return newArrayListWithCapacity;
    }

    private void currentVertexDone() throws IOException {
        this.currentVertexId = null;
        this.verticesLeft--;
        if (this.verticesLeft == 0) {
            endReading();
        }
    }

    private void endReading() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("endReading: Stopped reading " + this.file);
        }
        if (this.in != null) {
            this.in.close();
            this.in = null;
        }
    }

    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        return new Factory(immutableClassesGiraphConfiguration);
    }
}
