package org.apache.wayang.java.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.apache.wayang.basic.operators.ReduceByOperator;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.function.ReduceDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.java.channels.CollectionChannel;
import org.apache.wayang.java.channels.JavaChannelInstance;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;

/* loaded from: input_file:org/apache/wayang/java/operators/JavaReduceByOperator.class */
public class JavaReduceByOperator<Type, KeyType> extends ReduceByOperator<Type, KeyType> implements JavaExecutionOperator {
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/wayang/java/operators/JavaReduceByOperator$ReducingCollector.class */
    private static class ReducingCollector<T> implements Collector<T, List<T>, T> {
        private final BinaryOperator<T> reduceFunction;
        static final /* synthetic */ boolean $assertionsDisabled;

        ReducingCollector(BinaryOperator<T> binaryOperator) {
            this.reduceFunction = binaryOperator;
        }

        @Override // java.util.stream.Collector
        public Supplier<List<T>> supplier() {
            return () -> {
                return new ArrayList(1);
            };
        }

        @Override // java.util.stream.Collector
        public BiConsumer<List<T>, T> accumulator() {
            return (list, obj) -> {
                if (list.isEmpty()) {
                    list.add(obj);
                } else {
                    list.set(0, this.reduceFunction.apply(list.get(0), obj));
                }
            };
        }

        @Override // java.util.stream.Collector
        public BinaryOperator<List<T>> combiner() {
            return (list, list2) -> {
                if (!list.isEmpty() && !list2.isEmpty()) {
                    list.set(0, this.reduceFunction.apply(list.get(0), list2.get(0)));
                    return list;
                }
                return list2;
            };
        }

        @Override // java.util.stream.Collector
        public Function<List<T>, T> finisher() {
            return list -> {
                if ($assertionsDisabled || !list.isEmpty()) {
                    return list.get(0);
                }
                throw new AssertionError();
            };
        }

        @Override // java.util.stream.Collector
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }

        static {
            $assertionsDisabled = !JavaReduceByOperator.class.desiredAssertionStatus();
        }
    }

    public JavaReduceByOperator(DataSetType<Type> dataSetType, TransformationDescriptor<Type, KeyType> transformationDescriptor, ReduceDescriptor<Type> reduceDescriptor) {
        super(transformationDescriptor, reduceDescriptor, dataSetType);
    }

    public JavaReduceByOperator(ReduceByOperator<Type, KeyType> reduceByOperator) {
        super(reduceByOperator);
    }

    @Override // org.apache.wayang.java.operators.JavaExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) {
        if (!$assertionsDisabled && channelInstanceArr.length != getNumInputs()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && channelInstanceArr2.length != getNumOutputs()) {
            throw new AssertionError();
        }
        Function compile = javaExecutor.getCompiler().compile(this.keyDescriptor);
        BinaryOperator<Type> compile2 = javaExecutor.getCompiler().compile(this.reduceDescriptor);
        JavaExecutor.openFunction(this, compile2, channelInstanceArr, operatorContext);
        ((CollectionChannel.Instance) channelInstanceArr2[0]).accept(((Map) ((JavaChannelInstance) channelInstanceArr[0]).provideStream().collect(Collectors.groupingBy(compile, new ReducingCollector(compile2)))).values());
        return ExecutionOperator.modelEagerExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
    }

    public String getLoadProfileEstimatorConfigurationKey() {
        return "wayang.java.reduceby.load";
    }

    public Optional<LoadProfileEstimator> createLoadProfileEstimator(Configuration configuration) {
        Optional<LoadProfileEstimator> createLoadProfileEstimator = super.createLoadProfileEstimator(configuration);
        LoadProfileEstimators.nestUdfEstimator(createLoadProfileEstimator, this.keyDescriptor, configuration);
        LoadProfileEstimators.nestUdfEstimator(createLoadProfileEstimator, this.reduceDescriptor, configuration);
        return createLoadProfileEstimator;
    }

    protected ExecutionOperator createCopy() {
        return new JavaReduceByOperator(getType(), getKeyDescriptor(), getReduceDescriptor());
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        if ($assertionsDisabled || i <= getNumInputs() || (i == 0 && getNumInputs() == 0)) {
            return getInput(i).isBroadcast() ? Collections.singletonList(CollectionChannel.DESCRIPTOR) : Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if ($assertionsDisabled || i <= getNumOutputs() || (i == 0 && getNumOutputs() == 0)) {
            return Collections.singletonList(CollectionChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !JavaReduceByOperator.class.desiredAssertionStatus();
    }
}
