/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link Wait}.
 *
 * <p>Every test builds two {@link TestStream}s ("main" and "signal") from randomly generated
 * watermark updates and elements, optionally windows them, applies {@code Wait.on(signal)} to the
 * main stream, and then checks two things:
 *
 * <ul>
 *   <li>the output of {@code Wait.on} contains exactly the values {@code [0, numMainElements)};
 *   <li>no signal element is processed with a timestamp earlier than the largest main-output
 *       timestamp recorded so far (see {@link #TEST_WAIT_MAX_MAIN_TIMESTAMP}).
 * </ul>
 *
 * <p>The class is {@link Serializable} and the pipeline rule is {@code transient}; presumably this
 * is because the anonymous {@link DoFn}s below capture the enclosing test instance.
 */
@RunWith(JUnit4.class)
public class WaitTest implements Serializable {
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    /**
     * Largest timestamp seen so far on the output of {@code Wait.on}, or {@code null} if no output
     * element has been observed in the current test. It is static so that the two anonymous
     * {@link DoFn}s in {@link #testWaitWithParameters} share a single value; it is reset at the
     * start of each parameterized run.
     */
    private static final AtomicReference<Instant> TEST_WAIT_MAX_MAIN_TIMESTAMP = new AtomicReference<>();

    /**
     * Builds and applies a {@link TestStream} containing the values {@code [0, numElements)} as
     * {@code Long}s, interleaved with the same number of random but non-decreasing watermark
     * updates.
     *
     * @param name name under which the stream is applied to the pipeline
     * @param base instant from which all processing times, watermarks and timestamps are derived
     * @param totalDuration span after {@code base} over which watermarks and processing times are
     *     spread
     * @param numElements number of elements (and of watermark updates) to generate
     * @param allowedLateness upper bound on how far an element's timestamp is placed before the
     *     watermark update that precedes it
     * @return the collection produced by applying the stream to the test pipeline
     */
    private PCollection<Long> generateStreamWithBoundedDisorder(String name, Instant base, Duration totalDuration, int numElements, Duration allowedLateness) {
        TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());

        // Random watermark positions within [base, base + totalDuration), sorted so that the
        // watermark only ever moves forward.
        List<Instant> watermarks = Lists.newArrayList();
        for (int i = 0; i < numElements; ++i) {
            watermarks.add(base.plus(new Duration((long) (totalDuration.getMillis() * Math.random()))));
        }
        Collections.sort(watermarks);

        // For each index emit a watermark update followed by one element whose timestamp lies at
        // most allowedLateness before that watermark; both share the same processing time.
        List<Event<Long>> events = Lists.newArrayList();
        for (int i = 0; i < numElements; ++i) {
            Instant processingTimestamp = base.plus((long) (1.0 * i * totalDuration.getMillis() / (numElements + 1)));
            Instant watermark = watermarks.get(i);
            Instant elementTimestamp = watermark.minus((long) (Math.random() * allowedLateness.getMillis()));
            events.add(new Event<Long>(processingTimestamp, watermark));
            // The element must be a Long: the stream is encoded with VarLongCoder and the
            // expected output is a list of Longs.
            events.add(new Event<Long>(processingTimestamp, TimestampedValue.of((long) i, elementTimestamp)));
        }

        // Replay the events into the builder, advancing processing time only when it moves.
        Instant lastProcessingTime = base;
        for (Event<Long> event : events) {
            Duration processingTimeDelta = new Duration(lastProcessingTime, event.processingTime);
            if (processingTimeDelta.getMillis() > 0L) {
                stream = stream.advanceProcessingTime(processingTimeDelta);
            }
            lastProcessingTime = event.processingTime;

            if (event.element != null) {
                stream = stream.addElements(event.element);
            } else {
                stream = stream.advanceWatermarkTo(event.watermarkUpdate);
            }
        }
        return this.p.apply(name, stream.advanceWatermarkToInfinity());
    }

    /** Main and signal both use 15-second fixed windows. */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitWithSameFixedWindows() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.standardSeconds(15),
            20,
            FixedWindows.of(Duration.standardSeconds(15)),
            20,
            FixedWindows.of(Duration.standardSeconds(15)));
    }

    /** Main uses 15-second fixed windows, signal uses 7-second fixed windows. */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitWithDifferentFixedWindows() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.standardSeconds(15),
            20,
            FixedWindows.of(Duration.standardSeconds(15)),
            20,
            FixedWindows.of(Duration.standardSeconds(7)));
    }

    /** Main uses 15-second fixed windows, signal uses 7-second sliding windows every second. */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitWithSignalInSlidingWindows() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.standardSeconds(15),
            20,
            FixedWindows.of(Duration.standardSeconds(15)),
            20,
            SlidingWindows.of(Duration.standardSeconds(7)).every(Duration.standardSeconds(1)));
    }

    /** Main and signal are both explicitly windowed into the global window. */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitInGlobalWindow() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.standardSeconds(15),
            20,
            new GlobalWindows(),
            20,
            new GlobalWindows());
    }

    /** No windowing is applied; both collections are instead marked as bounded. */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitBoundedInDefaultWindow() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.standardSeconds(15),
            20,
            null,
            20,
            null);
    }

    /**
     * One-second fixed windows over a one-minute span with only 10 signal elements, so some
     * signal windows necessarily receive no element.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testWaitWithSomeSignalWindowsEmpty() {
        this.testWaitWithParameters(
            Duration.standardMinutes(1),
            Duration.ZERO,
            20,
            FixedWindows.of(Duration.standardSeconds(1)),
            10,
            FixedWindows.of(Duration.standardSeconds(1)));
    }

    /**
     * Runs one {@code Wait.on} scenario and asserts on both the output contents and the relative
     * ordering of signal and main-output processing.
     *
     * @param duration span over which both generated streams are spread
     * @param lateness bound on element disorder in the generated streams; also used as the
     *     allowed lateness of the applied windowing
     * @param numMainElements number of elements in the main stream
     * @param mainWindowFn windowing for the main stream, or {@code null} to skip windowing and
     *     mark the collection as bounded
     * @param numSignalElements number of elements in the signal stream
     * @param signalWindowFn windowing for the signal stream, or {@code null} to skip windowing and
     *     mark the collection as bounded
     */
    private void testWaitWithParameters(Duration duration, Duration lateness, int numMainElements, @Nullable WindowFn<? super Long, ?> mainWindowFn, int numSignalElements, @Nullable WindowFn<? super Long, ?> signalWindowFn) {
        // Forget whatever a previous test recorded.
        TEST_WAIT_MAX_MAIN_TIMESTAMP.set(null);

        Instant base = Instant.now();

        PCollection<Long> input = this.generateStreamWithBoundedDisorder("main", base, duration, numMainElements, lateness);
        if (mainWindowFn == null) {
            input.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
        } else {
            // The trigger fires a pane as soon as at least one element is available.
            input = input.apply(
                "Window main",
                Window.<Long>into(mainWindowFn)
                    .discardingFiredPanes()
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(lateness));
        }
        input = input.apply("Fire main", new Fire<Long>());

        PCollection<Long> signal = this.generateStreamWithBoundedDisorder("signal", base, duration, numSignalElements, lateness);
        if (signalWindowFn == null) {
            signal.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
        } else {
            signal = signal.apply(
                "Window signal",
                Window.<Long>into(signalWindowFn)
                    .discardingFiredPanes()
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(lateness));
        }
        signal = signal
            .apply("Fire signal", new Fire<Long>())
            .apply("Check sequencing", ParDo.of(new DoFn<Long, Long>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    // A signal element must never carry a timestamp earlier than the latest
                    // timestamp already released on the main output.
                    Instant maxMainTimestamp = TEST_WAIT_MAX_MAIN_TIMESTAMP.get();
                    if (maxMainTimestamp != null) {
                        Assert.assertFalse(
                            "Signal at timestamp " + c.timestamp() + " generated after main timestamp progressed to " + maxMainTimestamp,
                            c.timestamp().isBefore(maxMainTimestamp));
                    }
                    c.output(c.element());
                }
            }));

        PCollection<Long> output = input.apply(Wait.on(signal));

        output.apply("Update main timestamp", ParDo.of(new DoFn<Long, Long>() {
            @ProcessElement
            public void process(ProcessContext c, BoundedWindow w) {
                // Compare-and-set loop that raises the shared maximum to this element's
                // timestamp when it is later (or when nothing has been recorded yet).
                while (true) {
                    Instant maxMainTimestamp = TEST_WAIT_MAX_MAIN_TIMESTAMP.get();
                    Instant newMaxTimestamp =
                        (maxMainTimestamp == null || c.timestamp().isAfter(maxMainTimestamp))
                            ? c.timestamp()
                            : maxMainTimestamp;
                    if (TEST_WAIT_MAX_MAIN_TIMESTAMP.compareAndSet(maxMainTimestamp, newMaxTimestamp)) {
                        break;
                    }
                }
                c.output(c.element());
            }
        }));

        List<Long> expectedOutput = Lists.newArrayList();
        for (int i = 0; i < numMainElements; ++i) {
            expectedOutput.add((long) i);
        }
        PAssert.that(output).containsInAnyOrder(expectedOutput);

        this.p.run();
    }

    /**
     * Passes every element through a {@link GroupByKey} on a single constant key and then
     * flattens the grouped values back into individual elements.
     */
    private static class Fire<T> extends PTransform<PCollection<T>, PCollection<T>> {
        @Override
        public PCollection<T> expand(PCollection<T> input) {
            return input
                .apply(WithKeys.of(""))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());
        }
    }

    /**
     * A single step of a generated stream: either an element or a watermark update, each tagged
     * with the processing time at which it occurs. Exactly one of {@link #element} and
     * {@link #watermarkUpdate} is set by each constructor; the other is {@code null}.
     */
    private static class Event<T> {
        private final Instant processingTime;
        // Null for watermark-update events.
        private final TimestampedValue<T> element;
        // Null for element events.
        private final Instant watermarkUpdate;

        /** Creates an event that adds {@code element} to the stream. */
        private Event(Instant processingTime, TimestampedValue<T> element) {
            this.processingTime = processingTime;
            this.element = element;
            this.watermarkUpdate = null;
        }

        /** Creates an event that advances the watermark to {@code watermarkUpdate}. */
        private Event(Instant processingTime, Instant watermarkUpdate) {
            this.processingTime = processingTime;
            this.element = null;
            this.watermarkUpdate = watermarkUpdate;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("processingTime", this.processingTime)
                .add("element", this.element)
                .add("watermarkUpdate", this.watermarkUpdate)
                .toString();
        }
    }
}

