package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.class */
public class GroupByKeyTest implements Serializable {

    @Rule
    public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest$AssertContains.class */
    static class AssertContains<K, V> extends DoFn<KV<K, Iterable<V>>, Void> {
        private final Map<K, List<SerializableMatcher<Iterable<? extends V>>>> byKey;

        public AssertContains(KV<K, SerializableMatcher<Iterable<? extends V>>>... kvArr) {
            this.byKey = (Map) Arrays.stream(kvArr).collect(Collectors.groupingBy((v0) -> {
                return v0.getKey();
            }, Collectors.mapping((v0) -> {
                return v0.getValue();
            }, Collectors.toList())));
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, Iterable<V>> kv) {
            MatcherAssert.assertThat("Unexpected key: " + kv.getKey(), this.byKey.containsKey(kv.getKey()));
            ImmutableList copyOf = ImmutableList.copyOf((Iterable) kv.getValue());
            MatcherAssert.assertThat("Unexpected values " + copyOf + " for key " + kv.getKey(), this.byKey.get(kv.getKey()).stream().anyMatch(serializableMatcher -> {
                return serializableMatcher.matches(copyOf);
            }));
        }
    }

    private static PipelineOptions testOptions() {
        SparkStructuredStreamingPipelineOptions as = PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
        as.setRunner(SparkStructuredStreamingRunner.class);
        as.setTestMode(true);
        return as;
    }

    @Test
    public void testGroupByKeyPreservesWindowing() {
        this.pipeline.apply(Create.timestamped(shuffleRandomly(TimestampedValue.of(KV.of(1, 1), new Instant(1L)), TimestampedValue.of(KV.of(1, 3), new Instant(2L)), TimestampedValue.of(KV.of(1, 5), new Instant(11L)), TimestampedValue.of(KV.of(2, 2), new Instant(3L)), TimestampedValue.of(KV.of(2, 4), new Instant(11L)), TimestampedValue.of(KV.of(2, 6), new Instant(12L))))).apply(Window.into(FixedWindows.of(Duration.millis(10L)))).apply(GroupByKey.create()).apply(ParDo.of(new AssertContains(KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3})), KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{5})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{4, 6})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{2})))));
        this.pipeline.run();
    }

    @Test
    public void testGroupByKeyExplodesMultipleWindows() {
        this.pipeline.apply(Create.timestamped(shuffleRandomly(TimestampedValue.of(KV.of(1, 1), new Instant(5L)), TimestampedValue.of(KV.of(1, 3), new Instant(7L)), TimestampedValue.of(KV.of(1, 5), new Instant(11L)), TimestampedValue.of(KV.of(2, 2), new Instant(5L)), TimestampedValue.of(KV.of(2, 4), new Instant(11L)), TimestampedValue.of(KV.of(2, 6), new Instant(12L))))).apply(Window.into(SlidingWindows.of(Duration.millis(10L)).every(Duration.millis(5L)))).apply(GroupByKey.create()).apply(ParDo.of(new AssertContains(KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3})), KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3, 5})), KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{5})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{2})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{2, 4, 6})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{4, 6})))));
        this.pipeline.run();
    }

    @Test
    public void testGroupByKeyWithMergingWindows() {
        this.pipeline.apply(Create.timestamped(shuffleRandomly(TimestampedValue.of(KV.of(1, 1), new Instant(5L)), TimestampedValue.of(KV.of(1, 3), new Instant(7L)), TimestampedValue.of(KV.of(1, 5), new Instant(11L)), TimestampedValue.of(KV.of(2, 2), new Instant(5L)), TimestampedValue.of(KV.of(2, 4), new Instant(11L)), TimestampedValue.of(KV.of(2, 6), new Instant(12L))))).apply(Window.into(Sessions.withGapDuration(Duration.millis(5L)))).apply(GroupByKey.create()).apply(ParDo.of(new AssertContains(KV.of(1, SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3, 5})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{2})), KV.of(2, SerializableMatchers.containsInAnyOrder(new Integer[]{4, 6})))));
        this.pipeline.run();
    }

    @Test
    public void testGroupByKey() {
        PAssert.thatMap(this.pipeline.apply(Create.of(shuffleRandomly(KV.of(1, 1), KV.of(1, 3), KV.of(1, 5), KV.of(2, 2), KV.of(2, 4), KV.of(2, 6)))).apply(GroupByKey.create())).satisfies(map -> {
            MatcherAssert.assertThat((Iterable) map.get(1), SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3, 5}));
            MatcherAssert.assertThat((Iterable) map.get(2), SerializableMatchers.containsInAnyOrder(new Integer[]{2, 4, 6}));
            return null;
        });
        this.pipeline.run();
    }

    private <T> List<T> shuffleRandomly(T... tArr) {
        ArrayList newArrayList = Lists.newArrayList(tArr);
        Collections.shuffle(newArrayList);
        return newArrayList;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1915392808:
                if (implMethodName.equals("lambda$testGroupByKey$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Ljava/lang/Void;")) {
                    return map -> {
                        MatcherAssert.assertThat((Iterable) map.get(1), SerializableMatchers.containsInAnyOrder(new Integer[]{1, 3, 5}));
                        MatcherAssert.assertThat((Iterable) map.get(2), SerializableMatchers.containsInAnyOrder(new Integer[]{2, 4, 6}));
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
