/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Recommended;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.WindowBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.extensions.euphoria.core.translate.TimestampExtractTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

@Recommended(reason="Might be useful to override the default implementation because of performance reasons(e.g. using bloom filters), which might reduce the space complexity", state=StateComplexity.CONSTANT, repartitions=1)
public class Distinct<InputT, KeyT>
extends ShuffleOperator<InputT, KeyT, InputT>
implements CompositeOperator<InputT, InputT> {
    private final boolean projected;
    private final @Nullable SelectionPolicy policy;

    /**
     * Starts building an unnamed {@link Distinct} operator over the given input.
     *
     * <p>Equivalent to calling {@link #named(String)} with {@code null} and then {@code of(input)}.
     *
     * @param input the input collection
     * @param <InputT> type of the input elements
     * @return the next builder step
     */
    public static <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input) {
        final OfBuilder unnamed = named(null);
        return unnamed.of(input);
    }

    /**
     * Starts building a {@link Distinct} operator with the given name.
     *
     * @param name name of the operator, may be {@code null}
     * @return the first builder step, on which the input is specified
     */
    public static OfBuilder named(@Nullable String name) {
        final OfBuilder firstStep = new Builder<>(name);
        return firstStep;
    }

    private Distinct(@Nullable String name, UnaryFunction<InputT, KeyT> transform, @Nullable TypeDescriptor<InputT> outputType, @Nullable TypeDescriptor<KeyT> projectedType, @Nullable Window<InputT> window, @Nullable SelectionPolicy policy, boolean projected) {
        super(name, outputType, transform, projectedType, window);
        this.projected = projected;
        this.policy = policy;
        Preconditions.checkState((!projected || policy != null ? 1 : 0) != 0, (Object)"Please specify selection policy when using projected distinct.");
    }

    /**
     * Expands this operator into lower-level operators.
     *
     * <p>The configured window, if present, is applied to the single input first. Without a
     * projection the elements are reduced using themselves as keys (with {@code null} values) and
     * the keys of the result are emitted by a follow-up step named {@code <name>::extract-keys}.
     * With a projection the work is delegated to the function returned by
     * {@link #getTransformFn()}.
     *
     * @param inputs list from which the only element is taken as the input
     * @return the resulting collection
     */
    @Override
    public PCollection<InputT> expand(PCollectionList<InputT> inputs) {
        final PCollection<InputT> tmp = PCollectionLists.getOnlyElement(inputs);
        final PCollection<InputT> input = this.getWindow().map(w -> {
            PCollection<InputT> ret = tmp.apply(w);
            // Carry the element type descriptor over to the windowed collection.
            ret.setTypeDescriptor(tmp.getTypeDescriptor());
            return ret;
        }).orElse(tmp);
        if (!this.projected) {
            final PCollection<KV<InputT, Void>> distinct = ReduceByKey.named(this.getName().orElse(null))
                .of(input)
                .keyBy(e -> e, input.getTypeDescriptor())
                .valueBy(e -> null, TypeDescriptors.nulls())
                .combineBy(e -> null, TypeDescriptors.nulls())
                .output();
            return MapElements.named(this.getName().orElse("") + "::extract-keys")
                .of(distinct)
                .using(KV::getKey, input.getTypeDescriptor())
                .output();
        }
        // Declared with the exact type getTransformFn() returns, so apply(input) type-checks.
        final UnaryFunction<PCollection<InputT>, PCollection<InputT>> transformFn = this.getTransformFn();
        return transformFn.apply(input);
    }

    /**
     * Picks the expansion used for a projected distinct, based on the selection policy.
     *
     * <p>{@code ANY} reduces directly via {@link #reduceSelectingAny}; {@code NEWEST} and
     * {@code OLDEST} wrap {@link #reduceTimestamped} in a {@code TimestampExtractTransform}.
     *
     * @return function turning the input collection into the de-duplicated one
     * @throws NullPointerException if no policy is set
     * @throws IllegalArgumentException if the policy is not one of the known constants
     */
    private UnaryFunction<PCollection<InputT>, PCollection<InputT>> getTransformFn() {
        final SelectionPolicy selected = Objects.requireNonNull(this.policy);
        if (selected == SelectionPolicy.ANY) {
            return this::reduceSelectingAny;
        }
        if (selected == SelectionPolicy.NEWEST || selected == SelectionPolicy.OLDEST) {
            final String name = this.getName().orElse(null);
            return input -> input.apply(TimestampExtractTransform.of(name, this::reduceTimestamped));
        }
        throw new IllegalArgumentException("Unknown policy " + selected);
    }

    /**
     * Reduces the input by the extracted key and keeps, per key, whichever element
     * {@code Stream.findAny()} yields.
     *
     * @param input collection to reduce
     * @return the values of the reduction, one per key
     */
    private PCollection<InputT> reduceSelectingAny(PCollection<InputT> input) {
        final String operatorName = this.getName().orElse(null);
        final TypeDescriptor<KeyT> keyType = this.getKeyType().orElse(null);
        final TypeDescriptor<InputT> elementType = this.getOutputType().orElse(null);
        return ReduceByKey.named(operatorName)
            .of(input)
            .keyBy(this.getKeyExtractor(), keyType)
            .valueBy(element -> element, elementType)
            .combineBy(candidates -> Distinct.nonEmpty(candidates.findAny()), elementType)
            .outputValues();
    }

    /**
     * Reduces {@code (Long, element)} pairs by the key extracted from the element, keeps one pair
     * per key using {@link #getReduceFn()}, and finally strips the {@code Long} again in a step
     * named {@code <name>::unwrap}.
     *
     * <p>NOTE(review): the {@code Long} is presumably the element timestamp supplied by
     * {@code TimestampExtractTransform} - confirm against that class.
     *
     * @param input pairs to reduce; its type descriptor must be set
     * @return the selected elements, one per key
     * @throws NullPointerException if {@code input} has no type descriptor
     */
    private PCollection<InputT> reduceTimestamped(PCollection<KV<Long, InputT>> input) {
        final CombinableReduceFunction<KV<Long, InputT>> select = this.getReduceFn();
        final TypeDescriptor<KV<Long, InputT>> pairType = Objects.requireNonNull(input.getTypeDescriptor());
        final PCollection<KV<Long, InputT>> winners = ReduceByKey.named(this.getName().orElse(null))
            .of(input)
            .keyBy(pair -> this.getKeyExtractor().apply(pair.getValue()), this.getKeyType().orElse(null))
            .valueBy(pair -> pair, pairType)
            .combineBy(select, pairType)
            .outputValues();
        final String unwrapName = this.getName().map(n -> n + "::unwrap").orElse(null);
        return MapElements.named(unwrapName)
            .of(winners)
            .using(KV::getValue, this.getOutputType().orElse(null))
            .output();
    }

    /**
     * Builds the per-key reducer for timestamped selection.
     *
     * @return a function keeping the pair with the largest {@code Long} key when the policy is
     *     {@code NEWEST}, otherwise the pair with the smallest one
     */
    private CombinableReduceFunction<KV<Long, InputT>> getReduceFn() {
        if (this.policy == SelectionPolicy.NEWEST) {
            return stamped -> Distinct.nonEmpty(
                stamped.collect(Collectors.maxBy((left, right) -> Long.compare(left.getKey(), right.getKey()))));
        }
        return stamped -> Distinct.nonEmpty(
            stamped.collect(Collectors.minBy((left, right) -> Long.compare(left.getKey(), right.getKey()))));
    }

    /**
     * Unwraps an {@link Optional} that is expected to hold a value.
     *
     * @param in the optional to unwrap
     * @param <T> type of the wrapped value
     * @return the wrapped value
     * @throws IllegalStateException if {@code in} is empty
     */
    private static <T> T nonEmpty(Optional<T> in) {
        if (in.isPresent()) {
            return in.get();
        }
        throw new IllegalStateException("Empty reduce values?");
    }

    /**
     * Single mutable object implementing every step of the fluent {@link Distinct} builder chain.
     *
     * <p>Each step returns this same instance, typed as the interface of the following step.
     *
     * @param <InputT> type of the input elements
     * @param <KeyT> type of the projected key; equals {@code InputT} until {@code projected} is called
     */
    private static class Builder<InputT, KeyT>
        implements OfBuilder,
            ProjectedBuilder<InputT, KeyT>,
            WindowByBuilder<InputT, KeyT>,
            TriggerByBuilder<InputT>,
            AccumulationModeBuilder<InputT>,
            WindowedOutputBuilder<InputT>,
            Builders.Output<InputT> {
        private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
        private final @Nullable String name;
        private PCollection<InputT> input;
        // Identity by default. The unchecked cast is needed because the compiler cannot know that
        // KeyT equals InputT as long as no projection has been configured.
        @SuppressWarnings({"unchecked", "rawtypes"})
        private UnaryFunction<InputT, KeyT> transform = (UnaryFunction) e -> e;
        private @Nullable SelectionPolicy policy = null;
        private @Nullable TypeDescriptor<KeyT> projectedType;
        @SuppressFBWarnings(value={"UWF_NULL_FIELD"})
        private @Nullable TypeDescriptor<InputT> outputType = null;
        private boolean projected = false;

        /**
         * @param name name of the operator being built, may be {@code null}
         */
        Builder(@Nullable String name) {
            this.name = name;
        }

        /**
         * Sets the input collection.
         *
         * @param input the input, must not be {@code null}
         * @return this builder, re-typed to the element type of {@code input}
         * @throws NullPointerException if {@code input} is {@code null}
         */
        @Override
        public <T> ProjectedBuilder<T, T> of(PCollection<T> input) {
            // Re-typing is safe here: no typed state has been stored in the builder yet.
            @SuppressWarnings({"unchecked", "rawtypes"})
            final Builder<T, T> cast = (Builder) this;
            cast.input = Objects.requireNonNull(input);
            return cast;
        }

        /**
         * Configures a projection together with the policy selecting the surviving element.
         *
         * @param transform projection function, must not be {@code null}
         * @param policy selection policy, must not be {@code null}
         * @param projectedType type descriptor of the projected value, may be {@code null}
         * @return this builder, re-typed to the projected key type
         * @throws NullPointerException if {@code transform} or {@code policy} is {@code null}
         */
        @Override
        public <K> WindowByBuilder<InputT, K> projected(UnaryFunction<InputT, K> transform, SelectionPolicy policy, @Nullable TypeDescriptor<K> projectedType) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            final Builder<InputT, K> cast = (Builder) this;
            cast.transform = Objects.requireNonNull(transform);
            cast.policy = Objects.requireNonNull(policy);
            cast.projectedType = projectedType;
            cast.projected = true;
            return cast;
        }

        /** Forwards the window function to the underlying {@link WindowBuilder}. */
        @Override
        public <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(WindowFn<Object, W> windowFn) {
            this.windowBuilder.windowBy(windowFn);
            return this;
        }

        /** Forwards the trigger to the underlying {@link WindowBuilder}. */
        @Override
        public AccumulationModeBuilder<InputT> triggeredBy(Trigger trigger) {
            this.windowBuilder.triggeredBy(trigger);
            return this;
        }

        /** Forwards the accumulation mode to the underlying {@link WindowBuilder}. */
        @Override
        public WindowedOutputBuilder<InputT> accumulationMode(WindowingStrategy.AccumulationMode accumulationMode) {
            this.windowBuilder.accumulationMode(accumulationMode);
            return this;
        }

        /** Forwards the allowed lateness to the underlying {@link WindowBuilder}. */
        @Override
        public WindowedOutputBuilder<InputT> withAllowedLateness(Duration allowedLateness) {
            this.windowBuilder.withAllowedLateness(allowedLateness);
            return this;
        }

        /** Forwards the allowed lateness and closing behavior to the underlying {@link WindowBuilder}. */
        @Override
        public WindowedOutputBuilder<InputT> withAllowedLateness(Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
            this.windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
            return this;
        }

        /** Forwards the timestamp combiner to the underlying {@link WindowBuilder}. */
        @Override
        public WindowedOutputBuilder<InputT> withTimestampCombiner(TimestampCombiner timestampCombiner) {
            this.windowBuilder.withTimestampCombiner(timestampCombiner);
            return this;
        }

        /** Forwards the on-time behavior to the underlying {@link WindowBuilder}. */
        @Override
        public WindowedOutputBuilder<InputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
            this.windowBuilder.withOnTimeBehavior(behavior);
            return this;
        }

        /**
         * Creates the {@link Distinct} operator from the collected settings and applies it to the
         * input.
         *
         * @return the output collection of the operator
         */
        @Override
        public PCollection<InputT> output() {
            // transform is never null here: it starts as identity and projected() rejects null.
            final Distinct<InputT, KeyT> distinct = new Distinct<>(
                this.name,
                this.transform,
                this.outputType,
                this.projectedType,
                this.windowBuilder.getWindow().orElse(null),
                this.policy,
                this.projected);
            return OperatorTransform.apply(distinct, PCollectionList.of(this.input));
        }
    }

    /**
     * Last builder step: combines the optional {@link Builders.WindowedOutput} settings (each
     * returning this same step type) with {@link Builders.Output}.
     *
     * @param <T> type of the output elements
     */
    public static interface WindowedOutputBuilder<T>
    extends Builders.WindowedOutput<WindowedOutputBuilder<T>>,
    Builders.Output<T> {
    }

    /**
     * Builder step on which the windowing accumulation mode is specified.
     *
     * @param <T> type of the output elements
     */
    public interface AccumulationModeBuilder<T>
        extends Builders.AccumulationMode<WindowedOutputBuilder<T>> {
        /**
         * Sets the accumulation mode.
         *
         * @param accumulationMode the accumulation mode to use
         * @return the next builder step
         */
        @Override
        WindowedOutputBuilder<T> accumulationMode(WindowingStrategy.AccumulationMode accumulationMode);
    }

    /**
     * Builder step on which the windowing trigger is specified.
     *
     * @param <T> type of the output elements
     */
    public interface TriggerByBuilder<T>
        extends Builders.TriggeredBy<AccumulationModeBuilder<T>> {
        /**
         * Sets the trigger.
         *
         * @param trigger the trigger to use
         * @return the next builder step, on which the accumulation mode is specified
         */
        @Override
        AccumulationModeBuilder<T> triggeredBy(Trigger trigger);
    }

    /**
     * Builder step offering windowing, conditional customization, or direct output.
     *
     * @param <InputT> type of the input elements
     * @param <KeyT> type of the projected key
     */
    public interface WindowByBuilder<InputT, KeyT>
        extends Builders.WindowBy<TriggerByBuilder<InputT>>,
            OptionalMethodBuilder<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>>,
            Builders.Output<InputT> {
        /**
         * Sets the window function.
         *
         * @param windowFn the window function to use
         * @param <W> type of the windows produced by {@code windowFn}
         * @return the next builder step, on which the trigger is specified
         */
        @Override
        <W extends BoundedWindow> TriggerByBuilder<InputT> windowBy(WindowFn<Object, W> windowFn);

        /**
         * Applies {@code fn} to this builder only when {@code cond} holds.
         *
         * @param cond whether {@code fn} should be applied
         * @param fn customization to apply; only checked for {@code null} when {@code cond} is true
         * @return the result of {@code fn}, or this builder when {@code cond} is false
         * @throws NullPointerException if {@code cond} is true and {@code fn} is {@code null}
         */
        @Override
        default Builders.Output<InputT> applyIf(boolean cond, UnaryFunction<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>> fn) {
            if (!cond) {
                return this;
            }
            return Objects.requireNonNull(fn).apply(this);
        }
    }

    public static interface ProjectedBuilder<InputT, KeyT>
    extends WindowByBuilder<InputT, KeyT> {
        default public <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> transform) {
            return this.projected(transform, SelectionPolicy.ANY, null);
        }

        default public <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> transform, TypeDescriptor<KeyT> projectedType) {
            return this.projected(transform, SelectionPolicy.ANY, Objects.requireNonNull(projectedType));
        }

        default public <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> transform, SelectionPolicy policy) {
            return this.projected(transform, policy, null);
        }

        public <KeyT> WindowByBuilder<InputT, KeyT> projected(UnaryFunction<InputT, KeyT> var1, SelectionPolicy var2, @Nullable TypeDescriptor<KeyT> var3);
    }

    /** Chooses which element is kept among those reduced under the same projected key. */
    public static enum SelectionPolicy {
        /** Keeps whichever element {@code Stream.findAny()} yields for the key. */
        ANY,
        /**
         * Keeps the element paired with the smallest {@code Long} key.
         * NOTE(review): that key is presumably the element timestamp - confirm.
         */
        OLDEST,
        /**
         * Keeps the element paired with the largest {@code Long} key.
         * NOTE(review): that key is presumably the element timestamp - confirm.
         */
        NEWEST;

    }

    /** First builder step, on which the input collection is specified. */
    public interface OfBuilder
        extends Builders.Of {
        /**
         * Sets the input collection.
         *
         * @param input the input collection
         * @param <InputT> type of the input elements
         * @return the next builder step
         */
        @Override
        <InputT> ProjectedBuilder<InputT, InputT> of(PCollection<InputT> input);
    }
}

