package org.apache.wayang.giraph.operators;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.PageRankOperator;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.core.util.fs.FileUtils;
import org.apache.wayang.giraph.Algorithm.PageRankAlgorithm;
import org.apache.wayang.giraph.Algorithm.PageRankParameters;
import org.apache.wayang.giraph.execution.GiraphExecutor;
import org.apache.wayang.giraph.platform.GiraphPlatform;
import org.apache.wayang.java.channels.StreamChannel;

/* loaded from: input_file:org/apache/wayang/giraph/operators/GiraphPageRankOperator.class */
/**
 * {@link PageRankOperator} implementation that runs PageRank as a Giraph job.
 *
 * <p>The operator reads its input from a {@link FileChannel} (HDFS TSV), launches a
 * {@link GiraphJob} that writes its result below a configurable output path, and exposes
 * that result as a stream of {@code (vertex id, rank)} tuples through a
 * {@link StreamChannel}.
 */
public class GiraphPageRankOperator extends PageRankOperator implements GiraphExecutionOperator {

    /** Configuration key of the load profile estimator for the main PageRank job. */
    private static final String LOAD_MAIN_KEY = "wayang.giraph.pagerank.load.main";

    /** Configuration key of the load profile estimator for reading the job output. */
    private static final String LOAD_OUTPUT_KEY = "wayang.giraph.pagerank.load.output";

    /** Configuration key that provides the default output path (see {@link #setPathOut}). */
    private static final String TEMP_DIR_KEY = "wayang.giraph.hdfs.tempdir";

    private final Logger logger;

    /** Output path of the Giraph job; {@code null} until resolved via {@link #getPathOut}. */
    private String pathOut;

    /**
     * Creates a new instance.
     *
     * @param num passed unchanged to the {@link PageRankOperator} constructor
     */
    public GiraphPageRankOperator(Integer num) {
        super(num);
        this.logger = LogManager.getLogger(getClass());
        setPathOut(null, null);
    }

    /**
     * Creates a new instance from an existing {@link PageRankOperator}.
     *
     * @param pageRankOperator passed unchanged to the {@link PageRankOperator} copy constructor
     */
    public GiraphPageRankOperator(PageRankOperator pageRankOperator) {
        super(pageRankOperator);
        this.logger = LogManager.getLogger(getClass());
        setPathOut(null, null);
    }

    /**
     * Runs the Giraph job on the first input channel and feeds the first output channel.
     *
     * @param channelInstanceArr  input channels; element 0 must be a {@link FileChannel.Instance}
     * @param channelInstanceArr2 output channels; element 0 must be a {@link StreamChannel.Instance}
     * @param giraphExecutor      supplies the {@link GiraphConfiguration} used for the job
     * @param operatorContext     supplies the Wayang {@link Configuration} and lineage context
     * @return the lineage nodes and channel instances collected after execution
     * @throws WayangException if the job fails or any checked exception is raised while running it
     */
    @Override // org.apache.wayang.giraph.operators.GiraphExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> execute(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, GiraphExecutor giraphExecutor, OptimizationContext.OperatorContext operatorContext) {
        assert channelInstanceArr.length == getNumInputs();
        assert channelInstanceArr2.length == getNumOutputs();
        try {
            return runGiraph((FileChannel.Instance) channelInstanceArr[0], (StreamChannel.Instance) channelInstanceArr2[0], giraphExecutor, operatorContext);
        } catch (IOException e) {
            throw new WayangException(String.format("Running %s failed.", this), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new WayangException(e);
        } catch (ClassNotFoundException | URISyntaxException e) {
            throw new WayangException(e);
        }
    }

    /**
     * Configures and runs the Giraph PageRank job, then wires up output stream and lineage.
     *
     * @param instance        the already produced input file channel
     * @param instance2       the output channel that receives the result stream
     * @param giraphExecutor  supplies the {@link GiraphConfiguration} to populate
     * @param operatorContext supplies the Wayang {@link Configuration} and lineage context
     * @return the lineage nodes and channel instances collected from the main lineage node
     */
    private Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> runGiraph(FileChannel.Instance instance, StreamChannel.Instance instance2, GiraphExecutor giraphExecutor, OptimizationContext.OperatorContext operatorContext) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        assert instance.wasProduced();

        final Configuration configuration = operatorContext.getOptimizationContext().getConfiguration();
        final String outputPath = getPathOut(configuration);
        PageRankParameters.setParameter(PageRankParameters.PageRankEnum.ITERATION, Integer.valueOf(getNumIterations()));

        // Remove any previous content at the output path before the job writes to it.
        final FileSystem outputFileSystem = FileSystems.getFileSystem(outputPath).orElseThrow(
                () -> new WayangException(String.format("Cannot access file system of %s.", outputPath)));
        outputFileSystem.delete(outputPath, true);

