/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms.windowing;

import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Splitter;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WindowingTest
implements Serializable {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    @Rule
    public transient TemporaryFolder tmpFolder = new TemporaryFolder();

    private String output(String value, int count, int timestamp, int windowStart, int windowEnd) {
        return value + ":" + count + ":" + timestamp + ":[" + new Instant((long)windowStart) + ".." + new Instant((long)windowEnd) + ")";
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testPartitioningWindowing() {
        PCollection input = (PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"b", (Instant)new Instant(2L)), TimestampedValue.of((Object)"b", (Instant)new Instant(3L)), TimestampedValue.of((Object)"c", (Instant)new Instant(11L)), TimestampedValue.of((Object)"d", (Instant)new Instant(11L))}));
        PCollection output = (PCollection)input.apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)FixedWindows.of((Duration)new Duration(10L))));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{this.output("a", 1, 1, 0, 10), this.output("b", 2, 2, 0, 10), this.output("c", 1, 11, 10, 20), this.output("d", 1, 11, 10, 20)});
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testNonPartitioningWindowing() {
        PCollection input = (PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"a", (Instant)new Instant(7L)), TimestampedValue.of((Object)"b", (Instant)new Instant(8L))}));
        PCollection output = (PCollection)input.apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)SlidingWindows.of((Duration)new Duration(10L)).every(new Duration(5L))));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{this.output("a", 1, 1, -5, 5), this.output("a", 2, 5, 0, 10), this.output("a", 1, 10, 5, 15), this.output("b", 1, 8, 0, 10), this.output("b", 1, 10, 5, 15)});
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testMergingWindowing() {
        PCollection input = (PCollection)this.p.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"a", (Instant)new Instant(5L)), TimestampedValue.of((Object)"a", (Instant)new Instant(20L))}));
        PCollection output = (PCollection)input.apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)Sessions.withGapDuration((Duration)new Duration(10L))));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{this.output("a", 2, 1, 1, 15), this.output("a", 1, 20, 20, 30)});
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testWindowPreservation() {
        PCollection input1 = (PCollection)this.p.apply("Create12", (PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"b", (Instant)new Instant(2L))}));
        PCollection input2 = (PCollection)this.p.apply("Create34", (PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)new Instant(3L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)"b", (Instant)new Instant(4L))}));
        PCollectionList input = PCollectionList.of((PCollection)input1).and(input2);
        PCollection output = (PCollection)((PCollection)input.apply((PTransform)Flatten.pCollections())).apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)FixedWindows.of((Duration)new Duration(5L))));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{this.output("a", 2, 1, 0, 5), this.output("b", 2, 2, 0, 5)});
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testEmptyInput() {
        PCollection input = (PCollection)this.p.apply((PTransform)Create.empty((Coder)StringUtf8Coder.of()));
        PCollection output = (PCollection)input.apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)FixedWindows.of((Duration)new Duration(10L))));
        PAssert.that((PCollection)output).empty();
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testTextIoInput() throws Exception {
        File tmpFile = this.tmpFolder.newFile("file.txt");
        String filename = tmpFile.getPath();
        try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile));){
            writer.println("a 1");
            writer.println("b 2");
            writer.println("b 3");
            writer.println("c 11");
            writer.println("d 11");
        }
        PCollection output = (PCollection)((PCollection)((PCollection)this.p.begin().apply("ReadLines", (PTransform)TextIO.read().from(filename))).apply((PTransform)ParDo.of((DoFn)new ExtractWordsWithTimestampsFn()))).apply((PTransform)new WindowedCount((WindowFn<? super String, ?>)FixedWindows.of((Duration)Duration.millis((long)10L))));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{this.output("a", 1, 1, 0, 10), this.output("b", 2, 2, 0, 10), this.output("c", 1, 11, 10, 20), this.output("d", 1, 11, 10, 20)});
        this.p.run();
    }

    static class ExtractWordsWithTimestampsFn
    extends DoFn<String, String> {
        ExtractWordsWithTimestampsFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            List<String> words = Splitter.onPattern("[^a-zA-Z0-9']+").splitToList((CharSequence)c.element());
            if (words.size() == 2) {
                c.outputWithTimestamp((Object)words.get(0), new Instant(Long.parseLong(words.get(1))));
            }
        }
    }

    private static class WindowedCount
    extends PTransform<PCollection<String>, PCollection<String>> {
        private WindowFn<? super String, ?> windowFn;

        public WindowedCount(WindowFn<? super String, ?> windowFn) {
            this.windowFn = windowFn;
        }

        public PCollection<String> expand(PCollection<String> in) {
            return ((PCollection)((PCollection)((PCollection)in.apply("Window", (PTransform)Window.into(this.windowFn).withTimestampCombiner(TimestampCombiner.EARLIEST))).apply(Count.perElement())).apply("FormatCounts", (PTransform)ParDo.of((DoFn)new FormatCountsDoFn()))).setCoder((Coder)StringUtf8Coder.of());
        }

        private static final class FormatCountsDoFn
        extends DoFn<KV<String, Long>, String> {
            private FormatCountsDoFn() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, BoundedWindow window) {
                c.output((Object)((String)((KV)c.element()).getKey() + ":" + ((KV)c.element()).getValue() + ":" + c.timestamp().getMillis() + ":" + window));
            }
        }
    }
}

