package org.apache.wayang.flink.operators;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.wayang.basic.operators.LoopOperator;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.PredicateDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.flink.channels.DataSetChannel;
import org.apache.wayang.flink.compiler.criterion.WayangAggregator;
import org.apache.wayang.flink.compiler.criterion.WayangConvergenceCriterion;
import org.apache.wayang.flink.compiler.criterion.WayangFilterCriterion;
import org.apache.wayang.flink.execution.FlinkExecutor;

/* loaded from: input_file:org/apache/wayang/flink/operators/FlinkLoopOperator.class */
public class FlinkLoopOperator<InputType, ConvergenceType> extends LoopOperator<InputType, ConvergenceType> implements FlinkExecutionOperator {
    private IterativeDataSet iterativeDataSet;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: org.apache.wayang.flink.operators.FlinkLoopOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/wayang/flink/operators/FlinkLoopOperator$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$wayang$core$plan$wayangplan$LoopHeadOperator$State = new int[LoopHeadOperator.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$wayang$core$plan$wayangplan$LoopHeadOperator$State[LoopHeadOperator.State.NOT_STARTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$wayang$core$plan$wayangplan$LoopHeadOperator$State[LoopHeadOperator.State.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public FlinkLoopOperator(DataSetType<InputType> dataSetType, DataSetType<ConvergenceType> dataSetType2, FunctionDescriptor.SerializablePredicate<Collection<ConvergenceType>> serializablePredicate, Integer num) {
        super(dataSetType, dataSetType2, serializablePredicate, num);
    }

    public FlinkLoopOperator(DataSetType<InputType> dataSetType, DataSetType<ConvergenceType> dataSetType2, PredicateDescriptor<Collection<ConvergenceType>> predicateDescriptor, Integer num) {
        super(dataSetType, dataSetType2, predicateDescriptor, num);
    }

    public FlinkLoopOperator(LoopOperator<InputType, ConvergenceType> loopOperator) {
        super(loopOperator);
    }

    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, FlinkExecutor flinkExecutor, OptimizationContext.OperatorContext operatorContext) {
        if (!$assertionsDisabled && channelInstanceArr.length != getNumInputs()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && channelInstanceArr2.length != getNumOutputs()) {
            throw new AssertionError();
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$wayang$core$plan$wayangplan$LoopHeadOperator$State[getState().ordinal()]) {
            case 1:
                DataSet provideDataSet = ((DataSetChannel.Instance) channelInstanceArr[0]).provideDataSet();
                DataSetChannel.Instance instance = (DataSetChannel.Instance) channelInstanceArr2[0];
                WayangConvergenceCriterion compile = flinkExecutor.getCompiler().compile(this.criterionDescriptor);
                DataSet provideDataSet2 = ((DataSetChannel.Instance) channelInstanceArr[1]).provideDataSet();
                DataSetChannel.Instance instance2 = (DataSetChannel.Instance) channelInstanceArr2[1];
                this.iterativeDataSet = provideDataSet.iterate(100).registerAggregationConvergenceCriterion("Iteration_" + getName(), new WayangAggregator(), compile);
                instance.accept(this.iterativeDataSet, flinkExecutor);
                instance2.accept(provideDataSet2, flinkExecutor);
                channelInstanceArr2[2] = null;
                setState(LoopHeadOperator.State.RUNNING);
                break;
            case 2:
                if (!$assertionsDisabled && this.iterativeDataSet == null) {
                    throw new AssertionError();
                }
                ((DataSetChannel.Instance) channelInstanceArr2[2]).accept(this.iterativeDataSet.closeWith(((DataSetChannel.Instance) channelInstanceArr[2]).provideDataSet().filter(new WayangFilterCriterion("Iteration_" + getName()))), flinkExecutor);
                channelInstanceArr2[0] = null;
                setState(LoopHeadOperator.State.FINISHED);
                break;
                break;
            default:
                throw new IllegalStateException(String.format("%s is finished, yet executed.", this));
        }
        return ExecutionOperator.modelLazyExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
    }

    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public boolean containsAction() {
        return false;
    }

    public String getLoadProfileEstimatorConfigurationKey() {
        return "wayang.flink.loop.load";
    }

    protected ExecutionOperator createCopy() {
        return new FlinkLoopOperator(this);
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        if (!$assertionsDisabled && i > getNumInputs() && (i != 0 || getNumInputs() != 0)) {
            throw new AssertionError();
        }
        switch (i) {
            case 0:
            case 2:
                return Collections.singletonList(DataSetChannel.DESCRIPTOR);
            case 1:
            case 3:
                return Collections.singletonList(DataSetChannel.DESCRIPTOR);
            default:
                throw new IllegalStateException(String.format("%s has no %d-th input.", this, Integer.valueOf(i)));
        }
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if (!$assertionsDisabled && i > getNumOutputs() && (i != 0 || getNumOutputs() != 0)) {
            throw new AssertionError();
        }
        switch (i) {
            case 0:
            case 2:
                return Collections.singletonList(DataSetChannel.DESCRIPTOR);
            case 1:
            case 3:
                return Collections.singletonList(DataSetChannel.DESCRIPTOR);
            default:
                throw new IllegalStateException(String.format("%s has no %d-th input.", this, Integer.valueOf(i)));
        }
    }

    static {
        $assertionsDisabled = !FlinkLoopOperator.class.desiredAssertionStatus();
    }
}
