package org.apache.wayang.spark.execution;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.PartialExecution;
import org.apache.wayang.core.platform.PushExecutorTemplate;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Formats;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.compiler.FunctionCompiler;
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import org.apache.wayang.spark.platform.SparkPlatform;

/**
 * Executor that evaluates {@link ExecutionTask}s through a {@link JavaSparkContext}.
 *
 * <p>The Spark context is obtained from the {@link SparkPlatform} as a counted reference when the
 * executor is created and is released again in {@link #dispose()}.
 */
public class SparkExecutor extends PushExecutorTemplate {
    /** Counted reference to the Spark context; obtained in the constructor, released in {@link #dispose()}. */
    private final SparkContextReference sparkContextReference;

    /** The Spark context held by {@link #sparkContextReference}. */
    public final JavaSparkContext sc;

    /** Function compiler handed out via {@link #getCompiler()}. */
    public FunctionCompiler compiler;

    /** The platform this executor was created for. */
    private final SparkPlatform platform;

    /** Default partition count derived from the configuration; see the constructor. */
    private final int numDefaultPartitions;

    /** Number of executed tasks that contained an action and yielded a {@link PartialExecution}. */
    private int numActions;

    /**
     * Creates an executor for the given job and acquires a reference to the Spark context.
     *
     * <p>The default number of partitions is twice {@code spark.executor.cores} when that key is
     * present in the Spark configuration; otherwise it is
     * {@code 2 * wayang.spark.machines * wayang.spark.cores-per-machine}, read from the executor's
     * configuration.
     *
     * @param sparkPlatform the platform that provides the Spark context
     * @param job the job this executor works for
     */
    public SparkExecutor(SparkPlatform sparkPlatform, Job job) {
        super(job);
        this.compiler = new FunctionCompiler();
        this.numActions = 0;
        this.platform = sparkPlatform;
        this.sparkContextReference = this.platform.getSparkContext(job);
        this.sparkContextReference.noteObtainedReference();
        this.sc = this.sparkContextReference.get();
        if (this.sc.getConf().contains("spark.executor.cores")) {
            // The -1 default is never used here because the key was just checked to be present.
            this.numDefaultPartitions = 2 * this.sc.getConf().getInt("spark.executor.cores", -1);
        } else {
            this.numDefaultPartitions = (int) (2
                    * getConfiguration().getLongProperty("wayang.spark.machines")
                    * getConfiguration().getLongProperty("wayang.spark.cores-per-machine"));
        }
    }

    /**
     * Evaluates the operator of the given task and collects its outputs.
     *
     * <p>Progress is reported as 50 before and 100 after the evaluation. The wall-clock duration of
     * the evaluation is used to create the {@link PartialExecution}, which may be {@code null}.
     *
     * @param executionTask the task whose operator is evaluated
     * @param list the input channel instances for the operator
     * @param operatorContext the optimization context passed on to the operator
     * @param z whether eager execution was requested; only used to log when it did not happen
     * @return the output channel instances together with the (possibly {@code null}) partial execution
     * @throws WayangException if the evaluation or any of the follow-up bookkeeping fails
     */
    protected Tuple<List<ChannelInstance>, PartialExecution> execute(ExecutionTask executionTask, List<ChannelInstance> list, OptimizationContext.OperatorContext operatorContext, boolean z) {
        final ExecutionOperator operator = executionTask.getOperator();
        final ChannelInstance[] outputChannelInstances =
                operator.createOutputChannelInstances(this, executionTask, operatorContext, list);
        this.job.reportProgress(operator.getName(), 50);
        final long startTime = System.currentTimeMillis();
        try {
            // The cast stays inside the try block so that a ClassCastException is wrapped as well.
            final SparkExecutionOperator sparkOperator = cast(operator);
            final Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> results =
                    sparkOperator.evaluate(toArray(list), outputChannelInstances, this, operatorContext);
            final Collection<ExecutionLineageNode> executionLineageNodes = results.getField0();
            final Collection<ChannelInstance> producedChannelInstances = results.getField1();
            final long executionDuration = System.currentTimeMillis() - startTime;
            this.job.reportProgress(operator.getName(), 100);

            final PartialExecution partialExecution =
                    createPartialExecution(executionLineageNodes, executionDuration);
            if (partialExecution != null && sparkOperator.containsAction()) {
                // Only the first action is marked as having initialized the Spark platform.
                if (this.numActions == 0) {
                    partialExecution.addInitializedPlatform(SparkPlatform.getInstance());
                }
                this.numActions++;
            }
            // No partial execution was created, yet the call took more than 10 ms.
            if (partialExecution == null && executionDuration > 10) {
                this.logger.warn("Execution of {} took suspiciously long ({}).", executionTask, Formats.formatDuration(executionDuration));
            }

            registerMeasuredCardinalities(producedChannelInstances);

            if (z && partialExecution == null) {
                this.logger.info("{} was not executed eagerly as requested.", executionTask);
            }
            return new Tuple<>(Arrays.asList(outputChannelInstances), partialExecution);
        } catch (Exception e) {
            throw new WayangException(String.format("Executing %s failed.", executionTask), e);
        }
    }

    /** Narrows an {@link ExecutionOperator} to a {@link SparkExecutionOperator}. */
    private static SparkExecutionOperator cast(ExecutionOperator executionOperator) {
        return (SparkExecutionOperator) executionOperator;
    }

    /** Copies the given channel instances into a new array. */
    private static ChannelInstance[] toArray(List<ChannelInstance> list) {
        return list.toArray(new ChannelInstance[0]);
    }

    /**
     * Passes the RDD of one channel instance on to another and links their lineages.
     *
     * <p>Both arguments are cast to {@link RddChannel.Instance}.
     *
     * @param channelInstance the instance that provides the RDD
     * @param channelInstance2 the instance that accepts the RDD
     */
    public void forward(ChannelInstance channelInstance, ChannelInstance channelInstance2) {
        final RddChannel.Instance source = (RddChannel.Instance) channelInstance;
        final RddChannel.Instance target = (RddChannel.Instance) channelInstance2;

        // A target with the cached descriptor must be fed from a source with the cached descriptor.
        assert source.m10getChannel().getDescriptor() == RddChannel.CACHED_DESCRIPTOR
                || target.m10getChannel().getDescriptor() != RddChannel.CACHED_DESCRIPTOR;

        target.accept(source.provideRdd(), this);
        channelInstance2.getLineage().addPredecessor(channelInstance.getLineage());
    }

    /**
     * Returns the platform this executor was created for.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code getPlatform} when merging it
     * with its bridge method; the mangled name is kept because other decompiled files may use it.
     */
    /* renamed from: getPlatform, reason: merged with bridge method [inline-methods] */
    public SparkPlatform m15getPlatform() {
        return this.platform;
    }

    /** Returns the default number of partitions computed in the constructor. */
    public int getNumDefaultPartitions() {
        return this.numDefaultPartitions;
    }

    /**
     * Disposes this executor and releases its reference to the Spark context.
     *
     * <p>The reference is released even if the superclass disposal throws, so that the Spark
     * context reference is not leaked.
     */
    public void dispose() {
        try {
            super.dispose();
        } finally {
            this.sparkContextReference.noteDiscardedReference(true);
        }
    }

    /** Returns the function compiler of this executor. */
    public FunctionCompiler getCompiler() {
        return this.compiler;
    }
}
