package org.apache.hugegraph.computer.core.compute;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.compute.input.EdgesInput;
import org.apache.hugegraph.computer.core.compute.input.MessageInput;
import org.apache.hugegraph.computer.core.compute.input.VertexInput;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.graph.partition.PartitionStat;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.io.BufferedFileInput;
import org.apache.hugegraph.computer.core.io.BufferedFileOutput;
import org.apache.hugegraph.computer.core.manager.Managers;
import org.apache.hugegraph.computer.core.output.ComputerOutput;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.store.EntryIterator;
import org.apache.hugegraph.computer.core.store.FileGenerator;
import org.apache.hugegraph.computer.core.store.FileManager;
import org.apache.hugegraph.computer.core.store.entry.EntriesUtil;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;
import org.apache.hugegraph.computer.core.store.entry.Pointer;
import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.computer.core.worker.ComputationContext;
import org.apache.hugegraph.computer.core.worker.WorkerContext;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/**
 * A graph partition whose vertices and edges are stored in local files and
 * whose per-vertex status (active flag) and value are written to a fresh pair
 * of files on every superstep.
 *
 * <p>Usage order as implied by the code: {@link #input} writes the vertex and
 * edge files, {@link #compute} is called once per superstep (with
 * {@link #messages} supplying the incoming messages for supersteps other
 * than 0), and {@link #output} writes the results and deletes the files.
 */
public class FileGraphPartition {

    private static final Logger LOG = Log.logger(FileGraphPartition.class);

    private static final String VERTEX = "vertex";
    private static final String EDGE = "edge";
    private static final String STATUS = "status";
    private static final String VALUE = "value";

    private final ComputerContext context;
    private final Computation<Value> computation;
    private final FileGenerator fileGenerator;
    private final int partition;

    private final File vertexFile;
    private final File edgeFile;

    // Status/value files of the previous superstep (read side)
    private File preStatusFile;
    private File preValueFile;
    // Status/value files of the current superstep (write side)
    private File curStatusFile;
    private File curValueFile;

    private long vertexCount;
    private long edgeCount;

    private BufferedFileOutput curStatusOutput;
    private BufferedFileOutput curValueOutput;
    private BufferedFileInput preStatusInput;
    private BufferedFileInput preValueInput;
    private VertexInput vertexInput;
    private EdgesInput edgesInput;
    private MessageInput<Value> messageInput;

    /**
     * Creates the partition, instantiates and initializes the configured
     * computation, and picks the locations of the vertex and edge files.
     * No file is created here; that happens in {@link #input}.
     *
     * @param computerContext context providing the configuration
     * @param managers        manager registry; the entry registered under
     *                        {@code FileManager.NAME} is used to generate
     *                        file locations
     * @param partition       id of this partition
     */
    public FileGraphPartition(ComputerContext computerContext,
                              Managers managers, int partition) {
        this.context = computerContext;
        // createObject() is not typed to the option, hence the unchecked cast
        @SuppressWarnings("unchecked")
        Computation<Value> workerComputation =
                (Computation<Value>) computerContext.config().createObject(
                        ComputerOptions.WORKER_COMPUTATION_CLASS);
        this.computation = workerComputation;
        this.computation.init(computerContext.config());
        this.fileGenerator = (FileGenerator) managers.get(FileManager.NAME);
        this.partition = partition;
        this.vertexFile = new File(
                          this.fileGenerator.randomDirectory(VERTEX));
        this.edgeFile = new File(this.fileGenerator.randomDirectory(EDGE));
        this.vertexCount = 0L;
        this.edgeCount = 0L;
    }

    /**
     * Writes all vertices, and the edge entries whose key equals a vertex
     * key, into the vertex file and the edge file of this partition.
     *
     * <p>Both outputs are closed even when writing fails.
     *
     * @param vertices vertex entries; NOTE(review): the merge in
     *                 {@code writeEdges} only works if both iterators are
     *                 sorted by key in the same order - confirm with caller
     * @param edges    edge entries, consumed in step with {@code vertices}
     * @return stat carrying the partition id, the vertex count and the
     *         edge count
     * @throws ComputerException if creating or writing the files fails
     */
    public PartitionStat input(PeekableIterator<KvEntry> vertices,
                               PeekableIterator<KvEntry> edges) {
        try {
            createFile(this.vertexFile);
            createFile(this.edgeFile);
            try (BufferedFileOutput vertexOut =
                         new BufferedFileOutput(this.vertexFile);
                 BufferedFileOutput edgeOut =
                         new BufferedFileOutput(this.edgeFile)) {
                while (vertices.hasNext()) {
                    KvEntry entry = vertices.next();
                    Pointer key = entry.key();
                    this.writeVertex(key, entry.value(), vertexOut);
                    this.writeEdges(key, edges, edgeOut);
                }
            }
        } catch (IOException e) {
            throw new ComputerException(
                      "Failed to init FileGraphPartition '%s'",
                      e, this.partition);
        }
        return new PartitionStat(this.partition, this.vertexCount,
                                 this.edgeCount, 0L);
    }

