/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.nephele.danglingpagerank;

import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.test.iterative.nephele.JobGraphUtils;
import org.apache.flink.test.iterative.nephele.danglingpagerank.CompensatableDotProductCoGroup;
import org.apache.flink.test.iterative.nephele.danglingpagerank.CompensatableDotProductMatch;
import org.apache.flink.test.iterative.nephele.danglingpagerank.CompensatingMap;
import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
import org.apache.flink.test.iterative.nephele.danglingpagerank.ImprovedAdjacencyListInputFormat;
import org.apache.flink.test.iterative.nephele.danglingpagerank.ImprovedDanglingPageRankInputFormat;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageWithRankOutFormat;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.OperatingSystem;

public class CompensatableDanglingPageRank {
    private static final TypeSerializerFactory<?> recSerializer = RecordSerializerFactory.get();
    private static final TypeComparatorFactory<?> fieldZeroComparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
    private static final TypePairComparatorFactory<?, ?> pairComparatorFactory = new RecordPairComparatorFactory();
    private static final int NUM_FILE_HANDLES_PER_SORT = 64;
    private static final float SORT_SPILL_THRESHOLD = 0.85f;
    private static final int ITERATION_ID = 1;

    public static JobGraph getJobGraph(String[] args) throws Exception {
        int parallelism = 2;
        String pageWithRankInputPath = "";
        String adjacencyListInputPath = "";
        String outputPath = OperatingSystem.isWindows() ? "file:/c:/tmp/flink/iterations" : "file:///tmp/flink/iterations";
        int minorConsumer = 25;
        int matchMemory = 50;
        int coGroupSortMemory = 50;
        int numIterations = 25;
        long numVertices = 5L;
        long numDanglingVertices = 1L;
        String failingWorkers = "1";
        int failingIteration = 2;
        double messageLoss = 0.75;
        if (args.length >= 15) {
            parallelism = Integer.parseInt(args[0]);
            pageWithRankInputPath = args[1];
            adjacencyListInputPath = args[2];
            outputPath = args[3];
            minorConsumer = Integer.parseInt(args[5]);
            matchMemory = Integer.parseInt(args[6]);
            coGroupSortMemory = Integer.parseInt(args[7]);
            numIterations = Integer.parseInt(args[8]);
            numVertices = Long.parseLong(args[9]);
            numDanglingVertices = Long.parseLong(args[10]);
            failingWorkers = args[11];
            failingIteration = Integer.parseInt(args[12]);
            messageLoss = Double.parseDouble(args[13]);
        }
        int totalMemoryConsumption = 3 * minorConsumer + matchMemory + coGroupSortMemory;
        JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
        InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
        TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
        pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
        pageWithRankInputConfig.setOutputSerializer(recSerializer);
        pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
        InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
        TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
        adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        adjacencyListInputConfig.setOutputSerializer(recSerializer);
        adjacencyListInputConfig.setOutputComparator(fieldZeroComparator, 0);
        JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, parallelism);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setIterationId(1);
        headConfig.addInputToGroup(0);
        headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
        headConfig.setInputSerializer(recSerializer, 0);
        headConfig.setInputComparator(fieldZeroComparator, 0);
        headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
        headConfig.setRelativeMemoryInput(0, (double)minorConsumer / (double)totalMemoryConsumption);
        headConfig.setFilehandlesInput(0, 64);
        headConfig.setSpillingThresholdInput(0, 0.85f);
        headConfig.setRelativeBackChannelMemory((double)minorConsumer / (double)totalMemoryConsumption);
        headConfig.setOutputSerializer(recSerializer);
        headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
        headFinalOutConfig.setOutputSerializer(recSerializer);
        headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
        headConfig.setIterationHeadIndexOfSyncOutput(3);
        headConfig.setNumberOfIterations(numIterations);
        headConfig.setDriver(CollectorMapDriver.class);
        headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        headConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(CompensatingMap.class));
        headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
        headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
        headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
        headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
        headConfig.addIterationAggregator("pagerank.aggregator", (Aggregator)new PageRankStatsAggregator());
        JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, parallelism);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        intermediateConfig.setIterationId(1);
        intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
        intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        intermediateConfig.setRelativeMemoryDriver((double)matchMemory / (double)totalMemoryConsumption);
        intermediateConfig.addInputToGroup(0);
        intermediateConfig.addInputToGroup(1);
        intermediateConfig.setInputSerializer(recSerializer, 0);
        intermediateConfig.setInputSerializer(recSerializer, 1);
        intermediateConfig.setDriverComparator(fieldZeroComparator, 0);
        intermediateConfig.setDriverComparator(fieldZeroComparator, 1);
        intermediateConfig.setDriverPairComparator(pairComparatorFactory);
        intermediateConfig.setOutputSerializer(recSerializer);
        intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        intermediateConfig.setOutputComparator(fieldZeroComparator, 0);
        intermediateConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(CompensatableDotProductMatch.class));
        intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
        intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
        intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
        intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
        JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, parallelism);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(1);
        tailConfig.setIsWorksetUpdate();
        tailConfig.setDriver(CoGroupDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
        tailConfig.addInputToGroup(0);
        tailConfig.addInputToGroup(1);
        tailConfig.setInputSerializer(recSerializer, 0);
        tailConfig.setInputSerializer(recSerializer, 1);
        tailConfig.setDriverComparator(fieldZeroComparator, 0);
        tailConfig.setDriverComparator(fieldZeroComparator, 1);
        tailConfig.setDriverPairComparator(pairComparatorFactory);
        tailConfig.setInputAsynchronouslyMaterialized(0, true);
        tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer / (double)totalMemoryConsumption);
        tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
        tailConfig.setInputComparator(fieldZeroComparator, 1);
        tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory / (double)totalMemoryConsumption);
        tailConfig.setFilehandlesInput(1, 64);
        tailConfig.setSpillingThresholdInput(1, 0.85f);
        tailConfig.setOutputSerializer(recSerializer);
        tailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(CompensatableDotProductCoGroup.class));
        tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
        tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
        tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
        tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
        tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
        OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
        TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
        outputConfig.addInputToGroup(0);
        outputConfig.setInputSerializer(recSerializer, 0);
        outputConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(PageWithRankOutFormat.class));
        outputConfig.setStubParameter("flink.output.file", outputPath);
        JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setNumberOfIterations(numIterations);
        syncConfig.addIterationAggregator("pagerank.aggregator", (Aggregator)new PageRankStatsAggregator());
        syncConfig.setConvergenceCriterion("pagerank.aggregator", (ConvergenceCriterion)new DiffL1NormConvergenceCriterion());
        syncConfig.setIterationId(1);
        JobGraphUtils.connect((JobVertex)pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect((JobVertex)adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
        JobGraphUtils.connect(head, (JobVertex)output, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        pageWithRankInput.setSlotSharingGroup(sharingGroup);
        adjacencyListInput.setSlotSharingGroup(sharingGroup);
        head.setSlotSharingGroup(sharingGroup);
        intermediate.setSlotSharingGroup(sharingGroup);
        tail.setSlotSharingGroup(sharingGroup);
        output.setSlotSharingGroup(sharingGroup);
        sync.setSlotSharingGroup(sharingGroup);
        tail.setStrictlyCoLocatedWith(head);
        intermediate.setStrictlyCoLocatedWith(head);
        return jobGraph;
    }
}

