/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Recommended;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.WindowBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.extensions.euphoria.core.translate.TimestampExtractTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;

@Recommended(reason="Might be useful to override the default implementation because of performance reasons(e.g. using bloom filters), which might reduce the space complexity", state=StateComplexity.CONSTANT, repartitions=1)
public class Distinct<@UnknownKeyFor InputT, @UnknownKeyFor KeyT>
extends ShuffleOperator<InputT, KeyT, InputT>
implements CompositeOperator<InputT, InputT> {
    /** Whether a key projection was configured; when {@code false}, whole elements are de-duplicated. */
    private final @UnknownKeyFor @NonNull @Initialized boolean projected;
    /** Rule for choosing which element survives per key; the constructor requires it whenever {@code projected} is true. */
    private final @Nullable @UnknownKeyFor @Initialized SelectionPolicy policy;

    /**
     * Starts building an unnamed {@code Distinct} operator over the given input.
     *
     * <p>Shorthand for {@code Distinct.named(null).of(input)}.
     *
     * @param input the collection to de-duplicate
     * @param <InputT> element type of the input
     * @return the next builder step
     */
    public static <InputT> @UnknownKeyFor @NonNull @Initialized ProjectedBuilder<InputT, InputT> of(@UnknownKeyFor @NonNull @Initialized PCollection<InputT> input) {
        OfBuilder unnamed = named(null);
        return unnamed.of(input);
    }

    /**
     * Starts building a {@code Distinct} operator with an optional name.
     *
     * @param name name given to the operator, or {@code null} for none
     * @return the first builder step, which accepts the input collection
     */
    public static @UnknownKeyFor @NonNull @Initialized OfBuilder named(@Nullable @UnknownKeyFor @Initialized String name) {
        OfBuilder firstStep = new Builder<>(name);
        return firstStep;
    }

    private Distinct(@Nullable @UnknownKeyFor @Initialized String name, @UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, KeyT> transform, @Nullable @UnknownKeyFor @Initialized TypeDescriptor<InputT> outputType, @Nullable @UnknownKeyFor @Initialized TypeDescriptor<KeyT> projectedType, @Nullable @UnknownKeyFor @Initialized Window<InputT> window, @Nullable @UnknownKeyFor @Initialized SelectionPolicy policy, @UnknownKeyFor @NonNull @Initialized boolean projected) {
        super(name, outputType, transform, projectedType, window);
        this.projected = projected;
        this.policy = policy;
        Preconditions.checkState((!projected || policy != null ? 1 : 0) != 0, (Object)"Please specify selection policy when using projected distinct.");
    }

    /**
     * Expands this operator into lower-level operators.
     *
     * <p>The single input is first passed through the configured {@link Window}, if any. Without a
     * projection the elements are reduced using the element itself as the key and the keys are
     * then extracted again; with a projection the work is delegated to the function returned by
     * {@code getTransformFn()}.
     *
     * @param inputs list expected to hold exactly one input collection
     * @return the de-duplicated collection
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<InputT> expand(@UnknownKeyFor @NonNull @Initialized PCollectionList<InputT> inputs) {
        PCollection<InputT> source = PCollectionLists.getOnlyElement(inputs);
        // Apply the optional windowing and carry the element type over to the windowed collection.
        PCollection<InputT> input = this.getWindow().map(w -> {
            PCollection<InputT> windowed = source.apply(w);
            windowed.setTypeDescriptor(source.getTypeDescriptor());
            return windowed;
        }).orElse(source);
        if (this.projected) {
            return this.getTransformFn().apply(input);
        }
        // Plain distinct: key by the whole element with a null value, reduce, then map the key back out.
        PCollection<KV<InputT, Void>> distinct = ReduceByKey.named(this.getName().orElse(null))
                .of(input)
                .keyBy(e -> e, input.getTypeDescriptor())
                .valueBy(e -> null, TypeDescriptors.nulls())
                .combineBy(e -> null, TypeDescriptors.nulls())
                .output();
        return MapElements.named(this.getName().orElse("") + "::extract-keys")
                .of(distinct)
                .using(KV::getKey, input.getTypeDescriptor())
                .output();
    }

    /**
     * Chooses the projected-distinct implementation for the configured policy.
     *
     * <p>{@code NEWEST} and {@code OLDEST} wrap {@code reduceTimestamped} in a
     * {@link TimestampExtractTransform}; {@code ANY} uses {@code reduceSelectingAny} directly.
     * Only called for projected operators, where the constructor guarantees a non-null policy.
     *
     * @return function turning the (windowed) input into the de-duplicated output
     * @throws IllegalArgumentException if the policy is not one of the known constants
     */
    private @UnknownKeyFor @NonNull @Initialized UnaryFunction<@UnknownKeyFor @NonNull @Initialized PCollection<InputT>, @UnknownKeyFor @NonNull @Initialized PCollection<InputT>> getTransformFn() {
        switch (this.policy) {
            case ANY: {
                return this::reduceSelectingAny;
            }
            case OLDEST:
            case NEWEST: {
                final String stepName = this.getName().orElse(null);
                return in -> in.apply(TimestampExtractTransform.of(stepName, this::reduceTimestamped));
            }
            default: {
                throw new IllegalArgumentException("Unknown policy " + this.policy);
            }
        }
    }

    /**
     * Projected distinct for {@code SelectionPolicy.ANY}: reduces by the extracted key and keeps
     * whichever element {@code Stream.findAny()} yields for that key.
     *
     * @param input the (windowed) input collection
     * @return one element per extracted key
     */
    private @UnknownKeyFor @NonNull @Initialized PCollection<InputT> reduceSelectingAny(@UnknownKeyFor @NonNull @Initialized PCollection<InputT> input) {
        // Kept fully generic: a raw PCollection here would erase the builder chain and make
        // values.findAny() a call on Object.
        return ReduceByKey.named(this.getName().orElse(null))
                .of(input)
                .keyBy(this.getKeyExtractor(), this.getKeyType().orElse(null))
                .valueBy(e -> e, this.getOutputType().orElse(null))
                .combineBy(values -> nonEmpty(values.findAny()), this.getOutputType().orElse(null))
                .outputValues();
    }

    /**
     * Projected distinct for {@code NEWEST} / {@code OLDEST}: reduces the {@code KV<Long, InputT>}
     * pairs by the key extracted from the value, keeps the pair chosen by {@code getReduceFn()},
     * and finally strips the {@code Long} again.
     *
     * <p>The {@code Long} is presumably the element timestamp attached by
     * {@link TimestampExtractTransform}.
     *
     * @param input elements paired with a {@code Long}; must carry a type descriptor
     * @return one unwrapped element per extracted key
     * @throws NullPointerException if {@code input} has no type descriptor
     */
    private @UnknownKeyFor @NonNull @Initialized PCollection<InputT> reduceTimestamped(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Long, InputT>> input) {
        CombinableReduceFunction<KV<Long, InputT>> select = this.getReduceFn();
        TypeDescriptor<KV<Long, InputT>> pairType = Objects.requireNonNull(input.getTypeDescriptor());
        // Kept fully generic: with raw types e.getValue() and KV::getValue would not resolve.
        PCollection<KV<Long, InputT>> selected = ReduceByKey.named(this.getName().orElse(null))
                .of(input)
                .keyBy(e -> this.getKeyExtractor().apply(e.getValue()), this.getKeyType().orElse(null))
                .valueBy(e -> e, pairType)
                .combineBy(select, pairType)
                .outputValues();
        return MapElements.named(this.getName().map(n -> n + "::unwrap").orElse(null))
                .of(selected)
                .using(KV::getValue, this.getOutputType().orElse(null))
                .output();
    }

    /**
     * Builds the reducer used by {@code reduceTimestamped}.
     *
     * <p>For {@code NEWEST} the pair with the largest {@code Long} key wins; for any other policy
     * the pair with the smallest key wins. Keys are compared with {@link Long#compare(long, long)}.
     *
     * @return reducer that fails with {@link IllegalStateException} on an empty stream
     */
    private @UnknownKeyFor @NonNull @Initialized CombinableReduceFunction<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Long, InputT>> getReduceFn() {
        if (this.policy != SelectionPolicy.NEWEST) {
            // Every policy other than NEWEST selects the smallest key.
            return pairs -> nonEmpty(pairs.collect(Collectors.minBy((left, right) -> Long.compare(left.getKey(), right.getKey()))));
        }
        return pairs -> nonEmpty(pairs.collect(Collectors.maxBy((left, right) -> Long.compare(left.getKey(), right.getKey()))));
    }

    /**
     * Unwraps an {@link Optional} that is expected to hold a value.
     *
     * @param in the optional to unwrap
     * @param <T> type of the contained value
     * @return the contained value
     * @throws IllegalStateException if {@code in} is empty
     */
    private static <T> T nonEmpty(@UnknownKeyFor @NonNull @Initialized Optional<T> in) {
        if (in.isPresent()) {
            return in.get();
        }
        throw new IllegalStateException("Empty reduce values?");
    }

    /**
     * Mutable builder that implements every step interface of the fluent {@code Distinct} API.
     *
     * <p>Each step records its argument and returns this same instance, seen through the
     * interface of the next step. {@code of} and {@code projected} change the builder's type
     * arguments, which requires an unchecked self-cast.
     */
    private static class Builder<@UnknownKeyFor InputT, @UnknownKeyFor KeyT>
    implements OfBuilder,
    ProjectedBuilder<InputT, KeyT>,
    WindowByBuilder<InputT, KeyT>,
    TriggerByBuilder<InputT>,
    AccumulationModeBuilder<InputT>,
    WindowedOutputBuilder<InputT>,
    Builders.Output<InputT> {
        /** Collects the windowing settings; queried for the resulting window in {@link #output()}. */
        private final @UnknownKeyFor @NonNull @Initialized WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
        /** Optional operator name. */
        private final @Nullable @UnknownKeyFor @Initialized String name;
        /** Input collection, set by {@code of}. */
        private @UnknownKeyFor @NonNull @Initialized PCollection<InputT> input;
        /** Key extractor; identity until {@code projected} replaces it, hence the unchecked cast. */
        @SuppressWarnings("unchecked")
        private @UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, KeyT> transform = e -> (KeyT) e;
        /** Selection policy; stays {@code null} unless {@code projected} is called. */
        private @Nullable @UnknownKeyFor @Initialized SelectionPolicy policy = null;
        /** Optional type descriptor of the projected key. */
        private @Nullable @UnknownKeyFor @Initialized TypeDescriptor<KeyT> projectedType;
        /** Never assigned by this builder; always passed on as {@code null}. */
        @SuppressFBWarnings(value={"UWF_NULL_FIELD"})
        private @Nullable @UnknownKeyFor @Initialized TypeDescriptor<InputT> outputType = null;
        /** Set to {@code true} by {@code projected}. */
        private @UnknownKeyFor @NonNull @Initialized boolean projected = false;

        /**
         * @param name optional operator name
         */
        Builder(@Nullable @UnknownKeyFor @Initialized String name) {
            this.name = name;
        }

        /**
         * Records the input collection.
         *
         * @throws NullPointerException if {@code input} is {@code null}
         */
        @Override
        public <T> @UnknownKeyFor @NonNull @Initialized ProjectedBuilder<T, T> of(@UnknownKeyFor @NonNull @Initialized PCollection<T> input) {
            // The same instance serves every step, so it is re-typed through an unchecked cast.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Builder<T, T> self = (Builder) this;
            self.input = Objects.requireNonNull(input);
            return self;
        }

        /**
         * Records the key extractor, selection policy and optional key type, and marks the
         * operator as projected.
         *
         * @throws NullPointerException if {@code transform} or {@code policy} is {@code null}
         */
        @Override
        public <K> @UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, K> projected(@UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, K> transform, @UnknownKeyFor @NonNull @Initialized SelectionPolicy policy, @Nullable @UnknownKeyFor @Initialized TypeDescriptor<K> projectedType) {
            // Re-typed for the new key type K; see of(...).
            @SuppressWarnings({"unchecked", "rawtypes"})
            Builder<InputT, K> self = (Builder) this;
            self.transform = Objects.requireNonNull(transform);
            self.policy = Objects.requireNonNull(policy);
            self.projectedType = projectedType;
            self.projected = true;
            return self;
        }

        @Override
        public <W extends BoundedWindow> @UnknownKeyFor @NonNull @Initialized TriggerByBuilder<InputT> windowBy(@UnknownKeyFor @NonNull @Initialized WindowFn<@UnknownKeyFor @NonNull @Initialized Object, W> windowFn) {
            this.windowBuilder.windowBy(windowFn);
            return this;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized AccumulationModeBuilder<InputT> triggeredBy(@UnknownKeyFor @NonNull @Initialized Trigger trigger) {
            this.windowBuilder.triggeredBy(trigger);
            return this;
        }

        // Type-use annotations on a nested type go directly before the simple name.
        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<InputT> accumulationMode(WindowingStrategy.@UnknownKeyFor @NonNull @Initialized AccumulationMode accumulationMode) {
            this.windowBuilder.accumulationMode(accumulationMode);
            return this;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<InputT> withAllowedLateness(@UnknownKeyFor @NonNull @Initialized Duration allowedLateness) {
            this.windowBuilder.withAllowedLateness(allowedLateness);
            return this;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<InputT> withAllowedLateness(@UnknownKeyFor @NonNull @Initialized Duration allowedLateness, Window.@UnknownKeyFor @NonNull @Initialized ClosingBehavior closingBehavior) {
            this.windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
            return this;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<InputT> withTimestampCombiner(@UnknownKeyFor @NonNull @Initialized TimestampCombiner timestampCombiner) {
            this.windowBuilder.withTimestampCombiner(timestampCombiner);
            return this;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<InputT> withOnTimeBehavior(Window.@UnknownKeyFor @NonNull @Initialized OnTimeBehavior behavior) {
            this.windowBuilder.withOnTimeBehavior(behavior);
            return this;
        }

        /**
         * Creates the {@code Distinct} operator from the recorded settings and applies it to the
         * input.
         *
         * @return the operator's output collection
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PCollection<InputT> output() {
            // No null check on transform: it starts as identity and is only ever replaced
            // through Objects.requireNonNull in projected(...).
            Distinct<InputT, KeyT> distinct = new Distinct<>(this.name, this.transform, this.outputType, this.projectedType, this.windowBuilder.getWindow().orElse(null), this.policy, this.projected);
            return OperatorTransform.apply(distinct, PCollectionList.of(this.input));
        }
    }

    /**
     * Builder step reached once an accumulation mode has been chosen: combines the optional
     * {@link Builders.WindowedOutput} settings with the terminal {@link Builders.Output} step.
     *
     * @param <T> element type of the output
     */
    public interface WindowedOutputBuilder<@UnknownKeyFor T>
            extends Builders.WindowedOutput<WindowedOutputBuilder<T>>, Builders.Output<T> {
    }

    /**
     * Builder step that selects the accumulation mode of the windowing.
     *
     * @param <T> element type of the output
     */
    public interface AccumulationModeBuilder<@UnknownKeyFor T>
            extends Builders.AccumulationMode<WindowedOutputBuilder<T>> {
        /**
         * Sets the accumulation mode.
         *
         * @param accumulationMode the mode to use
         * @return the next builder step
         */
        // Type-use annotations on a nested type go directly before the simple name.
        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedOutputBuilder<T> accumulationMode(WindowingStrategy.@UnknownKeyFor @NonNull @Initialized AccumulationMode accumulationMode);
    }

    /**
     * Builder step that selects the trigger of the windowing.
     *
     * @param <T> element type of the output
     */
    public interface TriggerByBuilder<@UnknownKeyFor T>
            extends Builders.TriggeredBy<AccumulationModeBuilder<T>> {
        /**
         * Sets the trigger.
         *
         * @param trigger the trigger to use
         * @return the next builder step, which selects the accumulation mode
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized AccumulationModeBuilder<T> triggeredBy(@UnknownKeyFor @NonNull @Initialized Trigger trigger);
    }

    /**
     * Builder step that optionally configures windowing, or finishes the operator right away
     * through {@link Builders.Output}.
     *
     * @param <InputT> element type of the input and output
     * @param <KeyT> type of the key used for de-duplication
     */
    public interface WindowByBuilder<@UnknownKeyFor InputT, @UnknownKeyFor KeyT>
            extends Builders.WindowBy<TriggerByBuilder<InputT>>,
            OptionalMethodBuilder<WindowByBuilder<InputT, KeyT>, Builders.Output<InputT>>,
            Builders.Output<InputT> {
        /**
         * Sets the window function.
         *
         * @param windowFn the window function to use
         * @return the next builder step, which selects the trigger
         */
        @Override
        public <W extends BoundedWindow> @UnknownKeyFor @NonNull @Initialized TriggerByBuilder<InputT> windowBy(@UnknownKeyFor @NonNull @Initialized WindowFn<@UnknownKeyFor @NonNull @Initialized Object, W> windowFn);

        /**
         * Applies {@code fn} to this builder only when {@code cond} is true.
         *
         * @param cond whether to apply {@code fn}
         * @param fn builder customization; checked for {@code null} only when {@code cond} is true
         * @return the result of {@code fn}, or this builder when {@code cond} is false
         * @throws NullPointerException if {@code cond} is true and {@code fn} is {@code null}
         */
        // Type-use annotations on the nested Builders.Output go directly before the simple name.
        @Override
        default public Builders.@UnknownKeyFor @NonNull @Initialized Output<InputT> applyIf(@UnknownKeyFor @NonNull @Initialized boolean cond, @UnknownKeyFor @NonNull @Initialized UnaryFunction<@UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, KeyT>, Builders.@UnknownKeyFor @NonNull @Initialized Output<InputT>> fn) {
            if (!cond) {
                return this;
            }
            return Objects.requireNonNull(fn).apply(this);
        }
    }

    /**
     * Builder step that optionally configures a key projection before windowing.
     *
     * <p>The {@code projected} methods use their own type parameter {@code K} for the new key
     * type, so that it does not shadow this interface's {@code KeyT}.
     *
     * @param <InputT> element type of the input and output
     * @param <KeyT> key type before any projection is configured
     */
    public interface ProjectedBuilder<@UnknownKeyFor InputT, @UnknownKeyFor KeyT>
            extends WindowByBuilder<InputT, KeyT> {
        /**
         * Projects elements to a key, using {@link SelectionPolicy#ANY} and no key type descriptor.
         *
         * @param transform key extractor
         * @param <K> type of the extracted key
         * @return the next builder step
         */
        default public <K> @UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, K> projected(@UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, K> transform) {
            return this.projected(transform, SelectionPolicy.ANY, null);
        }

        /**
         * Projects elements to a key of the given type, using {@link SelectionPolicy#ANY}.
         *
         * @param transform key extractor
         * @param projectedType type descriptor of the extracted key
         * @param <K> type of the extracted key
         * @return the next builder step
         * @throws NullPointerException if {@code projectedType} is {@code null}
         */
        default public <K> @UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, K> projected(@UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, K> transform, @UnknownKeyFor @NonNull @Initialized TypeDescriptor<K> projectedType) {
            return this.projected(transform, SelectionPolicy.ANY, Objects.requireNonNull(projectedType));
        }

        /**
         * Projects elements to a key with the given selection policy and no key type descriptor.
         *
         * @param transform key extractor
         * @param policy which element to keep per key
         * @param <K> type of the extracted key
         * @return the next builder step
         */
        default public <K> @UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, K> projected(@UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, K> transform, @UnknownKeyFor @NonNull @Initialized SelectionPolicy policy) {
            return this.projected(transform, policy, null);
        }

        /**
         * Projects elements to a key; the other overloads delegate to this method.
         *
         * @param transform key extractor
         * @param policy which element to keep per key
         * @param projectedType type descriptor of the extracted key, or {@code null}
         * @param <K> type of the extracted key
         * @return the next builder step
         */
        public <K> @UnknownKeyFor @NonNull @Initialized WindowByBuilder<InputT, K> projected(@UnknownKeyFor @NonNull @Initialized UnaryFunction<InputT, K> transform, @UnknownKeyFor @NonNull @Initialized SelectionPolicy policy, @Nullable @UnknownKeyFor @Initialized TypeDescriptor<K> projectedType);
    }

    /** Decides which element is kept when several elements share the same projected key. */
    public enum SelectionPolicy {
        /** Keeps an arbitrary element per key (chosen with {@code Stream.findAny()}). */
        ANY,
        /** Keeps the element paired with the smallest {@code Long} key, presumably its timestamp. */
        OLDEST,
        /** Keeps the element paired with the largest {@code Long} key, presumably its timestamp. */
        NEWEST
    }

    /** First builder step: accepts the input collection. */
    public interface OfBuilder
            extends Builders.Of {
        /**
         * Sets the input collection.
         *
         * @param input the collection to de-duplicate
         * @param <InputT> element type of the input
         * @return the next builder step
         */
        @Override
        public <InputT> @UnknownKeyFor @NonNull @Initialized ProjectedBuilder<InputT, InputT> of(@UnknownKeyFor @NonNull @Initialized PCollection<InputT> input);
    }
}