    /**
     * Runs one superstep over every vertex of this partition.
     *
     * <p>The three phases (prepare files, run the computation, release
     * files) are wrapped separately so that a failure is reported with the
     * message of the phase it actually happened in.
     *
     * @param workerContext context handed to the computation
     * @param superstep     superstep number; 0 selects the initial
     *                      computation that does not read messages
     * @return stat whose last field is the vertex count minus the number of
     *         vertices still active after this superstep
     * @throws ComputerException if any phase fails
     */
    public PartitionStat compute(WorkerContext workerContext, int superstep) {
        LOG.info("Partition {} begin compute in superstep {}",
                 this.partition, superstep);
        try {
            this.beforeCompute(superstep);
        } catch (IOException e) {
            throw new ComputerException(
                      "Error occurred when beforeCompute at superstep %s",
                      e, superstep);
        }

        long activeVertexCount;
        try {
            this.computation.beforeSuperstep(workerContext);
            activeVertexCount = superstep == 0 ?
                                this.compute0(workerContext) :
                                this.compute1(workerContext);
            this.computation.afterSuperstep(workerContext);
        } catch (Exception e) {
            throw new ComputerException(
                      "Error occurred when compute at superstep %s",
                      e, superstep);
        }

        try {
            this.afterCompute(superstep);
        } catch (Exception e) {
            throw new ComputerException(
                      "Error occurred when afterCompute at superstep %s",
                      e, superstep);
        }

        LOG.info("Partition {} finish compute in superstep {}",
                 this.partition, superstep);
        return new PartitionStat(this.partition, this.vertexCount,
                                 this.edgeCount,
                                 this.vertexCount - activeVertexCount);
    }

    /**
     * Superstep 0: every vertex is activated and computed without messages.
     *
     * @return number of vertices that are active after the computation
     */
    private long compute0(ComputationContext computationContext) {
        long activeVertexCount = 0L;
        while (this.vertexInput.hasNext()) {
            Vertex vertex = this.vertexInput.next();
            vertex.reactivate();
            vertex.edges(this.edgesInput.edges(this.vertexInput.idPointer()));
            this.computation.compute0(computationContext, vertex);
            if (vertex.active()) {
                activeVertexCount++;
            }
            try {
                this.saveVertexStatusAndValue(vertex);
            } catch (IOException e) {
                throw new ComputerException(
                          "Error occurred when saveVertex: %s", e, vertex);
            }
        }
        return activeVertexCount;
    }

    /**
     * Supersteps after 0: restores each vertex's status and value from the
     * previous superstep, reactivates it if it received messages, and
     * computes it only when it is active.
     *
     * @return number of vertices that are active after the computation
     */
    private long compute1(ComputationContext computationContext) {
        // A single Value instance is reused as the read buffer for all
        // vertices; it is refilled before each vertex is processed.
        Value result = (Value) this.context.config().createObject(
                               ComputerOptions.ALGORITHM_RESULT_CLASS);
        long activeVertexCount = 0L;
        while (this.vertexInput.hasNext()) {
            Vertex vertex = this.vertexInput.next();
            this.readVertexStatusAndValue(vertex, result);

            Iterator<Value> messages = this.messageInput.iterator(
                                       this.vertexInput.idPointer());
            if (messages.hasNext()) {
                vertex.reactivate();
            }
            // Edges are only loaded for vertices that will be computed
            if (vertex.active()) {
                vertex.edges(this.edgesInput.edges(
                             this.vertexInput.idPointer()));
                this.computation.compute(computationContext, vertex,
                                         messages);
            }
            // The computation may have changed the active flag
            if (vertex.active()) {
                activeVertexCount++;
            }
            try {
                this.saveVertexStatusAndValue(vertex);
            } catch (IOException e) {
                throw new ComputerException(
                          "Error occurred when saveVertex: %s", e, vertex);
            }
        }
        return activeVertexCount;
    }

