/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sketching;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.sketching.SketchFrequencies;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class SketchFrequenciesTest
implements Serializable {
    @Rule
    public final transient TestPipeline tp = TestPipeline.create();
    private final List<Long> smallStream = Arrays.asList(1L, 2L, 2L, 3L, 3L, 3L, 4L, 4L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 6L, 6L, 6L, 6L, 6L, 6L, 7L, 7L, 7L, 7L, 7L, 7L, 7L, 8L, 8L, 8L, 8L, 8L, 8L, 8L, 8L, 9L, 9L, 9L, 9L, 9L, 9L, 9L, 9L, 9L);
    private final Long[] distinctElems = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 8L, 9L};
    private final Long[] frequencies = (Long[])this.distinctElems.clone();

    @Test
    public void perKeyDefault() {
        PCollection stream = (PCollection)this.tp.apply((PTransform)Create.of(this.smallStream));
        PCollection sketch = (PCollection)((PCollection)((PCollection)stream.apply((PTransform)WithKeys.of((Object)1))).apply((PTransform)SketchFrequencies.perKey())).apply((PTransform)Values.create());
        Coder coder = stream.getCoder();
        PAssert.thatSingleton((String)"Verify number of hits", (PCollection)sketch).satisfies(new VerifyStreamFrequencies<Long>(coder, this.distinctElems, this.frequencies));
        this.tp.run();
    }

    @Test
    public void globallyWithTunedParameters() {
        double eps = 0.01;
        double conf = 0.8;
        PCollection stream = (PCollection)this.tp.apply((PTransform)Create.of(this.smallStream));
        PCollection sketch = (PCollection)stream.apply((PTransform)SketchFrequencies.globally().withRelativeError(eps).withConfidence(conf));
        Coder coder = stream.getCoder();
        PAssert.thatSingleton((String)"Verify number of hits", (PCollection)sketch).satisfies(new VerifyStreamFrequencies<Long>(coder, this.distinctElems, this.frequencies));
        this.tp.run();
    }

    @Test
    public void merge() {
        double eps = 0.01;
        double conf = 0.8;
        long nOccurrences = 2L;
        int size = 3;
        ArrayList<SketchFrequencies.Sketch> sketches = new ArrayList<SketchFrequencies.Sketch>();
        VarIntCoder coder = VarIntCoder.of();
        int i = 0;
        while ((long)i < nOccurrences) {
            SketchFrequencies.Sketch sketch = SketchFrequencies.Sketch.create((double)eps, (double)conf);
            for (int j = 0; j < size; ++j) {
                sketch.add((Object)j, (Coder)coder);
            }
            sketches.add(sketch);
            ++i;
        }
        SketchFrequencies.CountMinSketchFn fn = SketchFrequencies.CountMinSketchFn.create((Coder)coder).withAccuracy(eps, conf);
        SketchFrequencies.Sketch merged = fn.mergeAccumulators(sketches);
        for (int i2 = 0; i2 < size; ++i2) {
            Assert.assertEquals((long)nOccurrences, (long)merged.estimateCount((Object)i2, (Coder)coder));
        }
    }

    @Test
    public void customObject() {
        int nUsers = 10;
        long occurrences = 2L;
        double eps = 0.01;
        double conf = 0.8;
        SketchFrequencies.Sketch sketch = SketchFrequencies.Sketch.create((double)eps, (double)conf);
        Schema schema = (Schema)SchemaBuilder.record((String)"User").fields().requiredString("Pseudo").requiredInt("Age").endRecord();
        AvroCoder coder = AvroCoder.of((Schema)schema);
        for (int i = 1; i <= nUsers; ++i) {
            GenericData.Record newRecord = new GenericData.Record(schema);
            newRecord.put("Pseudo", (Object)("User" + i));
            newRecord.put("Age", (Object)i);
            sketch.add((Object)newRecord, occurrences, (Coder)coder);
            Assert.assertEquals((String)"Test API", (long)occurrences, (long)sketch.estimateCount((Object)newRecord, (Coder)coder));
        }
    }

    @Test
    public void testCoder() throws Exception {
        SketchFrequencies.Sketch cMSketch = SketchFrequencies.Sketch.create((double)0.01, (double)0.8);
        VarIntCoder coder = VarIntCoder.of();
        for (int i = 0; i < 3; ++i) {
            cMSketch.add((Object)i, (Coder)coder);
        }
        CoderProperties.coderDecodeEncodeEqual((Coder)new SketchFrequencies.CountMinSketchCoder(), (Object)cMSketch);
    }

    @Test
    public void testDisplayData() {
        double eps = 0.01;
        double conf = 0.8;
        int width = (int)Math.ceil(2.0 / eps);
        int depth = (int)Math.ceil(-Math.log(1.0 - conf) / Math.log(2.0));
        SketchFrequencies.CountMinSketchFn fn = SketchFrequencies.CountMinSketchFn.create((Coder)VarIntCoder.of()).withAccuracy(eps, conf);
        MatcherAssert.assertThat((Object)DisplayData.from((HasDisplayData)fn), (Matcher)DisplayDataMatchers.hasDisplayItem((String)"width", (long)width));
        MatcherAssert.assertThat((Object)DisplayData.from((HasDisplayData)fn), (Matcher)DisplayDataMatchers.hasDisplayItem((String)"depth", (long)depth));
        MatcherAssert.assertThat((Object)DisplayData.from((HasDisplayData)fn), (Matcher)DisplayDataMatchers.hasDisplayItem((String)"eps", (double)eps));
        MatcherAssert.assertThat((Object)DisplayData.from((HasDisplayData)fn), (Matcher)DisplayDataMatchers.hasDisplayItem((String)"conf", (double)conf));
    }

    static class VerifyStreamFrequencies<T>
    implements SerializableFunction<SketchFrequencies.Sketch<T>, Void> {
        final Coder<T> coder;
        final Long[] expectedHits;
        final T[] elements;

        VerifyStreamFrequencies(Coder<T> coder, T[] elements, Long[] expectedHits) {
            this.coder = coder;
            this.elements = elements;
            this.expectedHits = expectedHits;
        }

        public Void apply(SketchFrequencies.Sketch<T> sketch) {
            for (int i = 0; i < this.elements.length; ++i) {
                Assert.assertEquals((long)this.expectedHits[i], (long)sketch.estimateCount(this.elements[i], this.coder));
            }
            return null;
        }
    }
}

