/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.core.Is;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class ReshuffleTest
implements Serializable {
    private static final ImmutableList<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(KV.of((Object)"k1", (Object)3), KV.of((Object)"k5", (Object)Integer.MAX_VALUE), KV.of((Object)"k5", (Object)Integer.MIN_VALUE), KV.of((Object)"k2", (Object)66), KV.of((Object)"k1", (Object)4), KV.of((Object)"k2", (Object)-33), KV.of((Object)"k3", (Object)0));
    private static final ImmutableList<KV<String, Integer>> GBK_TESTABLE_KVS = ImmutableList.of(KV.of((Object)"k1", (Object)3), KV.of((Object)"k2", (Object)4));
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    @Category(value={ValidatesRunner.class})
    public void testJustReshuffle() {
        PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(ARBITRARY_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())));
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshufflePreservesTimestamps() {
        PCollection input = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"foo", (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"foo", (Instant)new Instant(0L)), TimestampedValue.of((Object)"bar", (Instant)new Instant(33L)), TimestampedValue.of((Object)"bar", (Instant)GlobalWindow.INSTANCE.maxTimestamp())}).withCoder((Coder)StringUtf8Coder.of()))).apply((PTransform)WithKeys.of((SerializableFunction & Serializable)input12 -> input12).withKeyType(TypeDescriptors.strings()))).apply("ReifyOriginalTimestamps", Reify.timestampsInValue());
        PCollection output = (PCollection)((PCollection)((PCollection)input.apply((PTransform)Reshuffle.of())).apply("ReifyReshuffledTimestamps", Reify.timestampsInValue())).apply((PTransform)Values.create());
        PAssert.that((PCollection)output).satisfies((SerializableFunction & Serializable)input1 -> {
            for (TimestampedValue elem : input1) {
                Instant originalTimestamp = ((TimestampedValue)elem.getValue()).getTimestamp();
                Instant afterReshuffleTimestamp = elem.getTimestamp();
                Assert.assertThat((String)"Reshuffle must preserve element timestamps", (Object)afterReshuffleTimestamp, (Matcher)Matchers.equalTo((Object)originalTimestamp));
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshuffleAfterSessionsAndGroupByKey() {
        PCollection input = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(GBK_TESTABLE_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply((PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)10L))))).apply((PTransform)GroupByKey.create());
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).satisfies((SerializableFunction)new AssertThatHasExpectedContents());
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshuffleAfterFixedWindowsAndGroupByKey() {
        PCollection input = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(GBK_TESTABLE_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))))).apply((PTransform)GroupByKey.create());
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).satisfies((SerializableFunction)new AssertThatHasExpectedContents());
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
        PCollection input = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(GBK_TESTABLE_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))))).apply((PTransform)GroupByKey.create());
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).satisfies((SerializableFunction)new AssertThatHasExpectedContents());
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshuffleAfterFixedWindows() {
        PCollection input = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(ARBITRARY_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))));
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testReshuffleAfterSlidingWindows() {
        PCollection input = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(ARBITRARY_KVS).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))));
        PCollection output = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)output).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals((Object)input.getWindowingStrategy(), (Object)output.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category(value={ValidatesRunner.class, UsesTestStream.class})
    public void testReshuffleWithTimestampsStreaming() {
        TestStream stream = TestStream.create((Coder)VarLongCoder.of()).advanceWatermarkTo(new Instant(0L).plus((ReadableDuration)Duration.standardDays((long)48L))).addElements(TimestampedValue.of((Object)0L, (Instant)new Instant(0L)), new TimestampedValue[]{TimestampedValue.of((Object)1L, (Instant)new Instant(0L).plus((ReadableDuration)Duration.standardDays((long)48L))), TimestampedValue.of((Object)2L, (Instant)BoundedWindow.TIMESTAMP_MAX_VALUE.minus((ReadableDuration)Duration.standardDays((long)48L)))}).advanceWatermarkToInfinity();
        PCollection input = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)WithKeys.of((Object)""))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)10L))));
        PCollection reshuffled = (PCollection)input.apply((PTransform)Reshuffle.of());
        PAssert.that((PCollection)((PCollection)reshuffled.apply((PTransform)Values.create()))).containsInAnyOrder((Object[])new Long[]{0L, 1L, 2L});
        this.pipeline.run();
    }

    private static class AssertThatHasExpectedContents
    implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> {
        private AssertThatHasExpectedContents() {
        }

        public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
            Assert.assertThat(actual, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Matcher[])new Matcher[]{TestUtils.KvMatcher.isKv(Is.is((Object)"k1"), IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])new Integer[]{3})), TestUtils.KvMatcher.isKv(Is.is((Object)"k2"), IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])new Integer[]{4}))}));
            return null;
        }
    }
}

