/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sketching;

import com.clearspring.analytics.stream.frequency.CountMinSketch;
import com.clearspring.analytics.stream.frequency.FrequencyMergeException;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sketching.AutoValue_SketchFrequencies_GlobalSketch;
import org.apache.beam.sdk.extensions.sketching.AutoValue_SketchFrequencies_PerKeySketch;
import org.apache.beam.sdk.extensions.sketching.AutoValue_SketchFrequencies_Sketch;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;

@Experimental
public final class SketchFrequencies {
    /**
     * Creates a transform that computes a single Count-min sketch over all elements of the
     * input {@code PCollection}, using the builder defaults (relative error {@code 0.01},
     * confidence {@code 0.999}).
     *
     * @param <InputT> type of the elements being sketched
     * @return a {@link GlobalSketch} configured with the default accuracy
     */
    public static <InputT> GlobalSketch<InputT> globally() {
        // The explicit type witness is required: as the receiver of build(), builder() has no
        // target type, so InputT would otherwise be inferred as Object and the returned
        // GlobalSketch<Object> would not be assignable to GlobalSketch<InputT>.
        return GlobalSketch.<InputT>builder().build();
    }

    /**
     * Creates a transform that computes one Count-min sketch per key over the values of a
     * {@code PCollection} of key/value pairs, using the builder defaults (relative error
     * {@code 0.01}, confidence {@code 0.999}).
     *
     * @param <K> type of the keys
     * @param <V> type of the values being sketched
     * @return a {@link PerKeySketch} configured with the default accuracy
     */
    public static <K, V> PerKeySketch<K, V> perKey() {
        // The explicit type witness is required: as the receiver of build(), builder() has no
        // target type, so K and V would otherwise be inferred as Object and the result would
        // not be assignable to PerKeySketch<K, V>.
        return PerKeySketch.<K, V>builder().build();
    }

    /**
     * Coder for {@link Sketch} values.
     *
     * <p>A sketch is written as the byte array produced by
     * {@code CountMinSketch.serialize}, encoded with a {@link ByteArrayCoder}; decoding
     * reverses both steps and rebuilds the {@link Sketch} wrapper around the result.
     *
     * @param <T> type of the elements tracked by the sketch
     */
    static class CountMinSketchCoder<T> extends CustomCoder<Sketch<T>> {
        private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();

        CountMinSketchCoder() {
        }

        /** Rejects {@code null} sketches with the coder's standard error. */
        private static void rejectNull(Sketch<?> value) throws CoderException {
            if (value == null) {
                throw new CoderException("cannot encode a null Count-min Sketch");
            }
        }

        /**
         * Serializes the wrapped Count-min sketch and writes the bytes to {@code outStream}.
         *
         * @param value the sketch to encode; must not be {@code null}
         * @param outStream destination stream
         * @throws CoderException if {@code value} is {@code null}
         * @throws IOException if writing to the stream fails
         */
        public void encode(Sketch<T> value, OutputStream outStream) throws IOException {
            rejectNull(value);
            byte[] serialized = CountMinSketch.serialize(value.sketch());
            BYTE_ARRAY_CODER.encode(serialized, outStream);
        }

        /**
         * Reads a byte array from {@code inStream} and deserializes it into a sketch.
         *
         * @param inStream source stream
         * @return the decoded sketch
         * @throws IOException if reading from the stream fails
         */
        public Sketch<T> decode(InputStream inStream) throws IOException {
            return Sketch.create(CountMinSketch.deserialize(BYTE_ARRAY_CODER.decode(inStream)));
        }

        /**
         * Always {@code true}: the size reported by {@link #getEncodedElementByteSize} is
         * computed from the sketch dimensions without encoding the value.
         */
        public boolean isRegisterByteSizeObserverCheap(Sketch<T> value) {
            return true;
        }

        /**
         * Computes the encoded size from the sketch dimensions as
         * {@code 24 + 8 * depth * (width + 1)}.
         *
         * @param value the sketch to measure; must not be {@code null}
         * @return the computed size in bytes
         * @throws CoderException if {@code value} is {@code null}
         */
        protected long getEncodedElementByteSize(Sketch<T> value) throws IOException {
            rejectNull(value);
            // NOTE(review): presumably 8 bytes per counter plus one extra 8-byte entry per row,
            // on top of 24 bytes of fixed header - confirm against CountMinSketch.serialize.
            long rows = value.depth();
            long entriesPerRow = value.width() + 1;
            return 24L + 8L * rows * entriesPerRow;
        }
    }

