package org.apache.beam.runners.spark.translation.streaming;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;

@Category({StreamingTest.class})
/* Decompiled from org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.class: streaming tests that feed pipelines from CreateStream sources. */
public class CreateStreamTest implements Serializable {

    // The JUnit rules below are transient while the enclosing test class is Serializable;
    // presumably this keeps them out of any serialized copy of the test instance (e.g. when
    // an anonymous DoFn captures it) -- TODO confirm.

    /** Pipeline under test; every test method applies its transforms to this instance. */
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /**
     * Rule created via {@code ReuseSparkContextRule.no()}; presumably it turns off Spark
     * context reuse for these tests -- confirm against the rule's implementation.
     * NOTE(review): the field name misspells "Reuse"; left as-is because the field is public.
     */
    @Rule
    public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();

    /** Used by the {@code *Throws} tests to declare the exception they expect. */
    @Rule
    public final transient ExpectedException thrown = ExpectedException.none();

    /**
     * Pass-through {@link DoFn} that records how often its lifecycle hooks are invoked.
     *
     * <p>The counters are static, so they are shared by every instance of this class in the
     * JVM and are never reset; callers should compare the two counters with each other
     * rather than against absolute values.
     */
    private static class LifecycleDoFn extends DoFn<Integer, Integer> {
        /** Incremented once per {@link #setup()} call. */
        static final AtomicInteger setupCalls = new AtomicInteger(0);

        /** Incremented once per {@link #teardown()} call. */
        static final AtomicInteger teardownCalls = new AtomicInteger(0);

        private LifecycleDoFn() {
        }

        /** Counts a setup invocation. */
        @DoFn.Setup
        public void setup() {
            setupCalls.incrementAndGet();
        }

        /** Counts a teardown invocation. */
        @DoFn.Teardown
        public void teardown() {
            teardownCalls.incrementAndGet();
        }

        /**
         * Emits the input element unchanged.
         *
         * @param context the per-element context supplying the input and receiving the output
         */
        @DoFn.ProcessElement
        public void process(ProcessContext context) {
            context.output(context.element());
        }
    }

