/*
 * Decompiled with CFR 0.152.
 */
package org.apache.wayang.graphchi.operators;

import edu.cmu.graphchi.ChiFilenames;
import edu.cmu.graphchi.GraphChiProgram;
import edu.cmu.graphchi.apps.Pagerank;
import edu.cmu.graphchi.datablocks.BytesToValueConverter;
import edu.cmu.graphchi.datablocks.FloatConverter;
import edu.cmu.graphchi.engine.GraphChiEngine;
import edu.cmu.graphchi.preprocessing.FastSharder;
import edu.cmu.graphchi.preprocessing.VertexIdTranslate;
import edu.cmu.graphchi.vertexdata.VertexAggregator;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.PageRankOperator;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.platform.lineage.LazyExecutionLineageNode;
import org.apache.wayang.core.util.ConsumerIteratorAdapter;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.core.util.fs.LocalFileSystem;
import org.apache.wayang.graphchi.operators.GraphChiExecutionOperator;
import org.apache.wayang.graphchi.platform.GraphChiPlatform;
import org.apache.wayang.java.channels.StreamChannel;

/**
 * {@link PageRankOperator} implementation that is executed with GraphChi.
 *
 * <p>The operator reads an edge list from a single {@link FileChannel} input, shards it into a
 * temporary GraphChi graph, runs GraphChi's {@link Pagerank} program for {@code numIterations}
 * iterations and feeds the resulting per-vertex values as {@link Tuple2}s into a
 * {@link StreamChannel} output.
 */