    /**
     * Writes every vertex accepted by the configured output's filter, then
     * closes the inputs and deletes the files of this partition.
     *
     * @return stat carrying the partition id, the vertex count and the
     *         edge count
     * @throws ComputerException if opening or releasing the files fails
     */
    public PartitionStat output() {
        ComputerOutput output = (ComputerOutput) this.context.config()
                                .createObject(ComputerOptions.OUTPUT_CLASS);
        output.init(this.context.config(), this.partition);
        try {
            this.beforeOutput();
        } catch (IOException e) {
            throw new ComputerException("Error occurred when beforeOutput", e);
        }

        // Reused as the read buffer for every vertex, as in compute1()
        Value result = (Value) this.context.config().createObject(
                               ComputerOptions.ALGORITHM_RESULT_CLASS);
        while (this.vertexInput.hasNext()) {
            Vertex vertex = this.vertexInput.next();
            this.readVertexStatusAndValue(vertex, result);
            vertex.edges(this.edgesInput.edges(this.vertexInput.idPointer()));
            if (output.filter(this.context.config(), this.computation,
                              vertex)) {
                output.write(vertex);
            }
        }

        try {
            this.afterOutput();
            output.close();
        } catch (IOException e) {
            throw new ComputerException("Error occurred when afterOutput", e);
        }
        return new PartitionStat(this.partition, this.vertexCount,
                                 this.edgeCount, 0L);
    }

    /**
     * Sets the messages to be read by the next call of {@link #compute}
     * with a superstep other than 0.
     *
     * @param messages message entries for this partition
     */
    public void messages(PeekableIterator<KvEntry> messages) {
        this.messageInput = new MessageInput<>(this.context, messages);
    }

    /**
     * @return id of this partition
     */
    public int partition() {
        return this.partition;
    }

    /**
     * Reads the next status flag and value from the previous superstep's
     * files and applies them to {@code vertex}.
     *
     * @param vertex vertex to restore
     * @param result value instance that is filled and attached to the vertex
     * @throws ComputerException if reading either file fails
     */
    private void readVertexStatusAndValue(Vertex vertex, Value result) {
        boolean active;
        try {
            active = this.preStatusInput.readBoolean();
        } catch (IOException e) {
            throw new ComputerException(
                      "Failed to read status of vertex '%s'", e, vertex);
        }
        if (active) {
            vertex.reactivate();
        } else {
            vertex.inactivate();
        }

        try {
            result.read(this.preValueInput);
        } catch (IOException e) {
            throw new ComputerException(
                      "Failed to read value of vertex '%s'", e, vertex);
        }
        vertex.value(result);
    }

    /**
     * Appends the vertex's active flag and value to the current superstep's
     * status and value files.
     *
     * @throws IOException if writing fails
     */
    private void saveVertexStatusAndValue(Vertex vertex) throws IOException {
        this.curStatusOutput.writeBoolean(vertex.active());
        Value value = vertex.value();
        E.checkNotNull(value, "Vertex's value can't be null");
        value.write(this.curValueOutput);
    }

    /**
     * Appends one vertex record: key length, key bytes, value length,
     * value bytes. Increments the vertex count.
     */
    private void writeVertex(Pointer key, Pointer value,
                             BufferedFileOutput vertexOut)
                             throws IOException {
        byte[] keyBytes = key.bytes();
        vertexOut.writeFixedInt(keyBytes.length);
        vertexOut.write(keyBytes);

        byte[] valueBytes = value.bytes();
        vertexOut.writeFixedInt(valueBytes.length);
        vertexOut.write(valueBytes);

        this.vertexCount++;
    }

    /**
     * Consumes edge entries up to and including those whose key equals
     * {@code vid}, and writes the matching ones to the edge file.
     *
     * <p>Record layout: vid length, vid bytes, payload length, sub-entry
     * count, then the key and value bytes of each sub-entry. The payload
     * length covers everything after the length field itself and is
     * back-filled once the record is complete.
     *
     * @throws IOException         if writing fails
     * @throws ArithmeticException if the sub-entry count or the payload
     *                             length does not fit in an int
     */
    private void writeEdges(Pointer vid, PeekableIterator<KvEntry> edges,
                            BufferedFileOutput edgeOut) throws IOException {
        byte[] vidBytes = vid.bytes();
        while (edges.hasNext()) {
            KvEntry entry = edges.peek();
            int matched = vid.compareTo(entry.key());
            if (matched < 0) {
                // Edge key is beyond this vertex: leave it for a later one
                return;
            }
            edges.next();
            if (matched > 0) {
                // Edge key precedes this vertex, so it matches no vertex
                // seen here: skip it
                continue;
            }

            // Fail before writing anything instead of silently truncating
            int subEntryCount = Math.toIntExact(entry.numSubEntries());

            edgeOut.writeFixedInt(vidBytes.length);
            edgeOut.write(vidBytes);

            // Placeholder for the payload length, filled in below
            long lengthPosition = edgeOut.position();
            edgeOut.writeFixedInt(0);

            this.edgeCount += subEntryCount;
            edgeOut.writeFixedInt(subEntryCount);
            EntryIterator subKvIter = EntriesUtil.subKvIterFromEntry(entry);
            while (subKvIter.hasNext()) {
                KvEntry subKv = subKvIter.next();
                edgeOut.write(subKv.key().bytes());
                edgeOut.write(subKv.value().bytes());
            }

            long payloadLength = edgeOut.position() - lengthPosition -
                                 Integer.BYTES;
            edgeOut.writeFixedInt(lengthPosition,
                                  Math.toIntExact(payloadLength));
        }
    }

