package org.apache.beam.sdk.util;

import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/util/ReshuffleTest.class */
public class ReshuffleTest {
    private static final List<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(KV.of("k1", 3), KV.of("k5", Integer.MAX_VALUE), KV.of("k5", Integer.MIN_VALUE), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
    private static final List<KV<String, Integer>> GBK_TESTABLE_KVS = ImmutableList.of(KV.of("k1", 3), KV.of("k2", 4));
    private static final List<KV<String, Iterable<Integer>>> GROUPED_TESTABLE_KVS = ImmutableList.of(KV.of("k1", ImmutableList.of(3)), KV.of("k2", ImmutableList.of(4)));

    @Test
    @Category({RunnableOnService.class})
    public void testJustReshuffle() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReshuffleAfterSessionsAndGroupByKey() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(GROUPED_TESTABLE_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReshuffleAfterFixedWindowsAndGroupByKey() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(GROUPED_TESTABLE_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(GROUPED_TESTABLE_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReshuffleAfterFixedWindows() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReshuffleAfterSlidingWindows() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        create.run();
    }
}
