package org.apache.beam.runners.spark.translation.streaming;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.class */
public class SparkCoGroupByKeyStreamingTest {
    private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
    private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");

    @Rule
    public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    private Duration batchDuration() {
        return Duration.millis(this.pipeline.getOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis().longValue());
    }

    @Test
    @Category({StreamingTest.class})
    public void testInStreamingMode() throws Exception {
        Instant instant = new Instant(0L);
        CreateStream advanceNextBatchWatermarkToInfinity = CreateStream.of(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), batchDuration()).emptyBatch().advanceWatermarkForNextBatch(instant).nextBatch(new TimestampedValue[]{TimestampedValue.of(KV.of(1, 1), instant), TimestampedValue.of(KV.of(1, 2), instant), TimestampedValue.of(KV.of(1, 3), instant)}).advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(1L))).nextBatch(new TimestampedValue[]{TimestampedValue.of(KV.of(2, 4), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of(KV.of(2, 5), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of(KV.of(2, 6), instant.plus(Duration.standardSeconds(1L)))}).advanceNextBatchWatermarkToInfinity();
        CreateStream advanceNextBatchWatermarkToInfinity2 = CreateStream.of(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), batchDuration()).emptyBatch().advanceWatermarkForNextBatch(instant).nextBatch(new TimestampedValue[]{TimestampedValue.of(KV.of(1, 11), instant), TimestampedValue.of(KV.of(1, 12), instant), TimestampedValue.of(KV.of(1, 13), instant)}).advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(1L))).nextBatch(new TimestampedValue[]{TimestampedValue.of(KV.of(2, 14), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of(KV.of(2, 15), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of(KV.of(2, 16), instant.plus(Duration.standardSeconds(1L)))}).advanceNextBatchWatermarkToInfinity();
        PAssert.that("Wrong output of the join using CoGroupByKey in streaming mode", KeyedPCollectionTuple.of(INPUT1_TAG, this.pipeline.apply("create source1", advanceNextBatchWatermarkToInfinity).apply("window input1", Window.into(FixedWindows.of(Duration.standardSeconds(3L))).withAllowedLateness(Duration.ZERO))).and(INPUT2_TAG, this.pipeline.apply("create source2", advanceNextBatchWatermarkToInfinity2).apply("window input2", Window.into(FixedWindows.of(Duration.standardSeconds(3L))).withAllowedLateness(Duration.ZERO))).apply(CoGroupByKey.create())).satisfies(iterable -> {
            Assert.assertEquals("Wrong size of the output PCollection", 2L, Iterables.size(iterable));
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                KV kv = (KV) it.next();
                if (((Integer) kv.getKey()).intValue() == 1) {
                    Iterable all = ((CoGbkResult) kv.getValue()).getAll(INPUT1_TAG);
                    Assert.assertEquals("Wrong number of values for output elements for tag input1 and key 1", 3L, Iterables.size(all));
                    Assert.assertThat("Elements of PCollection input1 for key \"1\" are not present in the output PCollection", all, Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}));
                    Iterable all2 = ((CoGbkResult) kv.getValue()).getAll(INPUT2_TAG);
                    Assert.assertEquals("Wrong number of values for output elements for tag input2 and key 1", 3L, Iterables.size(all2));
                    Assert.assertThat("Elements of PCollection input2 for key \"1\" are not present in the output PCollection", all2, Matchers.containsInAnyOrder(new Integer[]{11, 12, 13}));
                } else if (((Integer) kv.getKey()).intValue() == 2) {
                    Iterable all3 = ((CoGbkResult) kv.getValue()).getAll(INPUT1_TAG);
                    Assert.assertEquals("Wrong number of values for output elements for tag input1 and key 2", 3L, Iterables.size(all3));
                    Assert.assertThat("Elements of PCollection input1 for key \"2\" are not present in the output PCollection", all3, Matchers.containsInAnyOrder(new Integer[]{4, 5, 6}));
                    Iterable all4 = ((CoGbkResult) kv.getValue()).getAll(INPUT2_TAG);
                    Assert.assertEquals("Wrong number of values for output elements for tag input2 and key 2", 3L, Iterables.size(all4));
                    Assert.assertThat("Elements of PCollection input2 for key \"2\" are not present in the output PCollection", all4, Matchers.containsInAnyOrder(new Integer[]{14, 15, 16}));
                } else {
                    Assert.fail("Unknown key in the output PCollection");
                }
            }
            return null;
        });
        this.pipeline.run();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -35932545:
                if (implMethodName.equals("lambda$testInStreamingMode$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Assert.assertEquals("Wrong size of the output PCollection", 2L, Iterables.size(iterable));
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            KV kv = (KV) it.next();
                            if (((Integer) kv.getKey()).intValue() == 1) {
                                Iterable all = ((CoGbkResult) kv.getValue()).getAll(INPUT1_TAG);
                                Assert.assertEquals("Wrong number of values for output elements for tag input1 and key 1", 3L, Iterables.size(all));
                                Assert.assertThat("Elements of PCollection input1 for key \"1\" are not present in the output PCollection", all, Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}));
                                Iterable all2 = ((CoGbkResult) kv.getValue()).getAll(INPUT2_TAG);
                                Assert.assertEquals("Wrong number of values for output elements for tag input2 and key 1", 3L, Iterables.size(all2));
                                Assert.assertThat("Elements of PCollection input2 for key \"1\" are not present in the output PCollection", all2, Matchers.containsInAnyOrder(new Integer[]{11, 12, 13}));
                            } else if (((Integer) kv.getKey()).intValue() == 2) {
                                Iterable all3 = ((CoGbkResult) kv.getValue()).getAll(INPUT1_TAG);
                                Assert.assertEquals("Wrong number of values for output elements for tag input1 and key 2", 3L, Iterables.size(all3));
                                Assert.assertThat("Elements of PCollection input1 for key \"2\" are not present in the output PCollection", all3, Matchers.containsInAnyOrder(new Integer[]{4, 5, 6}));
                                Iterable all4 = ((CoGbkResult) kv.getValue()).getAll(INPUT2_TAG);
                                Assert.assertEquals("Wrong number of values for output elements for tag input2 and key 2", 3L, Iterables.size(all4));
                                Assert.assertThat("Elements of PCollection input2 for key \"2\" are not present in the output PCollection", all4, Matchers.containsInAnyOrder(new Integer[]{14, 15, 16}));
                            } else {
                                Assert.fail("Unknown key in the output PCollection");
                            }
                        }
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
