package org.apache.wayang.flink.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.CoGroupOperator;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.flink.channels.DataSetChannel;
import org.apache.wayang.flink.compiler.FunctionCompiler;
import org.apache.wayang.flink.execution.FlinkExecutor;

/* loaded from: input_file:org/apache/wayang/flink/operators/FlinkCoGroupOperator.class */
/**
 * Flink implementation of the {@link CoGroupOperator}.
 *
 * <p>For every key, the elements of both inputs that share that key are materialized into two
 * lists and emitted together as a single {@link Tuple2}.
 *
 * @param <InputType0> element type of the first input
 * @param <InputType1> element type of the second input
 * @param <TypeKey> type of the key both inputs are grouped by
 */
public class FlinkCoGroupOperator<InputType0, InputType1, TypeKey> extends CoGroupOperator<InputType0, InputType1, TypeKey> implements FlinkExecutionOperator {

    /**
     * Creates the operator from two key-extraction functions and the involved classes.
     * All arguments are forwarded unchanged to the {@link CoGroupOperator} constructor.
     *
     * @param serializableFunction  key extractor for the first input
     * @param serializableFunction2 key extractor for the second input
     * @param cls                   class of the first input's elements
     * @param cls2                  class of the second input's elements
     * @param cls3                  class of the key
     */
    public FlinkCoGroupOperator(FunctionDescriptor.SerializableFunction<InputType0, TypeKey> serializableFunction, FunctionDescriptor.SerializableFunction<InputType1, TypeKey> serializableFunction2, Class<InputType0> cls, Class<InputType1> cls2, Class<TypeKey> cls3) {
        super(serializableFunction, serializableFunction2, cls, cls2, cls3);
    }

    /**
     * Creates the operator from two key descriptors.
     * Both arguments are forwarded unchanged to the {@link CoGroupOperator} constructor.
     *
     * @param transformationDescriptor  key descriptor for the first input
     * @param transformationDescriptor2 key descriptor for the second input
     */
    public FlinkCoGroupOperator(TransformationDescriptor<InputType0, TypeKey> transformationDescriptor, TransformationDescriptor<InputType1, TypeKey> transformationDescriptor2) {
        super(transformationDescriptor, transformationDescriptor2);
    }

    /**
     * Creates the operator from two key descriptors and explicit input data set types.
     * All arguments are forwarded unchanged to the {@link CoGroupOperator} constructor.
     *
     * @param transformationDescriptor  key descriptor for the first input
     * @param transformationDescriptor2 key descriptor for the second input
     * @param dataSetType               data set type of the first input
     * @param dataSetType2              data set type of the second input
     */
    public FlinkCoGroupOperator(TransformationDescriptor<InputType0, TypeKey> transformationDescriptor, TransformationDescriptor<InputType1, TypeKey> transformationDescriptor2, DataSetType<InputType0> dataSetType, DataSetType<InputType1> dataSetType2) {
        super(transformationDescriptor, transformationDescriptor2, dataSetType, dataSetType2);
    }

    /**
     * Copy constructor; delegates to the {@link CoGroupOperator} copy constructor.
     *
     * @param coGroupOperator the operator to copy from
     */
    public FlinkCoGroupOperator(CoGroupOperator<InputType0, InputType1, TypeKey> coGroupOperator) {
        super(coGroupOperator);
    }

    /**
     * Wires a Flink co-group of the two input data sets into the output channel.
     *
     * @param channelInstanceArr  input channel instances; indices 0 and 1 must be
     *                            {@link DataSetChannel.Instance}s
     * @param channelInstanceArr2 output channel instances; index 0 must be a
     *                            {@link DataSetChannel.Instance}
     * @param flinkExecutor       supplies the {@link FunctionCompiler} and is handed to the output channel
     * @param operatorContext     passed on to {@link ExecutionOperator#modelLazyExecution}
     * @return the result of {@link ExecutionOperator#modelLazyExecution} for the given channels
     */
    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, FlinkExecutor flinkExecutor, OptimizationContext.OperatorContext operatorContext) {
        assert channelInstanceArr.length == getNumInputs();
        assert channelInstanceArr2.length == getNumOutputs();

        final DataSetChannel.Instance input0 = (DataSetChannel.Instance) channelInstanceArr[0];
        final DataSetChannel.Instance input1 = (DataSetChannel.Instance) channelInstanceArr[1];
        final DataSetChannel.Instance output = (DataSetChannel.Instance) channelInstanceArr2[0];

        final DataSet<InputType0> dataSetInput0 = input0.provideDataSet();
        final DataSet<InputType1> dataSetInput1 = input1.provideDataSet();

        final FunctionCompiler compiler = flinkExecutor.getCompiler();
        final KeySelector<InputType0, TypeKey> keySelector0 = compiler.compileKeySelector(this.keyDescriptor0);
        final KeySelector<InputType1, TypeKey> keySelector1 = compiler.compileKeySelector(this.keyDescriptor1);

        final DataSet<Tuple2<Iterable<InputType0>, Iterable<InputType1>>> dataSetOutput = dataSetInput0
                .coGroup(dataSetInput1)
                .where(keySelector0)
                .equalTo(keySelector1)
                .with(new CoGroupFunction<InputType0, InputType1, Tuple2<Iterable<InputType0>, Iterable<InputType1>>>() {
                    @Override
                    public void coGroup(Iterable<InputType0> iterable, Iterable<InputType1> iterable2, Collector<Tuple2<Iterable<InputType0>, Iterable<InputType1>>> collector) {
                        // Copy both groups into lists before emitting them, rather than
                        // handing the incoming iterables on directly.
                        final List<InputType0> group0 = new ArrayList<>();
                        final List<InputType1> group1 = new ArrayList<>();
                        iterable.forEach(group0::add);
                        iterable2.forEach(group1::add);
                        collector.collect(new Tuple2<>(group0, group1));
                    }
                })
                .returns(ReflectionUtils.specify(Tuple2.class));

        output.accept(dataSetOutput, flinkExecutor);
        return ExecutionOperator.modelLazyExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
    }

    /**
     * Creates a new {@link FlinkCoGroupOperator} via the copy constructor.
     *
     * @return the copy
     */
    protected ExecutionOperator createCopy() {
        return new FlinkCoGroupOperator<>(this);
    }

    /**
     * @return the constant configuration key {@code "wayang.flink.cogroup.load"}
     */
    public String getLoadProfileEstimatorConfigurationTypeKey() {
        return "wayang.flink.cogroup.load";
    }

    /**
     * Lists the channel descriptors accepted on the given input.
     *
     * @param i index of the input
     * @return {@link DataSetChannel#DESCRIPTOR} and {@link DataSetChannel#DESCRIPTOR_MANY}
     */
    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        // NOTE(review): "<=" also admits i == getNumInputs(), which makes the second clause
        // redundant; kept as-is to preserve the existing check - confirm whether "<" was meant.
        assert i <= getNumInputs() || (i == 0 && getNumInputs() == 0);
        return Arrays.asList(DataSetChannel.DESCRIPTOR, DataSetChannel.DESCRIPTOR_MANY);
    }

    /**
     * Lists the channel descriptors offered on the given output.
     *
     * @param i index of the output
     * @return a singleton list containing {@link DataSetChannel#DESCRIPTOR}
     */
    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        // NOTE(review): same "<=" bound as in getSupportedInputChannels - confirm intent.
        assert i <= getNumOutputs() || (i == 0 && getNumOutputs() == 0);
        return Collections.singletonList(DataSetChannel.DESCRIPTOR);
    }

    /**
     * @return always {@code false}
     */
    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public boolean containsAction() {
        return false;
    }
}
