/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.HashMultimap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for the {@link Distinct} transform: plain de-duplication, de-duplication by a
 * representative value, and behavior under fixed windows and processing-time triggers.
 */
@RunWith(JUnit4.class)
public class DistinctTest {
    /** Pipeline used by the non-streaming tests. */
    @Rule
    public final TestPipeline p = TestPipeline.create();

    /** Used by tests that expect pipeline construction to fail. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Pipeline used by {@link #testWindowedDistinct()}. */
    @Rule
    public TestPipeline windowedDistinctPipeline = TestPipeline.create();

    /** Pipeline used by {@link #testTriggeredDistinct()}. */
    @Rule
    public TestPipeline triggeredDistinctPipeline = TestPipeline.create();

    /** Pipeline used by the triggered representative-value tests. */
    @Rule
    public TestPipeline triggeredDistinctRepresentativePipeline = TestPipeline.create();

    /** Returns the instant lying {@code seconds} seconds after {@code base}. */
    private static Instant secondsAfter(Instant base, long seconds) {
        return base.plus(Duration.standardSeconds(seconds));
    }

    /** Duplicated strings are collapsed so that each distinct string appears once. */
    @Test
    @Category(NeedsRunner.class)
    public void testDistinct() {
        List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");

        PCollection<String> input = p.apply(Create.of(strings).withCoder(StringUtf8Coder.of()));
        PCollection<String> output = input.apply(Distinct.<String>create());

        PAssert.that(output).containsInAnyOrder("k1", "k5", "k2", "k3");
        p.run();
    }

    /** An empty input yields an empty output. */
    @Test
    @Category(NeedsRunner.class)
    public void testDistinctEmpty() {
        List<String> strings = Arrays.<String>asList();

        PCollection<String> input = p.apply(Create.of(strings).withCoder(StringUtf8Coder.of()));
        PCollection<String> output = input.apply(Distinct.<String>create());

        PAssert.that(output).empty();
        p.run();
    }

