/*
 * Decompiled with CFR 0.152.
 */
package org.apache.wayang.graphchi.execution;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.ExecutionState;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.PartialExecution;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.graphchi.operators.GraphChiExecutionOperator;
import org.apache.wayang.graphchi.platform.GraphChiPlatform;

public class GraphChiExecutor
extends ExecutorTemplate {
    private final GraphChiPlatform platform;
    private final Configuration configuration;
    private final Job job;

    public GraphChiExecutor(GraphChiPlatform platform, Job job) {
        super(job.getCrossPlatformExecutor());
        this.job = job;
        this.platform = platform;
        this.configuration = job.getConfiguration();
    }

    public void execute(ExecutionStage stage, OptimizationContext optimizationContext, ExecutionState executionState) {
        LinkedList scheduledTasks = new LinkedList(stage.getStartTasks());
        HashSet<ExecutionTask> executedTasks = new HashSet<ExecutionTask>();
        while (!scheduledTasks.isEmpty()) {
            ExecutionTask task = (ExecutionTask)scheduledTasks.poll();
            if (executedTasks.contains(task)) continue;
            this.execute(task, optimizationContext, executionState);
            executedTasks.add(task);
            Arrays.stream(task.getOutputChannels()).flatMap(channel -> channel.getConsumers().stream()).filter(consumer -> consumer.getStage() == stage).forEach(scheduledTasks::add);
        }
    }

    private void execute(ExecutionTask task, OptimizationContext optimizationContext, ExecutionState executionState) {
        GraphChiExecutionOperator graphChiExecutionOperator = (GraphChiExecutionOperator)task.getOperator();
        ChannelInstance[] inputChannelInstances = new ChannelInstance[task.getNumInputChannels()];
        for (int i = 0; i < inputChannelInstances.length; ++i) {
            inputChannelInstances[i] = executionState.getChannelInstance(task.getInputChannel(i));
        }
        OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext((Operator)graphChiExecutionOperator);
        ChannelInstance[] outputChannelInstances = new ChannelInstance[task.getNumOuputChannels()];
        for (int i = 0; i < outputChannelInstances.length; ++i) {
            outputChannelInstances[i] = task.getOutputChannel(i).createInstance((Executor)this, operatorContext, i);
        }
        long startTime = System.currentTimeMillis();
        Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> results = graphChiExecutionOperator.execute(inputChannelInstances, outputChannelInstances, operatorContext);
        long endTime = System.currentTimeMillis();
        Collection executionLineageNodes = (Collection)results.getField0();
        Collection producedChannelInstances = (Collection)results.getField1();
        for (ChannelInstance outputChannelInstance : outputChannelInstances) {
            if (outputChannelInstance == null) continue;
            executionState.register(outputChannelInstance);
        }
        PartialExecution partialExecution = this.createPartialExecution(executionLineageNodes, endTime - startTime);
        executionState.add(partialExecution);
        this.registerMeasuredCardinalities(producedChannelInstances);
    }

    public void dispose() {
    }

    public GraphChiPlatform getPlatform() {
        return this.platform;
    }
}