    /**
     * Wrapper around a {@link CountMinSketch} that also records the sketch dimensions.
     *
     * <p>Elements are not stored directly: each element is encoded with the supplied
     * {@link Coder}, the bytes are hashed with Murmur3 (128 bit) and the resulting
     * {@code long} is what is added to, or looked up in, the underlying sketch.
     *
     * @param <T> type of the elements tracked by the sketch
     */
    @AutoValue
    public static abstract class Sketch<T> implements Serializable {
        /** Seed passed to every newly created {@link CountMinSketch}. */
        static final int SEED = 123456;

        /** Number of columns for a relative error {@code eps}: {@code ceil(2 / eps)}. */
        private static int widthFor(double eps) {
            return (int) Math.ceil(2.0 / eps);
        }

        /** Number of rows for a confidence {@code conf}: {@code ceil(-ln(1 - conf) / ln 2)}. */
        private static int depthFor(double conf) {
            return (int) Math.ceil(-Math.log(1.0 - conf) / Math.log(2.0));
        }

        /**
         * Creates an empty sketch dimensioned for the requested accuracy.
         *
         * @param eps the relative error
         * @param conf the confidence
         * @return a new sketch backed by a fresh {@link CountMinSketch} seeded with {@link #SEED}
         */
        static <T> Sketch<T> create(double eps, double conf) {
            int cols = widthFor(eps);
            int rows = depthFor(conf);
            CountMinSketch empty = new CountMinSketch(rows, cols, SEED);
            return new AutoValue_SketchFrequencies_Sketch<>(rows, cols, empty);
        }

        /**
         * Wraps an existing {@link CountMinSketch}; the dimensions are recomputed from the
         * relative error and confidence reported by the sketch itself.
         *
         * @param sketch the sketch to wrap
         * @return a wrapper around {@code sketch}
         */
        static <T> Sketch<T> create(CountMinSketch sketch) {
            int cols = widthFor(sketch.getRelativeError());
            int rows = depthFor(sketch.getConfidence());
            return new AutoValue_SketchFrequencies_Sketch<>(rows, cols, sketch);
        }

        abstract int depth();

        abstract int width();

        abstract CountMinSketch sketch();

        /**
         * Adds {@code count} occurrences of {@code element} to the sketch.
         *
         * @param element the element to record
         * @param count the number of occurrences to add
         * @param coder the coder used to turn the element into bytes before hashing
         * @throws IllegalStateException if the element cannot be encoded
         */
        public void add(T element, long count, Coder<T> coder) {
            long hashed = hashElement(element, coder);
            sketch().add(hashed, count);
        }

        /**
         * Adds a single occurrence of {@code element} to the sketch.
         *
         * @param element the element to record
         * @param coder the coder used to turn the element into bytes before hashing
         * @throws IllegalStateException if the element cannot be encoded
         */
        public void add(T element, Coder<T> coder) {
            add(element, 1L, coder);
        }

        /** Encodes {@code element} with {@code coder} and hashes the bytes to a {@code long}. */
        private long hashElement(T element, Coder<T> coder) {
            byte[] encoded;
            try {
                encoded = CoderUtils.encodeToByteArray(coder, element);
            }
            catch (CoderException e) {
                throw new IllegalStateException("The input value cannot be encoded: " + e.getMessage(), e);
            }
            return Hashing.murmur3_128().hashBytes(encoded).asLong();
        }

        /**
         * Returns the sketch's frequency estimate for {@code element}.
         *
         * @param element the element to look up
         * @param coder the coder used to turn the element into bytes before hashing; it has to
         *     produce the same bytes as the coder used when the element was added
         * @return the estimated number of occurrences
         * @throws IllegalStateException if the element cannot be encoded
         */
        public long estimateCount(T element, Coder<T> coder) {
            long hashed = hashElement(element, coder);
            return sketch().estimateCount(hashed);
        }
    }

