package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.beam.repackaged.core.org.antlr.v4.runtime.atn.PredictionContext;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.ParDoTest;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.core.Is;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/ReshuffleTest.class */
public class ReshuffleTest implements Serializable {
    private static final ImmutableList<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(KV.of("k1", 3), KV.of("k5", Integer.valueOf(PredictionContext.EMPTY_RETURN_STATE)), KV.of("k5", Integer.MIN_VALUE), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
    private static final ImmutableList<KV<String, Integer>> GBK_TESTABLE_KVS = ImmutableList.of(KV.of("k1", 3), KV.of("k2", 4));

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ReshuffleTest$AssertThatHasExpectedContents.class */
    private static class AssertThatHasExpectedContents implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> {
        private AssertThatHasExpectedContents() {
        }

        public Void apply(Iterable<KV<String, Iterable<Integer>>> iterable) {
            MatcherAssert.assertThat(iterable, IsIterableContainingInAnyOrder.containsInAnyOrder(new Matcher[]{TestUtils.KvMatcher.isKv(Is.is("k1"), IsIterableContainingInAnyOrder.containsInAnyOrder(new Integer[]{3})), TestUtils.KvMatcher.isKv(Is.is("k2"), IsIterableContainingInAnyOrder.containsInAnyOrder(new Integer[]{4}))}));
            return null;
        }
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testJustReshuffle() {
        PCollection apply = this.pipeline.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshufflePreservesTimestamps() {
        PAssert.that(this.pipeline.apply(Create.timestamped(TimestampedValue.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, BoundedWindow.TIMESTAMP_MIN_VALUE), new TimestampedValue[]{TimestampedValue.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID, new Instant(0L)), TimestampedValue.of("bar", new Instant(33L)), TimestampedValue.of("bar", GlobalWindow.INSTANCE.maxTimestamp())}).withCoder(StringUtf8Coder.of())).apply(WithKeys.of(str -> {
            return str;
        }).withKeyType(TypeDescriptors.strings())).apply("ReifyOriginalTimestamps", Reify.timestampsInValue()).apply(Reshuffle.of()).apply("ReifyReshuffledTimestamps", Reify.timestampsInValue()).apply(Values.create())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                TimestampedValue timestampedValue = (TimestampedValue) it.next();
                MatcherAssert.assertThat("Reshuffle must preserve element timestamps", timestampedValue.getTimestamp(), Matchers.equalTo(((TimestampedValue) timestampedValue.getValue()).getTimestamp()));
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshuffleAfterSessionsAndGroupByKey() {
        PCollection apply = this.pipeline.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).satisfies(new AssertThatHasExpectedContents());
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshuffleAfterFixedWindowsAndGroupByKey() {
        PCollection apply = this.pipeline.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).satisfies(new AssertThatHasExpectedContents());
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
        PCollection apply = this.pipeline.apply(Create.of(GBK_TESTABLE_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L)))).apply(GroupByKey.create());
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).satisfies(new AssertThatHasExpectedContents());
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshuffleAfterFixedWindows() {
        PCollection apply = this.pipeline.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testReshuffleAfterSlidingWindows() {
        PCollection apply = this.pipeline.apply(Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));
        PCollection apply2 = apply.apply(Reshuffle.of());
        PAssert.that(apply2).containsInAnyOrder(ARBITRARY_KVS);
        Assert.assertEquals(apply.getWindowingStrategy(), apply2.getWindowingStrategy());
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesTestStream.class})
    public void testReshuffleWithTimestampsStreaming() {
        PAssert.that(this.pipeline.apply(TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0L).plus(Duration.standardDays(48L))).addElements(TimestampedValue.of(0L, new Instant(0L)), new TimestampedValue[]{TimestampedValue.of(1L, new Instant(0L).plus(Duration.standardDays(48L))), TimestampedValue.of(2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L)))}).advanceWatermarkToInfinity()).apply(WithKeys.of("")).apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L)))).apply(Reshuffle.of()).apply(Values.create())).containsInAnyOrder(new Long[]{0L, 1L, 2L});
        this.pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class})
    public void testAssignShardFn() {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            newArrayList.addAll(ARBITRARY_KVS);
        }
        PAssert.that(this.pipeline.apply(Create.of(newArrayList).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))).apply(ParDo.of(new Reshuffle.AssignShardFn(2))).apply(GroupByKey.create()).apply(MapElements.into(TypeDescriptors.integers()).via((v0) -> {
            return v0.getKey();
        }))).containsInAnyOrder(ImmutableList.of(0, 1));
        this.pipeline.run();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = false;
                    break;
                }
                break;
            case -957893405:
                if (implMethodName.equals("lambda$testReshufflePreservesTimestamps$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1307059468:
                if (implMethodName.equals("lambda$testReshufflePreservesTimestamps$5a007597$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/ReshuffleTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/ReshuffleTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            TimestampedValue timestampedValue = (TimestampedValue) it.next();
                            MatcherAssert.assertThat("Reshuffle must preserve element timestamps", timestampedValue.getTimestamp(), Matchers.equalTo(((TimestampedValue) timestampedValue.getValue()).getTimestamp()));
                        }
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
