/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

public class GroupByKeyTest
implements Serializable {
    private static KV<String, Collection<Integer>> kv(String key, Integer ... values) {
        return KV.of((Object)key, ImmutableList.copyOf(values));
    }

    private static SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> containsKvs(KV<String, Collection<Integer>> ... kvs) {
        return new ContainsKVs(ImmutableList.copyOf(kvs));
    }

    private static String bigString(char c, int size) {
        char[] buf = new char[size];
        for (int i = 0; i < size; ++i) {
            buf[i] = c;
        }
        return new String(buf);
    }

    private static void runLargeKeysTest(TestPipeline p, final int keySize) throws Exception {
        PCollection result = (PCollection)((PCollection)((PCollection)((PCollection)p.apply((PTransform)Create.of((Object)"a", (Object[])new String[]{"a", "b"}))).apply("Expand", (PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                c.output((Object)KV.of((Object)GroupByKeyTest.bigString(((String)c.element()).charAt(0), keySize), (Object)((String)c.element())));
            }
        }))).apply((PTransform)GroupByKey.create())).apply("Count", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Iterable<String>>, KV<String, Integer>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                int size = 0;
                for (String value : (Iterable)((KV)c.element()).getValue()) {
                    ++size;
                }
                c.output((Object)KV.of((Object)((String)((KV)c.element()).getKey()), (Object)size));
            }
        }));
        PAssert.that((PCollection)result).satisfies((SerializableFunction & Serializable)values -> {
            Assert.assertThat((Object)values, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])new KV[]{KV.of((Object)GroupByKeyTest.bigString('a', keySize), (Object)2), KV.of((Object)GroupByKeyTest.bigString('b', keySize), (Object)1)}));
            return null;
        });
        p.run();
    }

    static class AssertThatAllKeysExist
    implements SerializableFunction<Iterable<BadEqualityKey>, Void> {
        private final int numKeys;

        AssertThatAllKeysExist(int numKeys) {
            this.numKeys = numKeys;
        }

        private static <T> Iterable<Object> asStructural(Iterable<T> iterable, Coder<T> coder) {
            return StreamSupport.stream(iterable.spliterator(), false).map(input -> {
                try {
                    return coder.structuralValue(input);
                }
                catch (Exception e) {
                    Assert.fail((String)"Could not structural values.");
                    throw new RuntimeException();
                }
            }).collect(Collectors.toList());
        }

        public Void apply(Iterable<BadEqualityKey> input) {
            DeterministicKeyCoder keyCoder = DeterministicKeyCoder.of();
            ArrayList<BadEqualityKey> expectedList = new ArrayList<BadEqualityKey>();
            for (int key = 0; key < this.numKeys; ++key) {
                expectedList.add(new BadEqualityKey(key));
            }
            Iterable<Object> structuralInput = AssertThatAllKeysExist.asStructural(input, keyCoder);
            Iterable<Object> structuralExpected = AssertThatAllKeysExist.asStructural(expectedList, keyCoder);
            for (Object expected : structuralExpected) {
                Assert.assertThat(structuralInput, (Matcher)CoreMatchers.hasItem((Object)expected));
            }
            return null;
        }
    }

    static class AssertThatCountPerKeyCorrect
    implements SerializableFunction<Iterable<KV<BadEqualityKey, Long>>, Void> {
        private final int numValues;

        AssertThatCountPerKeyCorrect(int numValues) {
            this.numValues = numValues;
        }

        public Void apply(Iterable<KV<BadEqualityKey, Long>> input) {
            for (KV<BadEqualityKey, Long> val : input) {
                Assert.assertEquals((long)this.numValues, (long)((Long)val.getValue()));
            }
            return null;
        }
    }

    static class CountFn
    implements SerializableFunction<Iterable<Long>, Long> {
        CountFn() {
        }

        public Long apply(Iterable<Long> input) {
            long result = 0L;
            for (Long in : input) {
                result += in.longValue();
            }
            return result;
        }
    }

    static class AssignRandomKey
    extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
        AssignRandomKey() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)KV.of((Object)ThreadLocalRandom.current().nextLong(), (Object)((KV)c.element())));
        }
    }

    static class DeterministicKeyCoder
    extends AtomicCoder<BadEqualityKey> {
        private static final DeterministicKeyCoder INSTANCE = new DeterministicKeyCoder();

        public static DeterministicKeyCoder of() {
            return INSTANCE;
        }

        private DeterministicKeyCoder() {
        }

        public void encode(BadEqualityKey value, OutputStream outStream) throws IOException {
            new DataOutputStream(outStream).writeLong(value.key);
        }

        public BadEqualityKey decode(InputStream inStream) throws IOException {
            return new BadEqualityKey(new DataInputStream(inStream).readLong());
        }

        public void verifyDeterministic() {
        }
    }

    static class BadEqualityKey {
        long key;

        public BadEqualityKey() {
        }

        public BadEqualityKey(long key) {
            this.key = key;
        }

        public boolean equals(Object o) {
            return false;
        }

        public int hashCode() {
            return ThreadLocalRandom.current().nextInt();
        }
    }

    private static class AssertTimestamp<K, V>
    extends DoFn<KV<K, V>, Void> {
        private final Instant timestamp;

        public AssertTimestamp(Instant timestamp) {
            this.timestamp = timestamp;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            Assert.assertThat((Object)c.timestamp(), (Matcher)CoreMatchers.equalTo((Object)this.timestamp));
        }
    }

    private static class ContainsKVs
    implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> {
        private final List<KV<String, Collection<Integer>>> expectedKvs;

        private ContainsKVs(List<KV<String, Collection<Integer>>> expectedKvs) {
            this.expectedKvs = expectedKvs;
        }

        public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
            ArrayList matchers = new ArrayList();
            for (KV<String, Collection<Integer>> expected : this.expectedKvs) {
                Object[] values = ((Collection)expected.getValue()).toArray(new Integer[0]);
                matchers.add(TestUtils.KvMatcher.isKv(CoreMatchers.equalTo((Object)((String)expected.getKey())), IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])values)));
            }
            Assert.assertThat(input, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Matcher[])matchers.toArray(new Matcher[0])));
            return null;
        }
    }

    @RunWith(value=JUnit4.class)
    public static class WindowTests
    extends SharedTestBase {
        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKeyAndWindows() {
            List<KV> ungroupedPairs = Arrays.asList(KV.of((Object)"k1", (Object)3), KV.of((Object)"k5", (Object)Integer.MAX_VALUE), KV.of((Object)"k5", (Object)Integer.MIN_VALUE), KV.of((Object)"k2", (Object)66), KV.of((Object)"k1", (Object)4), KV.of((Object)"k2", (Object)-33), KV.of((Object)"k3", (Object)0));
            PCollection input = (PCollection)this.p.apply((PTransform)Create.timestamped(ungroupedPairs, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L)).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())));
            PCollection output = (PCollection)((PCollection)input.apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)new Duration(5L))))).apply((PTransform)GroupByKey.create());
            PAssert.that((PCollection)output).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("k1", new Integer[]{3}), GroupByKeyTest.kv("k1", new Integer[]{4}), GroupByKeyTest.kv("k5", new Integer[]{Integer.MAX_VALUE, Integer.MIN_VALUE}), GroupByKeyTest.kv("k2", new Integer[]{66}), GroupByKeyTest.kv("k2", new Integer[]{-33}), GroupByKeyTest.kv("k3", new Integer[]{0})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis((long)5L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("k1", new Integer[]{3}), GroupByKeyTest.kv("k5", new Integer[]{Integer.MIN_VALUE, Integer.MAX_VALUE}), GroupByKeyTest.kv("k2", new Integer[]{66})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(5L), (ReadableDuration)Duration.millis((long)5L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("k1", new Integer[]{4}), GroupByKeyTest.kv("k2", new Integer[]{-33}), GroupByKeyTest.kv("k3", new Integer[]{0})}));
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKeyMultipleWindows() {
            PCollection windowedInput = (PCollection)((PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)"foo", (Object)1), (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)"foo", (Object)4), (Instant)new Instant(4L)), TimestampedValue.of((Object)KV.of((Object)"bar", (Object)3), (Instant)new Instant(3L))}))).apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.millis((long)5L)).every(Duration.millis((long)3L))));
            PCollection output = (PCollection)windowedInput.apply((PTransform)GroupByKey.create());
            PAssert.that((PCollection)output).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{1, 4}), GroupByKeyTest.kv("foo", new Integer[]{1}), GroupByKeyTest.kv("foo", new Integer[]{4}), GroupByKeyTest.kv("bar", new Integer[]{3}), GroupByKeyTest.kv("bar", new Integer[]{3})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(-3L), (ReadableDuration)Duration.millis((long)5L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{1})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis((long)5L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{1, 4}), GroupByKeyTest.kv("bar", new Integer[]{3})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(3L), (ReadableDuration)Duration.millis((long)5L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{4}), GroupByKeyTest.kv("bar", new Integer[]{3})}));
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKeyMergingWindows() {
            PCollection windowedInput = (PCollection)((PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)"foo", (Object)1), (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)"foo", (Object)4), (Instant)new Instant(4L)), TimestampedValue.of((Object)KV.of((Object)"bar", (Object)3), (Instant)new Instant(3L)), TimestampedValue.of((Object)KV.of((Object)"foo", (Object)9), (Instant)new Instant(9L))}))).apply((PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.millis((long)4L))));
            PCollection output = (PCollection)windowedInput.apply((PTransform)GroupByKey.create());
            PAssert.that((PCollection)output).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{1, 4}), GroupByKeyTest.kv("foo", new Integer[]{9}), GroupByKeyTest.kv("bar", new Integer[]{3})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(1L), new Instant(8L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{1, 4})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(3L), new Instant(7L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("bar", new Integer[]{3})}));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)new IntervalWindow(new Instant(9L), new Instant(13L))).satisfies(GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("foo", new Integer[]{9})}));
            this.p.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testIdentityWindowFnPropagation() {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)((PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)1L))));
            PCollection output = (PCollection)input.apply((PTransform)GroupByKey.create());
            this.p.run();
            Assert.assertTrue((boolean)output.getWindowingStrategy().getWindowFn().isCompatible((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)1L))));
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testWindowFnInvalidation() {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)((PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())))).apply((PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)1L))));
            PCollection output = (PCollection)input.apply((PTransform)GroupByKey.create());
            this.p.run();
            Assert.assertTrue((boolean)output.getWindowingStrategy().getWindowFn().isCompatible((WindowFn)new InvalidWindows("Invalid", (WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)1L)))));
        }

        @Test
        public void testInvalidWindowsDirect() {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)((PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())))).apply((PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)1L))));
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("GroupByKey must have a valid Window merge function");
            ((PCollection)input.apply("GroupByKey", (PTransform)GroupByKey.create())).apply("GroupByKeyAgain", (PTransform)GroupByKey.create());
        }
    }

    @RunWith(value=JUnit4.class)
    public static class BasicTests
    extends SharedTestBase {
        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKey() {
            List<KV> ungroupedPairs = Arrays.asList(KV.of((Object)"k1", (Object)3), KV.of((Object)"k5", (Object)Integer.MAX_VALUE), KV.of((Object)"k5", (Object)Integer.MIN_VALUE), KV.of((Object)"k2", (Object)66), KV.of((Object)"k1", (Object)4), KV.of((Object)"k2", (Object)-33), KV.of((Object)"k3", (Object)0));
            PCollection input = (PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())));
            PCollection output = (PCollection)input.apply((PTransform)GroupByKey.create());
            SerializableFunction checker = GroupByKeyTest.containsKvs(new KV[]{GroupByKeyTest.kv("k1", new Integer[]{3, 4}), GroupByKeyTest.kv("k5", new Integer[]{Integer.MIN_VALUE, Integer.MAX_VALUE}), GroupByKeyTest.kv("k2", new Integer[]{66, -33}), GroupByKeyTest.kv("k3", new Integer[]{0})});
            PAssert.that((PCollection)output).satisfies(checker);
            PAssert.that((PCollection)output).inWindow((BoundedWindow)GlobalWindow.INSTANCE).satisfies(checker);
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKeyEmpty() {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())));
            PCollection output = (PCollection)input.apply((PTransform)GroupByKey.create());
            PAssert.that((PCollection)output).empty();
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTestStream.class})
        public void testCombiningAccumulatingProcessingTime() throws Exception {
            PCollection triggeredSums = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)TestStream.create((Coder)VarIntCoder.of()).advanceWatermarkTo(new Instant(0L)).addElements(TimestampedValue.of((Object)2, (Instant)new Instant(2L)), new TimestampedValue[]{TimestampedValue.of((Object)5, (Instant)new Instant(5L))}).advanceWatermarkTo(new Instant(100L)).advanceProcessingTime(Duration.millis((long)10L)).advanceWatermarkToInfinity())).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).accumulatingFiredPanes().withAllowedLateness(Duration.ZERO).triggering((Trigger)Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis((long)10L)))))).apply((PTransform)Sum.integersGlobally().withoutDefaults());
            PAssert.that((PCollection)triggeredSums).containsInAnyOrder((Object[])new Integer[]{7});
            this.p.run();
        }

        @Test
        public void testGroupByKeyNonDeterministic() throws Exception {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)MapCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of()), (Coder)BigEndianIntegerCoder.of())));
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("must be deterministic");
            input.apply((PTransform)GroupByKey.create());
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testRemerge() {
            List<KV> ungroupedPairs = Arrays.asList(new KV[0]);
            PCollection input = (PCollection)((PCollection)this.p.apply((PTransform)Create.of(ungroupedPairs).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())))).apply((PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)1L))));
            PCollection middle = (PCollection)((PCollection)((PCollection)((PCollection)input.apply("GroupByKey", (PTransform)GroupByKey.create())).apply("Remerge", (PTransform)Window.remerge())).apply("GroupByKeyAgain", (PTransform)GroupByKey.create())).apply("RemergeAgain", (PTransform)Window.remerge());
            this.p.run();
            Assert.assertTrue((boolean)middle.getWindowingStrategy().getWindowFn().isCompatible((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)1L))));
        }

        @Test
        public void testGroupByKeyDirectUnbounded() {
            PCollection input = (PCollection)this.p.apply((PTransform)new PTransform<PBegin, PCollection<KV<String, Integer>>>(){

                public PCollection<KV<String, Integer>> expand(PBegin input) {
                    return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)PCollection.IsBounded.UNBOUNDED, (Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of()));
                }
            });
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
            input.apply("GroupByKey", (PTransform)GroupByKey.create());
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testTimestampCombinerEarliest() {
            ((PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)0, (Object)"hello"), (Instant)new Instant(0L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)0, (Object)"goodbye"), (Instant)new Instant(10L))}))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))).withTimestampCombiner(TimestampCombiner.EARLIEST))).apply((PTransform)GroupByKey.create())).apply((PTransform)ParDo.of(new AssertTimestamp(new Instant(0L))));
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testTimestampCombinerLatest() {
            ((PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)0, (Object)"hello"), (Instant)new Instant(0L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)0, (Object)"goodbye"), (Instant)new Instant(10L))}))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))).withTimestampCombiner(TimestampCombiner.LATEST))).apply((PTransform)GroupByKey.create())).apply((PTransform)ParDo.of(new AssertTimestamp(new Instant(10L))));
            this.p.run();
        }

        @Test
        public void testGroupByKeyGetName() {
            Assert.assertEquals((Object)"GroupByKey", (Object)GroupByKey.create().getName());
        }

        @Test
        public void testDisplayData() {
            GroupByKey groupByKey = GroupByKey.create();
            GroupByKey groupByFewKeys = GroupByKey.createWithFewKeys();
            DisplayData gbkDisplayData = DisplayData.from((HasDisplayData)groupByKey);
            DisplayData fewKeysDisplayData = DisplayData.from((HasDisplayData)groupByFewKeys);
            Assert.assertThat((Object)gbkDisplayData.items(), (Matcher)Matchers.empty());
            Assert.assertThat((Object)fewKeysDisplayData, DisplayDataMatchers.hasDisplayItem("fewKeys", true));
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
            int numValues = 10;
            int numKeys = 5;
            this.p.getCoderRegistry().registerCoderProvider(CoderProviders.fromStaticMethods(BadEqualityKey.class, DeterministicKeyCoder.class));
            ArrayList<KV> input = new ArrayList<KV>();
            for (int i = 0; i < 10; ++i) {
                for (int key = 0; key < 5; ++key) {
                    input.add(KV.of((Object)new BadEqualityKey(key), (Object)1L));
                }
            }
            PCollection dataset1 = (PCollection)((PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of(input))).apply((PTransform)ParDo.of((DoFn)new AssignRandomKey()))).apply((PTransform)Reshuffle.of())).apply((PTransform)Values.create());
            PCollection result = (PCollection)((PCollection)dataset1.apply((PTransform)GroupByKey.create())).apply((PTransform)Combine.groupedValues((SerializableFunction)new CountFn()));
            PAssert.that((PCollection)result).satisfies((SerializableFunction)new AssertThatCountPerKeyCorrect(10));
            PAssert.that((PCollection)((PCollection)result.apply((PTransform)Keys.create()))).satisfies((SerializableFunction)new AssertThatAllKeysExist(5));
            this.p.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, LargeKeys.Above10KB.class})
        public void testLargeKeys10KB() throws Exception {
            GroupByKeyTest.runLargeKeysTest(this.p, 10240);
        }

        @Test
        @Category(value={ValidatesRunner.class, LargeKeys.Above100KB.class})
        public void testLargeKeys100KB() throws Exception {
            GroupByKeyTest.runLargeKeysTest(this.p, 102400);
        }

        @Test
        @Category(value={ValidatesRunner.class, LargeKeys.Above1MB.class})
        public void testLargeKeys1MB() throws Exception {
            GroupByKeyTest.runLargeKeysTest(this.p, 0x100000);
        }

        @Test
        @Category(value={ValidatesRunner.class, LargeKeys.Above10MB.class})
        public void testLargeKeys10MB() throws Exception {
            GroupByKeyTest.runLargeKeysTest(this.p, 0xA00000);
        }

        @Test
        @Category(value={ValidatesRunner.class, LargeKeys.Above100MB.class})
        public void testLargeKeys100MB() throws Exception {
            GroupByKeyTest.runLargeKeysTest(this.p, 0x6400000);
        }
    }

    public static abstract class SharedTestBase {
        @Rule
        public transient TestPipeline p = TestPipeline.create();
        @Rule
        public transient ExpectedException thrown = ExpectedException.none();
    }
}

