/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.translate;

import java.io.Serializable;
import java.util.Objects;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableBinaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.VoidFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareness;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.LazyAccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.AdaptableCollector;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.SingleValueCollector;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

public class ReduceByKeyTranslator<InputT, KeyT, ValueT, OutputT>
implements OperatorTranslator<InputT, KV<KeyT, OutputT>, ReduceByKey<InputT, KeyT, ValueT, ?, OutputT>> {
    @Override
    public PCollection<KV<KeyT, OutputT>> translate(ReduceByKey<InputT, KeyT, ValueT, ?, OutputT> operator, PCollectionList<InputT> inputs) {
        Preconditions.checkState((!operator.getValueComparator().isPresent() ? 1 : 0) != 0, (Object)"Values sorting is not supported.");
        UnaryFunction keyExtractor = operator.getKeyExtractor();
        UnaryFunction<InputT, ValueT> valueExtractor = operator.getValueExtractor();
        PCollection input = operator.getWindow().map(window -> (PCollection)PCollectionLists.getOnlyElement(inputs).apply((PTransform)window)).orElseGet(() -> PCollectionLists.getOnlyElement(inputs));
        MapElements extractor = MapElements.via(new KeyValueExtractor(keyExtractor, valueExtractor));
        PCollection extracted = ((PCollection)input.apply("extract-keys", (PTransform)extractor)).setTypeDescriptor(TypeDescriptors.kvs(TypeAwareness.orObjects(operator.getKeyType()), TypeAwareness.orObjects(operator.getValueType())));
        LazyAccumulatorProvider accumulators = new LazyAccumulatorProvider(AccumulatorProvider.of(inputs.getPipeline()));
        if (operator.isCombinable()) {
            PCollection combined = operator.isCombineFnStyle() ? (PCollection)extracted.apply("combine", (PTransform)Combine.perKey(ReduceByKeyTranslator.asCombineFn(operator))) : (PCollection)extracted.apply("combine", (PTransform)Combine.perKey(ReduceByKeyTranslator.asCombiner(operator.getReducer(), accumulators, operator.getName().orElse(null))));
            PCollection cast = combined;
            return cast.setTypeDescriptor(operator.getOutputType().orElseThrow(() -> new IllegalStateException("Unable to infer output type descriptor.")));
        }
        return ((PCollection)((PCollection)extracted.apply("group", (PTransform)GroupByKey.create())).setTypeDescriptor(TypeDescriptors.kvs(TypeAwareness.orObjects(operator.getKeyType()), (TypeDescriptor)TypeDescriptors.iterables(TypeAwareness.orObjects(operator.getValueType())))).apply("reduce", (PTransform)ParDo.of(new ReduceDoFn(operator.getReducer(), accumulators, operator.getName().orElse(null))))).setTypeDescriptor(operator.getOutputType().orElseThrow(() -> new IllegalStateException("Unable to infer output type descriptor.")));
    }

    @Override
    public boolean canTranslate(ReduceByKey operator) {
        return !operator.getValueComparator().isPresent();
    }

    private static <InputT, KeyT, ValueT, AccT, OutputT> Combine.CombineFn<ValueT, AccT, OutputT> asCombineFn(ReduceByKey<InputT, KeyT, ValueT, AccT, OutputT> operator) {
        ReduceByKey<InputT, KeyT, ValueT, AccT, OutputT> cast = operator;
        final VoidFunction<AccT> accumulatorFactory = cast.getAccumulatorFactory();
        final BinaryFunction<AccT, ValueT, AccT> accumulate = cast.getAccumulate();
        final CombinableBinaryFunction<AccT> mergeAccumulators = cast.getMergeAccumulators();
        final UnaryFunction<AccT, OutputT> outputFn = cast.getOutputFn();
        final TypeDescriptor<AccT> accumulatorType = cast.getAccumulatorType();
        return new Combine.CombineFn<ValueT, AccT, OutputT>(){

            public AccT createAccumulator() {
                return accumulatorFactory.apply();
            }

            public Coder<AccT> getAccumulatorCoder(CoderRegistry registry, Coder<ValueT> inputCoder) throws CannotProvideCoderException {
                return registry.getCoder(accumulatorType);
            }

            public AccT addInput(AccT mutableAccumulator, ValueT input) {
                return accumulate.apply(mutableAccumulator, input);
            }

            public AccT mergeAccumulators(Iterable<AccT> accumulators) {
                Object accumulated = null;
                for (Object o : accumulators) {
                    if (accumulated == null) {
                        accumulated = o;
                        continue;
                    }
                    accumulated = mergeAccumulators.apply(accumulated, o);
                }
                return accumulated;
            }

            public OutputT extractOutput(AccT accumulator) {
                return outputFn.apply(accumulator);
            }
        };
    }

    private static <InputT, OutputT> SerializableFunction<Iterable<InputT>, InputT> asCombiner(ReduceFunctor<InputT, OutputT> reducer, AccumulatorProvider accumulatorProvider, @Nullable String operatorName) {
        ReduceFunctor combiner = reducer;
        return (SerializableFunction & Serializable)input -> {
            SingleValueCollector collector = new SingleValueCollector(accumulatorProvider, operatorName);
            combiner.apply(StreamSupport.stream(input.spliterator(), false), collector);
            return collector.get();
        };
    }

    private static class ReduceDoFn<KeyT, ValueT, OutputT>
    extends DoFn<KV<KeyT, Iterable<ValueT>>, KV<KeyT, OutputT>> {
        private final ReduceFunctor<ValueT, OutputT> reducer;
        private final AdaptableCollector<KV<KeyT, Iterable<ValueT>>, KV<KeyT, OutputT>, OutputT> collector;

        ReduceDoFn(ReduceFunctor<ValueT, OutputT> reducer, AccumulatorProvider accumulators, @Nullable String operatorName) {
            this.reducer = reducer;
            this.collector = new AdaptableCollector(accumulators, operatorName, (ctx, out) -> ctx.output((Object)KV.of((Object)((KV)ctx.element()).getKey(), (Object)out)));
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) {
            this.collector.setProcessContext(ctx);
            this.reducer.apply(StreamSupport.stream(Objects.requireNonNull((Iterable)((KV)ctx.element()).getValue()).spliterator(), false), this.collector);
        }
    }

    private static class KeyValueExtractor<InputT, KeyT, ValueT>
    extends SimpleFunction<InputT, KV<KeyT, ValueT>> {
        private final UnaryFunction<InputT, KeyT> keyExtractor;
        private final UnaryFunction<InputT, ValueT> valueExtractor;

        KeyValueExtractor(UnaryFunction<InputT, KeyT> keyExtractor, UnaryFunction<InputT, ValueT> valueExtractor) {
            this.keyExtractor = keyExtractor;
            this.valueExtractor = valueExtractor;
        }

        public KV<KeyT, ValueT> apply(InputT in) {
            return KV.of(this.keyExtractor.apply(in), this.valueExtractor.apply(in));
        }
    }
}

