package org.apache.wayang.java.operators;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.wayang.basic.operators.IntersectOperator;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.java.channels.CollectionChannel;
import org.apache.wayang.java.channels.JavaChannelInstance;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;

/* loaded from: input_file:org/apache/wayang/java/operators/JavaIntersectOperator.class */
public class JavaIntersectOperator<Type> extends IntersectOperator<Type> implements JavaExecutionOperator {
    static final /* synthetic */ boolean $assertionsDisabled;

    public JavaIntersectOperator(DataSetType<Type> dataSetType) {
        super(dataSetType);
    }

    public JavaIntersectOperator(Class<Type> cls) {
        super(cls);
    }

    public JavaIntersectOperator(IntersectOperator<Type> intersectOperator) {
        super(intersectOperator);
    }

    @Override // org.apache.wayang.java.operators.JavaExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) {
        Stream provideStream;
        Set<Type> createProbingTable;
        if (!$assertionsDisabled && channelInstanceArr.length != getNumInputs()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && channelInstanceArr2.length != getNumOutputs()) {
            throw new AssertionError();
        }
        CardinalityEstimate inputCardinality = operatorContext.getInputCardinality(0);
        CardinalityEstimate outputCardinality = operatorContext.getOutputCardinality(0);
        ExecutionLineageNode executionLineageNode = new ExecutionLineageNode(operatorContext);
        executionLineageNode.add(LoadProfileEstimators.createFromSpecification("wayang.java.intersect.load.indexing", javaExecutor.getConfiguration()));
        ExecutionLineageNode executionLineageNode2 = new ExecutionLineageNode(operatorContext);
        executionLineageNode2.add(LoadProfileEstimators.createFromSpecification("wayang.java.intersect.load.probing", javaExecutor.getConfiguration()));
        boolean z = (inputCardinality == null || outputCardinality == null || inputCardinality.getUpperEstimate() > outputCardinality.getUpperEstimate()) ? false : true;
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        if (z) {
            provideStream = ((JavaChannelInstance) channelInstanceArr[0]).provideStream();
            createProbingTable = createProbingTable(((JavaChannelInstance) channelInstanceArr[1]).provideStream());
            executionLineageNode.addPredecessor(channelInstanceArr[0].getLineage());
            executionLineageNode2.addPredecessor(channelInstanceArr[1].getLineage());
        } else {
            provideStream = ((JavaChannelInstance) channelInstanceArr[1]).provideStream();
            createProbingTable = createProbingTable(((JavaChannelInstance) channelInstanceArr[0]).provideStream());
            executionLineageNode.addPredecessor(channelInstanceArr[1].getLineage());
            executionLineageNode2.addPredecessor(channelInstanceArr[0].getLineage());
        }
        Set<Type> set = createProbingTable;
        set.getClass();
        ((StreamChannel.Instance) channelInstanceArr2[0]).accept(provideStream.filter(set::remove));
        channelInstanceArr2[0].getLineage().addPredecessor(executionLineageNode2);
        executionLineageNode.collectAndMark(linkedList, linkedList2);
        return new Tuple<>(linkedList, linkedList2);
    }

    private Set<Type> createProbingTable(Stream<Type> stream) {
        return (Set) stream.collect(Collectors.toSet());
    }

    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList("wayang.java.intersect.load.indexing", "wayang.java.intersect.load.probing");
    }

    protected ExecutionOperator createCopy() {
        return new JavaIntersectOperator(getType());
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        if ($assertionsDisabled || i <= getNumInputs() || (i == 0 && getNumInputs() == 0)) {
            return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if ($assertionsDisabled || i <= getNumOutputs() || (i == 0 && getNumOutputs() == 0)) {
            return Collections.singletonList(StreamChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !JavaIntersectOperator.class.desiredAssertionStatus();
    }
}