    /**
     * De-duplicates key/value pairs using the key as the representative value and verifies the
     * result with {@link Checker}.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testDistinctWithRepresentativeValue() {
        List<KV<String, String>> strings =
                Arrays.asList(KV.of("k1", "v1"), KV.of("k1", "v2"), KV.of("k2", "v1"));

        PCollection<KV<String, String>> input = p.apply(Create.of(strings));
        PCollection<KV<String, String>> output =
                input.apply(
                        Distinct.withRepresentativeValueFn(new Keys<String>())
                                .withRepresentativeType(TypeDescriptor.of(String.class)));

        PAssert.that(output).satisfies(new Checker());
        p.run();
    }

    /**
     * With 30-second fixed windows, the same value may appear once per window: k1..k3 land in
     * both of the first two windows, k4..k6 in the third.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWindowedDistinct() {
        Instant base = new Instant(0L);
        TestStream<String> values =
                TestStream.create(StringUtf8Coder.of())
                        .advanceWatermarkTo(base)
                        .addElements(
                                TimestampedValue.of("k1", base),
                                TimestampedValue.of("k2", secondsAfter(base, 10L)),
                                TimestampedValue.of("k3", secondsAfter(base, 20L)),
                                TimestampedValue.of("k1", secondsAfter(base, 30L)),
                                TimestampedValue.of("k2", secondsAfter(base, 40L)),
                                TimestampedValue.of("k3", secondsAfter(base, 50L)),
                                TimestampedValue.of("k4", secondsAfter(base, 60L)),
                                TimestampedValue.of("k5", secondsAfter(base, 70L)),
                                TimestampedValue.of("k6", secondsAfter(base, 80L)))
                        .advanceWatermarkToInfinity();

        PCollection<String> distinctValues =
                windowedDistinctPipeline
                        .apply(values)
                        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
                        .apply(Distinct.<String>create());

        PAssert.that(distinctValues)
                .inWindow(new IntervalWindow(base, secondsAfter(base, 30L)))
                .containsInAnyOrder("k1", "k2", "k3");
        PAssert.that(distinctValues)
                .inWindow(new IntervalWindow(secondsAfter(base, 30L), secondsAfter(base, 60L)))
                .containsInAnyOrder("k1", "k2", "k3");
        PAssert.that(distinctValues)
                .inWindow(new IntervalWindow(secondsAfter(base, 60L), secondsAfter(base, 90L)))
                .containsInAnyOrder("k4", "k5", "k6");
        windowedDistinctPipeline.run();
    }

    /**
     * Feeds k1..k3 twice into a single one-minute window, with processing time advanced by one
     * minute between the two batches, under a repeating processing-time trigger with accumulating
     * panes. Each value must still be emitted only once.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testTriggeredDistinct() {
        Instant base = new Instant(0L);
        TestStream<String> values =
                TestStream.create(StringUtf8Coder.of())
                        .advanceWatermarkTo(base)
                        .addElements(
                                TimestampedValue.of("k1", base),
                                TimestampedValue.of("k2", secondsAfter(base, 10L)),
                                TimestampedValue.of("k3", secondsAfter(base, 20L)))
                        .advanceProcessingTime(Duration.standardMinutes(1L))
                        .addElements(
                                TimestampedValue.of("k1", secondsAfter(base, 30L)),
                                TimestampedValue.of("k2", secondsAfter(base, 40L)),
                                TimestampedValue.of("k3", secondsAfter(base, 50L)))
                        .advanceWatermarkToInfinity();

        PCollection<String> distinctValues =
                triggeredDistinctPipeline
                        .apply(values)
                        .apply(
                                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1L)))
                                        .triggering(
                                                Repeatedly.forever(
                                                        AfterProcessingTime.pastFirstElementInPane()
                                                                .plusDelayOf(Duration.standardSeconds(30L))))
                                        .withAllowedLateness(Duration.ZERO)
                                        .accumulatingFiredPanes())
                        .apply(Distinct.<String>create());

        PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3");
        triggeredDistinctPipeline.run();
    }

    /**
     * Same scenario as {@link #testTriggeredDistinct()}, but the elements are key/value pairs
     * de-duplicated by their integer key.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testTriggeredDistinctRepresentativeValues() {
        Instant base = new Instant(0L);
        TestStream<KV<Integer, String>> values =
                TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
                        .advanceWatermarkTo(base)
                        .addElements(
                                TimestampedValue.of(KV.of(1, "k1"), base),
                                TimestampedValue.of(KV.of(2, "k2"), secondsAfter(base, 10L)),
                                TimestampedValue.of(KV.of(3, "k3"), secondsAfter(base, 20L)))
                        .advanceProcessingTime(Duration.standardMinutes(1L))
                        .addElements(
                                TimestampedValue.of(KV.of(1, "k1"), secondsAfter(base, 30L)),
                                TimestampedValue.of(KV.of(2, "k2"), secondsAfter(base, 40L)),
                                TimestampedValue.of(KV.of(3, "k3"), secondsAfter(base, 50L)))
                        .advanceWatermarkToInfinity();

        PCollection<KV<Integer, String>> distinctValues =
                triggeredDistinctRepresentativePipeline
                        .apply(values)
                        .apply(
                                Window.<KV<Integer, String>>into(
                                                FixedWindows.of(Duration.standardMinutes(1L)))
                                        .triggering(
                                                Repeatedly.forever(
                                                        AfterProcessingTime.pastFirstElementInPane()
                                                                .plusDelayOf(Duration.standardSeconds(30L))))
                                        .withAllowedLateness(Duration.ZERO)
                                        .accumulatingFiredPanes())
                        .apply(
                                Distinct.withRepresentativeValueFn(new Keys<Integer>())
                                        .withRepresentativeType(TypeDescriptor.of(Integer.class)));

        PAssert.that(distinctValues)
                .containsInAnyOrder(KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
        triggeredDistinctRepresentativePipeline.run();
    }

    /**
     * A single element under an end-of-window trigger with early processing-time firings and
     * discarding panes must be emitted exactly once.
     *
     * <p>NOTE(review): judging by the test name, this presumably guards against a failure when a
     * later pane is empty after the early firing -- confirm against the Distinct implementation.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testTriggeredDistinctRepresentativeValuesEmpty() {
        Instant base = new Instant(0L);
        TestStream<KV<Integer, String>> values =
                TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
                        .advanceWatermarkTo(base)
                        .addElements(TimestampedValue.of(KV.of(1, "k1"), base))
                        .advanceProcessingTime(Duration.standardMinutes(1L))
                        .advanceWatermarkToInfinity();

        PCollection<KV<Integer, String>> distinctValues =
                triggeredDistinctRepresentativePipeline
                        .apply(values)
                        .apply(
                                Window.<KV<Integer, String>>into(
                                                FixedWindows.of(Duration.standardMinutes(1L)))
                                        .triggering(
                                                AfterWatermark.pastEndOfWindow()
                                                        .withEarlyFirings(
                                                                AfterProcessingTime.pastFirstElementInPane()
                                                                        .plusDelayOf(Duration.standardSeconds(30L))))
                                        .withAllowedLateness(Duration.ZERO)
                                        .discardingFiredPanes())
                        .apply(
                                Distinct.withRepresentativeValueFn(new Keys<Integer>())
                                        .withRepresentativeType(TypeDescriptor.of(Integer.class)));

        PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"));
        triggeredDistinctRepresentativePipeline.run();
    }

    /**
     * Applying a method-reference representative-value function without a representative type
     * descriptor is expected to fail with an {@link IllegalStateException} at apply time.
     */
    @Test
    public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
        PCollection<String> dupes =
                p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));

        thrown.expect(IllegalStateException.class);
        thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");

        dupes.apply("RemoveRepresentativeDupes", Distinct.withRepresentativeValueFn(String::length));
    }

    /**
     * De-duplicates strings by length and checks that every output element came from the input
     * and that no two output elements share a length.
     */
    @Test
    @Category(NeedsRunner.class)
    public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {
        PCollection<String> dupes =
                p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
        PCollection<String> deduped =
                dupes.apply(
                        Distinct.withRepresentativeValueFn(String::length)
                                .withRepresentativeType(TypeDescriptor.of(Integer.class)));

        PAssert.that(deduped)
                .satisfies(
                        (Iterable<String> strs) -> {
                            HashMultimap<Integer, String> predupedContents = HashMultimap.create();
                            predupedContents.put(3, "foo");
                            predupedContents.put(4, "foos");
                            predupedContents.put(6, "barbaz");
                            predupedContents.put(6, "bazbar");

                            HashSet<Integer> seenLengths = new HashSet<>();
                            for (String s : strs) {
                                Integer length = s.length();
                                Assert.assertThat(predupedContents.values(), Matchers.hasItem(s));
                                // hasItem, not contains: contains() only matches when the whole
                                // collection equals the given items, so not(contains(x)) would
                                // pass for almost any set and miss a repeated length.
                                Assert.assertThat(seenLengths, Matchers.not(Matchers.hasItem(length)));
                                seenLengths.add(length);
                            }
                            return null;
                        });
        p.run();
    }

    /**
     * Verifies the output of {@link #testDistinctWithRepresentativeValue()}: exactly two keys
     * remain, k1 maps to either of its input values, and k2 maps to v1.
     */
    private static class Checker
            implements SerializableFunction<Iterable<KV<String, String>>, Void> {
        @Override
        public Void apply(Iterable<KV<String, String>> input) {
            HashMap<String, String> values = new HashMap<>();
            for (KV<String, String> kv : input) {
                values.put(kv.getKey(), kv.getValue());
            }
            Assert.assertEquals(2L, (long) values.size());
            // Either value for k1 is acceptable.
            String k1Value = values.get("k1");
            Assert.assertTrue("v1".equals(k1Value) || "v2".equals(k1Value));
            Assert.assertEquals("v1", values.get("k2"));
            return null;
        }
    }

    /** Representative-value function that returns the key of a key/value pair. */
    private static class Keys<T> implements SerializableFunction<KV<T, String>, T> {
        @Override
        public T apply(KV<T, String> input) {
            return input.getKey();
        }
    }
}