public class GraphChiPageRankOperator
extends PageRankOperator
implements GraphChiExecutionOperator {
    /** Configuration key of the directory in which the temporary GraphChi files are created. */
    private static final String TEMP_DIR_KEY = "wayang.graphchi.tempdir";

    /** Configuration key of the load profile estimator for the main PageRank execution. */
    private static final String MAIN_LOAD_KEY = "wayang.graphchi.pagerank.load.main";

    /** Configuration key of the load profile estimator for emitting the output. */
    private static final String OUTPUT_LOAD_KEY = "wayang.graphchi.pagerank.load.output";

    /** One additional shard is requested for every this many bytes of input. */
    private static final long BYTES_PER_SHARD = 10_000_000L;

    private final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * Creates a new instance.
     *
     * @param numIterations passed through to {@link PageRankOperator}; used as the number of
     *                      GraphChi iterations when the operator is executed
     */
    public GraphChiPageRankOperator(Integer numIterations) {
        super(numIterations);
    }

    /**
     * Copies the given operator.
     *
     * @param pageRankOperator the operator to copy; passed through to the super constructor
     */
    public GraphChiPageRankOperator(PageRankOperator pageRankOperator) {
        super(pageRankOperator);
    }

    /**
     * Executes this operator on the first input and first output channel instance.
     *
     * @param inputChannelInstances  the inputs; element 0 must be a {@link FileChannel.Instance}
     * @param outputChannelInstances the outputs; element 0 must be a {@link StreamChannel.Instance}
     * @param operatorContext        the context this operator is executed in
     * @return the lineage nodes and channel instances collected from the main execution lineage
     * @throws WayangException if GraphChi fails with an {@link IOException}
     */
    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> execute(ChannelInstance[] inputChannelInstances, ChannelInstance[] outputChannelInstances, OptimizationContext.OperatorContext operatorContext) {
        assert (inputChannelInstances.length == this.getNumInputs());
        assert (outputChannelInstances.length == this.getNumOutputs());
        FileChannel.Instance inputChannelInstance = (FileChannel.Instance) inputChannelInstances[0];
        StreamChannel.Instance outputChannelInstance = (StreamChannel.Instance) outputChannelInstances[0];
        try {
            return this.runGraphChi(inputChannelInstance, outputChannelInstance, operatorContext);
        }
        catch (IOException e) {
            throw new WayangException(String.format("Running %s failed.", this), e);
        }
    }

    /**
     * Shards the input file (unless shards already exist for the temporary graph name), runs
     * PageRank and wires the results into the output channel.
     *
     * @param inputFileChannelInstance the already produced input file channel
     * @param outputChannelInstance    receives the stream of result tuples
     * @param operatorContext          provides the {@link Configuration} and the lineage context
     * @return the result of {@code collectAndMark()} on the main execution lineage node
     * @throws IOException if reading the input, sharding or running GraphChi fails
     */
    private Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> runGraphChi(FileChannel.Instance inputFileChannelInstance, StreamChannel.Instance outputChannelInstance, OptimizationContext.OperatorContext operatorContext) throws IOException {
        assert (inputFileChannelInstance.wasProduced());

        // Locate the input file and the file system that serves it.
        String inputPath = inputFileChannelInstance.getSinglePath();
        String actualInputPath = FileSystems.findActualSingleInputPath(inputPath);
        FileSystem inputFs = FileSystems.getFileSystem(inputPath).orElseThrow(() -> new WayangException(String.format("Could not identify filesystem for \"%s\".", inputPath)));

        // GraphChi derives all of its file names from the graph name, so use a randomly named
        // temporary file inside the configured temp directory as that name.
        Configuration configuration = operatorContext.getOptimizationContext().getConfiguration();
        String tempDirPath = configuration.getStringProperty(TEMP_DIR_KEY);
        Random random = new Random();
        String tempFilePath = String.format("%s%s%04x-%04x-%04x-%04x.tmp", tempDirPath, File.separator, random.nextInt() & 0xFFFF, random.nextInt() & 0xFFFF, random.nextInt() & 0xFFFF, random.nextInt() & 0xFFFF);
        File tempFile = new File(tempFilePath);
        LocalFileSystem.touch(tempFile);
        tempFile.deleteOnExit();
        String graphName = tempFile.toString();

        // Divide as long and narrow only the quotient: narrowing the byte count first would
        // truncate inputs of 2 GiB and more and yield a wrong (possibly non-positive) count.
        long inputSize = inputFs.getFileSize(actualInputPath);
        int numShards = 2 + (int) (inputSize / BYTES_PER_SHARD);

        if (!new File(ChiFilenames.getFilenameIntervals(graphName, numShards)).exists()) {
            FastSharder sharder = GraphChiPageRankOperator.createSharder(graphName, numShards);
            // Make sure the input stream is released even if sharding fails.
            try (InputStream inputStream = inputFs.open(actualInputPath)) {
                sharder.shard(inputStream, FastSharder.GraphInputFormat.EDGELIST);
            }
        } else {
            this.logger.info("Found shards -- no need to preprocess");
        }

        // Run PageRank. This call returns only after the engine has finished.
        GraphChiEngine<Float, Float> engine = new GraphChiEngine<>(graphName, numShards);
        engine.setEdataConverter(new FloatConverter());
        engine.setVertexDataConverter(new FloatConverter());
        engine.setModifiesInedges(false);
        engine.run(new Pagerank(), this.numIterations);

        // Hand the results over via an adapter: a separate thread pushes the tuples into the
        // consumer side while the output channel pulls them from the iterator side.
        // NOTE(review): the vertex ID is boxed as the int returned by trans.backward(..), i.e. as
        // an Integer -- confirm that this matches the ID type downstream operators expect.
        ConsumerIteratorAdapter<Tuple2<Integer, Float>> consumerIteratorAdapter = new ConsumerIteratorAdapter<>();
        Consumer<Tuple2<Integer, Float>> consumer = consumerIteratorAdapter.getConsumer();
        Iterator<Tuple2<Integer, Float>> iterator = consumerIteratorAdapter.getIterator();
        VertexIdTranslate trans = engine.getVertexIdTranslate();
        new Thread(() -> {
            try {
                // Translate GraphChi's internal vertex IDs back before emitting them.
                VertexAggregator.foreach(engine.numVertices(), graphName, new FloatConverter(), (vertexId, vertexValue) -> consumer.accept(new Tuple2<>(trans.backward(vertexId), vertexValue)));
            }
            catch (IOException e) {
                throw new WayangException(e);
            }
            finally {
                // Always signal the end of the data, also on failure.
                consumerIteratorAdapter.declareLastAdd();
            }
        }, String.format("%s (output)", this)).start();
        Stream<Tuple2<Integer, Float>> outputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
        outputChannelInstance.accept(outputStream);

        // Register the load estimators: one lineage node for the main execution, one for the output.
        ExecutionLineageNode mainExecutionLineage = new ExecutionLineageNode(operatorContext);
        mainExecutionLineage.add(LoadProfileEstimators.createFromSpecification(MAIN_LOAD_KEY, configuration));
        mainExecutionLineage.addPredecessor(inputFileChannelInstance.getLineage());
        ExecutionLineageNode outputExecutionLineage = new ExecutionLineageNode(operatorContext);
        outputExecutionLineage.add(LoadProfileEstimators.createFromSpecification(OUTPUT_LOAD_KEY, configuration));
        outputChannelInstance.getLineage().addPredecessor(outputExecutionLineage);
        return mainExecutionLineage.collectAndMark();
    }

    /**
     * Creates a {@link FastSharder} that parses vertex and edge values as floats, using
     * {@code 0.0f} when no value token is present.
     *
     * @param graphName the graph name handed to the sharder
     * @param numShards the number of shards to create
     * @return the configured sharder
     * @throws IOException if the sharder cannot be created
     */
    protected static FastSharder createSharder(String graphName, int numShards) throws IOException {
        return new FastSharder<>(
                graphName,
                numShards,
                (vertexId, token) -> token == null ? 0.0f : Float.parseFloat(token),
                (from, to, token) -> token == null ? 0.0f : Float.parseFloat(token),
                new FloatConverter(),
                new FloatConverter());
    }

    /**
     * @return the {@link GraphChiPlatform} singleton
     */
    public Platform getPlatform() {
        return GraphChiPlatform.getInstance();
    }

    /**
     * @return the configuration keys of the two load profile estimators this operator registers
     */
    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList(MAIN_LOAD_KEY, OUTPUT_LOAD_KEY);
    }

    /**
     * @param index the input index (not inspected)
     * @return a singleton list containing {@link FileChannel#HDFS_TSV_DESCRIPTOR}
     */
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        return Collections.singletonList(FileChannel.HDFS_TSV_DESCRIPTOR);
    }

    /**
     * @param index the output index (not inspected)
     * @return a singleton list containing {@link StreamChannel#DESCRIPTOR}
     */
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        return Collections.singletonList(StreamChannel.DESCRIPTOR);
    }
}