    /**
     * Opens the vertex and edge inputs, turns the files written by the
     * previous superstep into the read side (supersteps other than 0), and
     * creates new status/value files for this superstep.
     */
    private void beforeCompute(int superstep) throws IOException {
        this.vertexInput = new VertexInput(this.context, this.vertexFile,
                                           this.vertexCount);
        this.edgesInput = new EdgesInput(this.context, this.edgeFile);
        this.vertexInput.init();
        this.edgesInput.init();

        if (superstep != 0) {
            this.preStatusFile = this.curStatusFile;
            this.preValueFile = this.curValueFile;
            this.preStatusInput = new BufferedFileInput(this.preStatusFile);
            this.preValueInput = new BufferedFileInput(this.preValueFile);
        }

        String superstepName = Integer.toString(superstep);
        String partitionName = Integer.toString(this.partition);
        String statusPath = this.fileGenerator.randomDirectory(
                            STATUS, superstepName, partitionName);
        String valuePath = this.fileGenerator.randomDirectory(
                           VALUE, superstepName, partitionName);
        this.curStatusFile = new File(statusPath);
        this.curValueFile = new File(valuePath);
        createFile(this.curStatusFile);
        createFile(this.curValueFile);

        this.curStatusOutput = new BufferedFileOutput(this.curStatusFile);
        this.curValueOutput = new BufferedFileOutput(this.curValueFile);
    }

    /**
     * Closes everything opened for the superstep and, for supersteps other
     * than 0, deletes the previous superstep's status/value files.
     */
    private void afterCompute(int superstep) throws Exception {
        this.vertexInput.close();
        this.edgesInput.close();
        if (superstep != 0) {
            this.messageInput.close();
            this.preStatusInput.close();
            this.preValueInput.close();
            deleteFile(this.preStatusFile);
            deleteFile(this.preValueFile);
        }
        this.curStatusOutput.close();
        this.curValueOutput.close();
    }

    /**
     * Opens the vertex and edge inputs and the status/value files written
     * by the last superstep.
     */
    private void beforeOutput() throws IOException {
        this.vertexInput = new VertexInput(this.context, this.vertexFile,
                                           this.vertexCount);
        this.edgesInput = new EdgesInput(this.context, this.edgeFile);
        this.vertexInput.init();
        this.edgesInput.init();

        this.preStatusFile = this.curStatusFile;
        this.preValueFile = this.curValueFile;
        this.preStatusInput = new BufferedFileInput(this.preStatusFile);
        this.preValueInput = new BufferedFileInput(this.preValueFile);
    }

    /**
     * Closes the inputs opened by {@link #beforeOutput()} and deletes the
     * status, value, vertex and edge files.
     */
    private void afterOutput() throws IOException {
        this.vertexInput.close();
        this.edgesInput.close();
        this.preStatusInput.close();
        this.preValueInput.close();

        assert this.preStatusFile == this.curStatusFile;
        assert this.preValueFile == this.curValueFile;
        deleteFile(this.preStatusFile);
        deleteFile(this.preValueFile);

        deleteFile(this.vertexFile);
        deleteFile(this.edgeFile);
    }

    /**
     * Creates {@code file} and any missing parent directories.
     *
     * @throws IOException              if the file can't be created
     * @throws IllegalArgumentException presumably thrown by
     *                                  {@code E.checkArgument} when the file
     *                                  already exists - verify against E
     */
    private static void createFile(File file) throws IOException {
        // A path without a parent component has no directory to create
        File parent = file.getParentFile();
        if (parent != null) {
            parent.mkdirs();
        }
        E.checkArgument(file.createNewFile(), "Already exists file: %s", file);
    }

    /**
     * Deletes {@code file} on a best-effort basis; a failure is logged
     * instead of being silently dropped.
     */
    private static void deleteFile(File file) {
        if (!file.delete()) {
            LOG.warn("Failed to delete file '{}'", file);
        }
    }
}