    /**
     * Runs an accumulating-mode pipeline over five-minute fixed windows whose source keeps
     * emitting elements stamped at time zero while the watermark moves past the window.
     *
     * <p>Asserts that, for the window starting at time zero, the on-time pane holds
     * {@code 1, 2, 3}, the final pane holds {@code 1..5} (so the negative-valued last batch
     * never shows up), the global count stays within {@code [3, 5]} and the global sum
     * within {@code [6, 15]}.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testLateDataAccumulating() throws IOException {
        Instant instant = new Instant(0L);
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6L)))
                .nextBatch(
                    TimestampedValue.of(1, instant),
                    TimestampedValue.of(2, instant),
                    TimestampedValue.of(3, instant))
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20L)))
                .nextBatch(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
                .advanceNextBatchWatermarkToInfinity()
                .nextBatch(
                    TimestampedValue.of(-1, instant),
                    TimestampedValue.of(-2, instant),
                    TimestampedValue.of(-3, instant));

        // The explicit <Integer> witness is required: Window.into(...) is the receiver of a
        // method chain, so its type parameter cannot be inferred from the enclosing apply().
        PCollection<Integer> windowed =
            p.apply(source)
                .apply(
                    Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5L)))
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(
                                    AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardMinutes(2L)))
                                .withLateFirings(AfterPane.elementCountAtLeast(1)))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(
                            Duration.standardMinutes(5L), Window.ClosingBehavior.FIRE_ALWAYS));

        PCollection<Integer> triggered =
            windowed
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());
        // Same reason as above for the <Integer> witness on combineFn().
        PCollection<Long> count =
            windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
        PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

        IntervalWindow window =
            new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
        PAssert.that(triggered).inFinalPane(window).containsInAnyOrder(1, 2, 3, 4, 5);
        PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3);
        PAssert.that(count)
            .inWindow(window)
            .satisfies(
                counts -> {
                    for (Long observed : counts) {
                        MatcherAssert.assertThat(
                            observed,
                            Matchers.allOf(
                                Matchers.greaterThanOrEqualTo(3L),
                                Matchers.lessThanOrEqualTo(5L)));
                    }
                    return null;
                });
        PAssert.that(sum)
            .inWindow(window)
            .satisfies(
                sums -> {
                    for (Integer observed : sums) {
                        MatcherAssert.assertThat(
                            observed,
                            Matchers.allOf(
                                Matchers.greaterThanOrEqualTo(6),
                                Matchers.lessThanOrEqualTo(15)));
                    }
                    return null;
                });
        p.run();
    }

    /**
     * Runs a discarding-mode pipeline over one-second fixed windows with an early firing
     * after two elements, no late firings ({@code Never.ever()}) and five seconds of
     * allowed lateness.
     *
     * <p>Asserts, for the window containing timestamp 100, which strings appear overall,
     * in the combined non-late panes, in the on-time pane and in the final pane.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testDiscardingMode() throws IOException {
        CreateStream<String> source =
            CreateStream.of(StringUtf8Coder.of(), batchDuration())
                .nextBatch(
                    TimestampedValue.of("firstPane", new Instant(100L)),
                    TimestampedValue.of("alsoFirstPane", new Instant(200L)))
                .advanceWatermarkForNextBatch(new Instant(1001L))
                .nextBatch(TimestampedValue.of("onTimePane", new Instant(500L)))
                .advanceNextBatchWatermarkToInfinity()
                .nextBatch(
                    TimestampedValue.of("finalLatePane", new Instant(750L)),
                    TimestampedValue.of("alsoFinalLatePane", new Instant(250L)));

        FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
        Duration allowedLateness = Duration.millis(5000L);
        // <String> must be spelled out because Window.into(...) starts a method chain.
        PCollection<String> values =
            p.apply(source)
                .apply(
                    Window.<String>into(windowFn)
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(2))
                                .withLateFirings(Never.ever()))
                        .discardingFiredPanes()
                        .withAllowedLateness(allowedLateness))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        IntervalWindow window = windowFn.assignWindow(new Instant(100L));
        PAssert.that(values)
            .inWindow(window)
            .containsInAnyOrder(
                "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane");
        PAssert.that(values)
            .inCombinedNonLatePanes(window)
            .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
        PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane");
        PAssert.that(values)
            .inFinalPane(window)
            .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");
        p.run();
    }

    /**
     * Feeds one batch that mixes an element stamped far in the past with one stamped at 100
     * ms, after the watermark has already been advanced to time zero.
     *
     * <p>Asserts that the window of the old element stays empty while the window containing
     * timestamp 100 yields {@code "onTime"}.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testFirstElementLate() throws IOException {
        Instant lateElementTimestamp = new Instant(-1000000L);
        CreateStream<String> source =
            CreateStream.of(StringUtf8Coder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(new Instant(0L))
                .emptyBatch()
                .nextBatch(
                    TimestampedValue.of("late", lateElementTimestamp),
                    TimestampedValue.of("onTime", new Instant(100L)))
                .advanceNextBatchWatermarkToInfinity();

        FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
        Duration allowedLateness = Duration.millis(5000L);
        // <String> must be spelled out because Window.into(...) starts a method chain.
        PCollection<String> values =
            p.apply(source)
                .apply(
                    Window.<String>into(windowFn)
                        .triggering(DefaultTrigger.of())
                        .discardingFiredPanes()
                        .withAllowedLateness(allowedLateness))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
        PAssert.that(values)
            .inWindow(windowFn.assignWindow(new Instant(100L)))
            .containsInAnyOrder("onTime");
        p.run();
    }

    /**
     * Emits two elements stamped with the global window's maximum timestamp and asserts
     * that both are found in the six-hour fixed window that contains that timestamp.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testElementsAtAlmostPositiveInfinity() throws IOException {
        Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
        CreateStream<String> source =
            CreateStream.of(StringUtf8Coder.of(), batchDuration())
                .nextBatch(
                    TimestampedValue.of("foo", endOfGlobalWindow),
                    TimestampedValue.of("bar", endOfGlobalWindow))
                .advanceNextBatchWatermarkToInfinity();

        FixedWindows windows = FixedWindows.of(Duration.standardHours(6L));
        PCollection<String> windowedValues =
            p.apply(source)
                .apply(Window.into(windows))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        PAssert.that(windowedValues)
            .inWindow(windows.assignWindow(endOfGlobalWindow))
            .containsInAnyOrder("foo", "bar");
        p.run();
    }

    /**
     * Applies two independent {@link CreateStream} sources (strings and integers) to the
     * same pipeline and asserts that each resulting collection contains exactly the
     * elements of its own source.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testMultipleStreams() throws IOException {
        CreateStream<String> stringSource =
            CreateStream.of(StringUtf8Coder.of(), batchDuration())
                .nextBatch("foo", "bar")
                .advanceNextBatchWatermarkToInfinity();
        CreateStream<Integer> intSource =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .nextBatch(1, 2, 3, 4)
                .advanceNextBatchWatermarkToInfinity();

        // Window.configure() starts a method chain, so its element type needs a witness.
        PCollection<String> strings =
            p.apply("CreateStrings", stringSource)
                .apply(
                    "WindowStrings",
                    Window.<String>configure()
                        .triggering(AfterPane.elementCountAtLeast(2))
                        .withAllowedLateness(Duration.ZERO)
                        .accumulatingFiredPanes());
        PAssert.that(strings).containsInAnyOrder("foo", "bar");

        PCollection<Integer> ints =
            p.apply("CreateInts", intSource)
                .apply(
                    "WindowInts",
                    Window.<Integer>configure()
                        .triggering(AfterPane.elementCountAtLeast(4))
                        .withAllowedLateness(Duration.ZERO)
                        .accumulatingFiredPanes());
        PAssert.that(ints).containsInAnyOrder(1, 2, 3, 4);

        p.run();
    }

    /**
     * Flattens two identically windowed streams whose watermarks advance at different
     * rates and asserts that the on-time pane of the first five-minute window contains the
     * elements of both streams ({@code 1..5}).
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testFlattenedWithWatermarkHold() throws IOException {
        Instant instant = new Instant(0L);
        // First stream: its watermark reaches the end of the window before its only data batch.
        CreateStream<Integer> source1 =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5L)))
                .nextBatch(
                    TimestampedValue.of(1, instant),
                    TimestampedValue.of(2, instant),
                    TimestampedValue.of(3, instant))
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10L)));
        // Second stream: its watermark trails behind, at 1, 2 and then 5 minutes.
        CreateStream<Integer> source2 =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1L)))
                .nextBatch(TimestampedValue.of(4, instant))
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(2L)))
                .nextBatch(TimestampedValue.of(5, instant))
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5L)))
                .emptyBatch()
                .advanceNextBatchWatermarkToInfinity();

        // <Integer> must be spelled out because Window.into(...) starts a method chain.
        PCollection<Integer> windowed1 =
            p.apply("CreateStream1", source1)
                .apply(
                    "Window1",
                    Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5L)))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .accumulatingFiredPanes()
                        .withAllowedLateness(Duration.ZERO));
        PCollection<Integer> windowed2 =
            p.apply("CreateStream2", source2)
                .apply(
                    "Window2",
                    Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5L)))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .accumulatingFiredPanes()
                        .withAllowedLateness(Duration.ZERO));

        PCollectionList<Integer> both = PCollectionList.of(windowed1).and(windowed2);
        PCollection<Integer> flattened = both.apply(Flatten.pCollections());
        PCollection<Integer> triggered =
            flattened
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        IntervalWindow window =
            new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
        PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3, 4, 5);
        p.run();
    }

    /**
     * Applies a two-output {@link ParDo} to a streamed collection: the main output echoes
     * each element and the additional output carries the element plus one.
     *
     * <p>Asserts {@code 1, 2, 3} on the main output and {@code 2, 3, 4} on the additional
     * output.
     *
     * @throws IOException declared by the original signature; nothing here throws it directly
     */
    @Test
    public void testMultiOutputParDo() throws IOException {
        Instant instant = new Instant(0L);
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5L)))
                .nextBatch(
                    TimestampedValue.of(1, instant),
                    TimestampedValue.of(2, instant),
                    TimestampedValue.of(3, instant))
                .advanceNextBatchWatermarkToInfinity();
        PCollection<Integer> inputs = p.apply(source);

        // Plain (non-anonymous) tags, as in the original; coders are set explicitly below.
        final TupleTag<Integer> mainTag = new TupleTag<>();
        final TupleTag<Integer> additionalTag = new TupleTag<>();

        PCollectionTuple outputs =
            inputs.apply(
                ParDo.of(
                        new DoFn<Integer, Integer>() {
                            @DoFn.ProcessElement
                            public void process(ProcessContext context) {
                                Integer element = context.element();
                                context.output(element);
                                context.output(additionalTag, element + 1);
                            }
                        })
                    .withOutputTags(mainTag, TupleTagList.of(additionalTag)));

        PCollection<Integer> mainOutput = outputs.get(mainTag).setCoder(VarIntCoder.of());
        PCollection<Integer> additionalOutput =
            outputs.get(additionalTag).setCoder(VarIntCoder.of());
        PAssert.that(mainOutput).containsInAnyOrder(1, 2, 3);
        PAssert.that(additionalOutput).containsInAnyOrder(2, 3, 4);
        p.run();
    }

    /**
     * Runs {@link LifecycleDoFn} over a small stream and asserts that, once the pipeline
     * run has returned, teardown was invoked exactly as many times as setup.
     *
     * <p>The counters are static and never reset, so only their equality is checked.
     */
    @Test
    public void testParDoCallsSetupAndTeardown() {
        Instant instant = new Instant(0L);
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5L)))
                .nextBatch(
                    TimestampedValue.of(1, instant),
                    TimestampedValue.of(2, instant),
                    TimestampedValue.of(3, instant))
                .advanceNextBatchWatermarkToInfinity();

        p.apply(source).apply(ParDo.of(new LifecycleDoFn()));
        p.run();

        MatcherAssert.assertThat(
            "Function should have been torn down",
            LifecycleDoFn.teardownCalls.intValue(),
            Matchers.is(Matchers.equalTo(LifecycleDoFn.setupCalls.intValue())));
    }

    /**
     * Asserts that adding an element stamped exactly at {@code TIMESTAMP_MAX_VALUE} raises
     * {@link IllegalArgumentException}, while one millisecond earlier is accepted.
     */
    @Test
    public void testElementAtPositiveInfinityThrows() {
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .nextBatch(
                    TimestampedValue.of(
                        -1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L))))
                .advanceNextBatchWatermarkToInfinity();
        // Everything above must succeed; only the call below is expected to throw.
        thrown.expect(IllegalArgumentException.class);
        source.nextBatch(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE));
    }

    /**
     * Asserts that moving the watermark backwards (from 0 to -1) raises
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testAdvanceWatermarkNonMonotonicThrows() {
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .advanceWatermarkForNextBatch(new Instant(0L));
        // The first advance must succeed; only the regression below is expected to throw.
        thrown.expect(IllegalArgumentException.class);
        source
            .advanceWatermarkForNextBatch(new Instant(-1L))
            .advanceNextBatchWatermarkToInfinity();
    }

    /**
     * Asserts that advancing the watermark to exactly {@code TIMESTAMP_MAX_VALUE} through
     * {@code advanceWatermarkForNextBatch} raises {@link IllegalArgumentException}, while
     * one millisecond earlier is accepted.
     */
    @Test
    public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
        CreateStream<Integer> source =
            CreateStream.of(VarIntCoder.of(), batchDuration())
                .advanceWatermarkForNextBatch(
                    BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L)));
        // The first advance must succeed; only the call below is expected to throw.
        thrown.expect(IllegalArgumentException.class);
        source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /**
     * Counts values per key over three-second fixed windows on a streamed key/value source
     * in which every element uses key {@code 1}.
     *
     * <p>Asserts that every emitted pair has key {@code 1} and a non-zero count.
     *
     * @throws Exception declared by the original signature; nothing here throws a checked
     *     exception directly
     */
    @Test
    public void testInStreamingModeCountByKey() throws Exception {
        Instant instant = new Instant(0L);
        CreateStream<KV<Integer, Long>> kvSource =
            CreateStream.of(KvCoder.of(VarIntCoder.of(), VarLongCoder.of()), batchDuration())
                .emptyBatch()
                .advanceWatermarkForNextBatch(instant)
                .nextBatch(
                    TimestampedValue.of(
                        KV.of(1, 100L), instant.plus(Duration.standardSeconds(3L))),
                    TimestampedValue.of(
                        KV.of(1, 300L), instant.plus(Duration.standardSeconds(4L))))
                .advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(7L)))
                .nextBatch(
                    TimestampedValue.of(
                        KV.of(1, 400L), instant.plus(Duration.standardSeconds(8L))))
                .advanceNextBatchWatermarkToInfinity();

        // The element type must be spelled out because Window.into(...) starts a method chain.
        PCollection<KV<Integer, Long>> output =
            p.apply("create kv Source", kvSource)
                .apply(
                    "window input",
                    Window.<KV<Integer, Long>>into(FixedWindows.of(Duration.standardSeconds(3L)))
                        .withAllowedLateness(Duration.ZERO))
                .apply(Count.perKey());

        PAssert.that("Wrong count value ", output)
            .satisfies(
                counts -> {
                    for (KV<Integer, Long> entry : counts) {
                        if (entry.getKey().intValue() == 1) {
                            Assert.assertNotEquals(
                                "Count Value is 0 !!!", 0L, entry.getValue().longValue());
                        } else {
                            Assert.fail("Unknown key in the output PCollection");
                        }
                    }
                    return null;
                });
        p.run();
    }

    /**
     * Reads the batch interval from the test pipeline's options, viewed as
     * {@link SparkPipelineOptions}.
     *
     * @return the configured batch interval in milliseconds, as a {@link Duration}
     */
    private Duration batchDuration() {
        SparkPipelineOptions sparkOptions = this.p.getOptions().as(SparkPipelineOptions.class);
        long intervalMillis = sparkOptions.getBatchIntervalMillis().longValue();
        return Duration.millis(intervalMillis);
    }

    // The decompiler emitted a source-level copy of the synthetic $deserializeLambda$ method
    // here. It has been removed: javac generates that method itself for every class that
    // contains serializable lambdas (the PAssert.satisfies(...) callbacks in this class), and
    // the decompiled text was not valid Java (an int assigned to a boolean, a switch over a
    // boolean, duplicate case labels).
}
