package org.apache.wayang.giraph.execution;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.stream.Stream;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.ExecutionState;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.giraph.operators.GiraphExecutionOperator;
import org.apache.wayang.giraph.platform.GiraphPlatform;

/**
 * Executor that runs the {@link ExecutionTask}s of an {@link ExecutionStage} whose operators are
 * {@link GiraphExecutionOperator}s.
 *
 * <p>Tasks are executed one at a time, starting from the stage's start tasks and following output
 * channels to consumers that belong to the same stage.
 */
public class GiraphExecutor extends ExecutorTemplate {
    /** Platform this executor was created for; exposed through {@link #getPlatform()}. */
    private final GiraphPlatform platform;

    /** Configuration taken from the job in the constructor (not read elsewhere in this class). */
    private final Configuration configuration;

    /** Job this executor was created for (not read elsewhere in this class). */
    private final Job job;

    /** Giraph-side configuration, created empty in the constructor and handed out to callers. */
    private final GiraphConfiguration giraphConfiguration;

    /**
     * Creates an executor bound to the given platform and job.
     *
     * @param giraphPlatform the platform returned by {@link #getPlatform()}
     * @param job            the job whose cross-platform executor and configuration are used
     */
    public GiraphExecutor(GiraphPlatform giraphPlatform, Job job) {
        super(job.getCrossPlatformExecutor());
        this.job = job;
        this.platform = giraphPlatform;
        this.configuration = job.getConfiguration();
        this.giraphConfiguration = new GiraphConfiguration();
    }

    /**
     * Executes every task of {@code executionStage} that is reachable from its start tasks.
     *
     * <p>Each task is executed at most once, even when several executed tasks feed into it.
     *
     * @param executionStage      the stage whose tasks are to be executed
     * @param optimizationContext supplies the operator context for each executed operator
     * @param executionState      supplies input channel instances and receives the outputs
     */
    public void execute(ExecutionStage executionStage, OptimizationContext optimizationContext, ExecutionState executionState) {
        // Work list of tasks still to be looked at, seeded with the stage's entry points.
        final LinkedList<ExecutionTask> pendingTasks = new LinkedList<>(executionStage.getStartTasks());
        final HashSet<ExecutionTask> executedTasks = new HashSet<>();

        while (!pendingTasks.isEmpty()) {
            final ExecutionTask task = pendingTasks.poll();
            // A task can be enqueued once per producer; skip it if it has already been run.
            if (!executedTasks.add(task)) {
                continue;
            }
            execute(task, optimizationContext, executionState);

            // Enqueue the consumers of this task's outputs, but only those in the same stage.
            Arrays.stream(task.getOutputChannels())
                    .flatMap(channel -> channel.getConsumers().stream())
                    .filter(consumer -> consumer.getStage() == executionStage)
                    .forEach(pendingTasks::add);
        }
    }

    /**
     * Executes a single task: resolves its input channel instances, creates its output channel
     * instances, runs the operator, and records the results in {@code executionState}.
     *
     * @param executionTask       the task to run; its operator must be a {@link GiraphExecutionOperator}
     * @param optimizationContext supplies the operator context for the task's operator
     * @param executionState      looked up for inputs; receives outputs and the partial execution
     */
    private void execute(ExecutionTask executionTask, OptimizationContext optimizationContext, ExecutionState executionState) {
        final GiraphExecutionOperator operator = (GiraphExecutionOperator) executionTask.getOperator();

        // Inputs come from channel instances already known to the execution state.
        final ChannelInstance[] inputs = new ChannelInstance[executionTask.getNumInputChannels()];
        for (int i = 0; i < inputs.length; i++) {
            inputs[i] = executionState.getChannelInstance(executionTask.getInputChannel(i));
        }

        // Outputs are freshly instantiated for this executor and operator context.
        final OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(operator);
        final ChannelInstance[] outputs = new ChannelInstance[executionTask.getNumOuputChannels()];
        for (int i = 0; i < outputs.length; i++) {
            outputs[i] = executionTask.getOutputChannel(i).createInstance(this, operatorContext, i);
        }

        // Measure the wall-clock time of the operator call in milliseconds.
        final long startTime = System.currentTimeMillis();
        final Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> results =
                operator.execute(inputs, outputs, this, operatorContext);
        final long endTime = System.currentTimeMillis();

        final Collection<ExecutionLineageNode> lineageNodes = results.getField0();
        final Collection<ChannelInstance> producedChannelInstances = results.getField1();

        // Make the outputs available to downstream tasks; null entries are skipped.
        for (ChannelInstance output : outputs) {
            if (output != null) {
                executionState.register(output);
            }
        }

        executionState.add(createPartialExecution(lineageNodes, endTime - startTime));
        registerMeasuredCardinalities(producedChannelInstances);
    }

    /** Does nothing; this class holds no resources that it releases here. */
    public void dispose() {
    }

    /**
     * Returns the platform this executor was created for.
     *
     * @return the {@link GiraphPlatform} passed to the constructor
     */
    public GiraphPlatform getPlatform() {
        return this.platform;
    }

    /**
     * Decompiler-generated alias of {@link #getPlatform()}, kept so that existing callers of this
     * name keep compiling.
     *
     * @return the {@link GiraphPlatform} passed to the constructor
     * @deprecated use {@link #getPlatform()} instead
     */
    @Deprecated
    public GiraphPlatform m8getPlatform() {
        return getPlatform();
    }

    /**
     * Returns the Giraph configuration owned by this executor.
     *
     * @return the {@link GiraphConfiguration} created in the constructor
     */
    public GiraphConfiguration getGiraphConfiguration() {
        return this.giraphConfiguration;
    }
}