        final String inputPath = instance.getSinglePath();
        final GiraphConfiguration giraphConfiguration = giraphExecutor.getGiraphConfiguration();
        giraphConfiguration.set("giraph.vertex.input.dir", inputPath);
        giraphConfiguration.set("mapred.job.tracker", configuration.getStringProperty("wayang.giraph.job.tracker"));
        giraphConfiguration.set("mapreduce.job.counters.limit", configuration.getStringProperty("wayang.mapreduce.job.counters.limit"));
        // GiraphConfiguration#setWorkerConfiguration expects (minWorkers, maxWorkers, minPercentResponded);
        // the two worker counts used to be passed in the opposite order.
        final int minWorkers = (int) configuration.getLongProperty("wayang.giraph.minWorkers");
        final int maxWorkers = (int) configuration.getLongProperty("wayang.giraph.maxWorkers");
        giraphConfiguration.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
        giraphConfiguration.set("giraph.SplitMasterWorker", "false");
        giraphConfiguration.set("mapreduce.output.fileoutputformat.outputdir", outputPath);
        giraphConfiguration.setComputationClass(PageRankAlgorithm.class);
        giraphConfiguration.setVertexInputFormatClass(PageRankAlgorithm.PageRankVertexInputFormat.class);
        giraphConfiguration.setWorkerContextClass(PageRankAlgorithm.PageRankWorkerContext.class);
        giraphConfiguration.setMasterComputeClass(PageRankAlgorithm.PageRankMasterCompute.class);
        giraphConfiguration.setNumComputeThreads((int) configuration.getLongProperty("wayang.giraph.numThread"));
        giraphConfiguration.setVertexOutputFormatClass(PageRankAlgorithm.PageRankVertexOutputFormat.class);

        // Do not continue with reading the output if the job itself reports a failure.
        final boolean succeeded = new GiraphJob(giraphConfiguration, "wayang-giraph").run(true);
        if (!succeeded) {
            throw new WayangException(String.format("Giraph job of %s did not complete successfully.", this));
        }

        instance2.accept(createStream(FileSystems.findActualSingleInputPath(outputPath)));

        final ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
        mainLineageNode.add(LoadProfileEstimators.createFromSpecification(LOAD_MAIN_KEY, configuration));
        mainLineageNode.addPredecessor(instance.getLineage());

        final ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
        outputLineageNode.add(LoadProfileEstimators.createFromSpecification(LOAD_OUTPUT_KEY, configuration));
        instance2.getLineage().addPredecessor(outputLineageNode);

        return mainLineageNode.collectAndMark();
    }

    /**
     * @return the {@link GiraphPlatform} singleton
     */
    public Platform getPlatform() {
        return GiraphPlatform.getInstance();
    }

    /**
     * @return the configuration keys of the load profile estimators used by this operator
     */
    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList(LOAD_MAIN_KEY, LOAD_OUTPUT_KEY);
    }

    /**
     * @param i input index (not evaluated)
     * @return a single-element list containing {@link FileChannel#HDFS_TSV_DESCRIPTOR}
     */
    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        return Collections.singletonList(FileChannel.HDFS_TSV_DESCRIPTOR);
    }

    /**
     * @param i output index (not evaluated)
     * @return a single-element list containing {@link StreamChannel#DESCRIPTOR}
     */
    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        return Collections.singletonList(StreamChannel.DESCRIPTOR);
    }

    /**
     * Sets the output path of the Giraph job.
     *
     * @param str           the output path; if {@code null} and {@code configuration} is given,
     *                      the value of {@code wayang.giraph.hdfs.tempdir} is used instead
     * @param configuration source of the fallback path; may be {@code null}, in which case
     *                      {@code str} is stored as-is (possibly {@code null})
     */
    public void setPathOut(String str, Configuration configuration) {
        if (str == null && configuration != null) {
            str = configuration.getStringProperty(TEMP_DIR_KEY);
        }
        this.pathOut = str;
    }

    /**
     * Returns the output path, resolving it from the configuration if it has not been set yet.
     *
     * @param configuration used to look up the fallback path when none is set
     * @return the output path
     */
    public String getPathOut(Configuration configuration) {
        if (this.pathOut == null) {
            setPathOut(null, configuration);
        }
        return this.pathOut;
    }

    /**
     * Streams the lines of the given file and parses each one into a tuple.
     *
     * <p>Each line is split at tab characters; the first field is parsed as a {@code long}
     * and the second field as a {@code float}.
     *
     * @param str path of the file to read
     * @return a stream with one tuple per line
     */
    private Stream<Tuple2<Long, Float>> createStream(String str) {
        return FileUtils.streamLines(str).map(line -> {
            final String[] fields = line.split("\t");
            return new Tuple2<>(Long.valueOf(fields[0]), Float.valueOf(fields[1]));
        });
    }
}