    /**
     * {@link Combine.CombineFn} that folds elements into a Count-min {@link Sketch}.
     *
     * <p>Instances are immutable; {@link #withAccuracy} returns a new function rather than
     * modifying the receiver. Elements are hashed from their encoded bytes, which is why
     * {@link #create} insists on a deterministic coder.
     *
     * @param <InputT> type of the elements being sketched
     */
    public static class CountMinSketchFn<InputT>
    extends Combine.CombineFn<InputT, Sketch<InputT>, Sketch<InputT>> {
        private final Coder<InputT> inputCoder;
        // depth and width are derived from confidence and epsilon; within this class they are
        // only reported through populateDisplayData.
        private final int depth;
        private final int width;
        private final double epsilon;
        private final double confidence;

        private CountMinSketchFn(Coder<InputT> coder, double eps, double confidence) {
            this.epsilon = eps;
            this.confidence = confidence;
            this.width = (int)Math.ceil(2.0 / eps);
            this.depth = (int)Math.ceil(-Math.log(1.0 - confidence) / Math.log(2.0));
            this.inputCoder = coder;
        }

        /**
         * Creates a function with the default accuracy (relative error {@code 0.01},
         * confidence {@code 0.999}).
         *
         * @param coder coder of the input elements; must be deterministic
         * @return a new combine function
         * @throws IllegalArgumentException if {@code coder} is not deterministic
         */
        public static <InputT> CountMinSketchFn<InputT> create(Coder<InputT> coder) {
            try {
                coder.verifyDeterministic();
            }
            catch (Coder.NonDeterministicException e) {
                throw new IllegalArgumentException("Coder must be deterministic to perform this sketch. " + e.getMessage(), e);
            }
            return new CountMinSketchFn<>(coder, 0.01, 0.999);
        }

        /**
         * Returns a copy of this function with the given accuracy.
         *
         * @param epsilon the relative error; must be strictly positive
         * @param confidence the confidence; must be strictly between 0 and 1
         * @return a new combine function using the same input coder
         * @throws IllegalArgumentException if either argument is out of range or {@code NaN}
         */
        public CountMinSketchFn<InputT> withAccuracy(double epsilon, double confidence) {
            // Negated comparisons so that NaN, which fails every comparison, is rejected too.
            if (!(epsilon > 0.0)) {
                throw new IllegalArgumentException("The relative error must be positive");
            }
            if (!(confidence > 0.0 && confidence < 1.0)) {
                throw new IllegalArgumentException("The confidence must be between 0 and 1");
            }
            return new CountMinSketchFn<>(this.inputCoder, epsilon, confidence);
        }

        /** Returns an empty sketch dimensioned for this function's accuracy. */
        public Sketch<InputT> createAccumulator() {
            return Sketch.create(this.epsilon, this.confidence);
        }

        /** Records one occurrence of {@code element} in {@code accumulator} and returns it. */
        public Sketch<InputT> addInput(Sketch<InputT> accumulator, InputT element) {
            accumulator.add(element, this.inputCoder);
            return accumulator;
        }

        /**
         * Merges all given sketches into one.
         *
         * @param accumulators the sketches to merge
         * @return the merged sketch, or a fresh empty accumulator when {@code accumulators}
         *     yields no element
         * @throws IllegalStateException if the underlying sketches cannot be merged
         */
        public Sketch<InputT> mergeAccumulators(Iterable<Sketch<InputT>> accumulators) {
            Iterator<Sketch<InputT>> it = accumulators.iterator();
            if (!it.hasNext()) {
                // Nothing to merge: answer with an empty sketch instead of failing in next().
                return this.createAccumulator();
            }
            CountMinSketch mergedSketches = it.next().sketch();
            try {
                while (it.hasNext()) {
                    mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch());
                }
            }
            catch (FrequencyMergeException e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new IllegalStateException("The accumulators cannot be merged: " + e.getMessage(), e);
            }
            return Sketch.create(mergedSketches);
        }

        /** Returns the accumulator unchanged: the sketch itself is the output. */
        public Sketch<InputT> extractOutput(Sketch<InputT> accumulator) {
            return accumulator;
        }

        /**
         * Returns a {@link CountMinSketchCoder} for the accumulators; neither argument is
         * consulted.
         */
        public Coder<Sketch<InputT>> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) {
            return new CountMinSketchCoder<>();
        }

        /** Publishes the sketch dimensions and accuracy parameters as display data. */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                .add(DisplayData.item("width", this.width).withLabel("width of the Count-Min sketch array"))
                .add(DisplayData.item("depth", this.depth).withLabel("depth of the Count-Min sketch array"))
                .add(DisplayData.item("eps", this.epsilon).withLabel("relative error to the total number of elements"))
                .add(DisplayData.item("conf", this.confidence).withLabel("confidence in the relative error"));
        }
    }

    /**
     * Transform that computes one Count-min {@link Sketch} per key over the values of a
     * {@code PCollection} of key/value pairs.
     *
     * @param <K> type of the keys
     * @param <V> type of the values being sketched
     */
    @AutoValue
    public static abstract class PerKeySketch<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Sketch<V>>>> {
        abstract double relativeError();

        abstract double confidence();

        abstract Builder<K, V> toBuilder();

        /** Returns a builder preset with relative error {@code 0.01} and confidence {@code 0.999}. */
        static <K, V> Builder<K, V> builder() {
            Builder<K, V> defaults = new AutoValue_SketchFrequencies_PerKeySketch.Builder<K, V>();
            return defaults.setRelativeError(0.01).setConfidence(0.999);
        }

        /**
         * Returns a copy of this transform using the given relative error.
         *
         * @param eps the relative error; it is validated when the transform is expanded
         */
        public PerKeySketch<K, V> withRelativeError(double eps) {
            Builder<K, V> copy = toBuilder();
            return copy.setRelativeError(eps).build();
        }

        /**
         * Returns a copy of this transform using the given confidence.
         *
         * @param conf the confidence; it is validated when the transform is expanded
         */
        public PerKeySketch<K, V> withConfidence(double conf) {
            Builder<K, V> copy = toBuilder();
            return copy.setConfidence(conf).build();
        }

        /**
         * Applies a per-key combine that sketches the values with the configured accuracy.
         *
         * <p>The input coder is cast to {@link KvCoder} to obtain the value coder, so the
         * input collection has to be encoded with a {@code KvCoder}.
         */
        public PCollection<KV<K, Sketch<V>>> expand(PCollection<KV<K, V>> input) {
            KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
            CountMinSketchFn<V> sketchFn =
                CountMinSketchFn.create(kvCoder.getValueCoder())
                    .withAccuracy(relativeError(), confidence());
            return input.apply("Compute Count-Min Sketch perKey", Combine.perKey(sketchFn));
        }

        /** AutoValue builder for {@link PerKeySketch}. */
        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setRelativeError(double var1);

            abstract Builder<K, V> setConfidence(double var1);

            abstract PerKeySketch<K, V> build();
        }
    }

    /**
     * Transform that computes a single Count-min {@link Sketch} over all elements of the
     * input {@code PCollection}.
     *
     * @param <InputT> type of the elements being sketched
     */
    @AutoValue
    public static abstract class GlobalSketch<InputT>
    extends PTransform<PCollection<InputT>, PCollection<Sketch<InputT>>> {
        abstract double relativeError();

        abstract double confidence();

        abstract Builder<InputT> toBuilder();

        /** Returns a builder preset with relative error {@code 0.01} and confidence {@code 0.999}. */
        static <InputT> Builder<InputT> builder() {
            Builder<InputT> defaults = new AutoValue_SketchFrequencies_GlobalSketch.Builder<InputT>();
            return defaults.setRelativeError(0.01).setConfidence(0.999);
        }

        /**
         * Returns a copy of this transform using the given relative error.
         *
         * @param eps the relative error; it is validated when the transform is expanded
         */
        public GlobalSketch<InputT> withRelativeError(double eps) {
            Builder<InputT> copy = toBuilder();
            return copy.setRelativeError(eps).build();
        }

        /**
         * Returns a copy of this transform using the given confidence.
         *
         * @param conf the confidence; it is validated when the transform is expanded
         */
        public GlobalSketch<InputT> withConfidence(double conf) {
            Builder<InputT> copy = toBuilder();
            return copy.setConfidence(conf).build();
        }

        /**
         * Applies a global combine that sketches every element with the configured accuracy,
         * hashing elements through the input collection's coder.
         */
        public PCollection<Sketch<InputT>> expand(PCollection<InputT> input) {
            CountMinSketchFn<InputT> sketchFn =
                CountMinSketchFn.create(input.getCoder())
                    .withAccuracy(relativeError(), confidence());
            return input.apply("Compute Count-Min Sketch", Combine.globally(sketchFn));
        }

        /** AutoValue builder for {@link GlobalSketch}. */
        @AutoValue.Builder
        static abstract class Builder<InputT> {
            Builder() {
            }

            abstract Builder<InputT> setRelativeError(double var1);

            abstract Builder<InputT> setConfidence(double var1);

            abstract GlobalSketch<InputT> build();
        }
    }
}

