/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class GroupIntoBatchesTest
implements Serializable {
    private static final int BATCH_SIZE = 5;
    private static final long NUM_ELEMENTS = 10L;
    private static final int ALLOWED_LATENESS = 0;
    private static final Logger LOG = LoggerFactory.getLogger(GroupIntoBatchesTest.class);
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private transient ArrayList<KV<String, String>> data = GroupIntoBatchesTest.createTestData();

    private static ArrayList<KV<String, String>> createTestData() {
        String[] scientists = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        ArrayList<KV<String, String>> data = new ArrayList<KV<String, String>>();
        int i = 0;
        while ((long)i < 10L) {
            int index = i % scientists.length;
            KV element = KV.of((Object)"key", (Object)scientists[index]);
            data.add((KV<String, String>)element);
            ++i;
        }
        return data;
    }

    @Test
    @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
    public void testInGlobalWindow() {
        PCollection collection = ((PCollection)((PCollection)this.pipeline.apply("Input data", (PTransform)Create.of(this.data))).apply((PTransform)GroupIntoBatches.ofSize((long)5L))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)IterableCoder.of((Coder)StringUtf8Coder.of())));
        PAssert.that((String)"Incorrect batch size in one ore more elements", (PCollection)collection).satisfies((SerializableFunction)new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>(){

            private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
                for (KV<String, Iterable<String>> element : listToCheck) {
                    if (Iterables.size((Iterable)element.getValue()) == 5) continue;
                    return false;
                }
                return true;
            }

            public Void apply(Iterable<KV<String, Iterable<String>>> input) {
                Assert.assertTrue((boolean)this.checkBatchSizes(input));
                return null;
            }
        });
        PAssert.thatSingleton((String)"Incorrect collection size", (PCollection)((PCollection)collection.apply("Count", Count.globally()))).isEqualTo((Object)2L);
        this.pipeline.run();
    }

    @Test
    @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class, UsesStatefulParDo.class})
    public void testInStreamingMode() {
        int timestampInterval = 1;
        Instant startInstant = new Instant(0L);
        TestStream.Builder streamBuilder = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).advanceWatermarkTo(startInstant);
        long offset = 0L;
        for (KV<String, String> element : this.data) {
            streamBuilder = streamBuilder.addElements(TimestampedValue.of(element, (Instant)startInstant.plus((ReadableDuration)Duration.standardSeconds((long)(offset * (long)timestampInterval)))), new TimestampedValue[0]);
            ++offset;
        }
        long windowDuration = 6L;
        TestStream stream = streamBuilder.advanceWatermarkTo(startInstant.plus((ReadableDuration)Duration.standardSeconds((long)5L))).advanceWatermarkTo(startInstant.plus((ReadableDuration)Duration.standardSeconds((long)7L))).advanceWatermarkTo(startInstant.plus((ReadableDuration)Duration.standardSeconds((long)10L))).advanceWatermarkToInfinity();
        PCollection inputCollection = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)6L))).withAllowedLateness(Duration.millis((long)0L)));
        inputCollection.apply((PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, Void>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, BoundedWindow window) {
                LOG.debug("*** ELEMENT: ({},{}) *** with timestamp %s in window %s", new Object[]{((KV)c.element()).getKey(), ((KV)c.element()).getValue(), c.timestamp().toString(), window.toString()});
            }
        }));
        PCollection outputCollection = ((PCollection)inputCollection.apply((PTransform)GroupIntoBatches.ofSize((long)5L))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)IterableCoder.of((Coder)StringUtf8Coder.of())));
        PCollection countOutput = (PCollection)outputCollection.apply("Count elements in windows after applying GroupIntoBatches", Count.perKey());
        PAssert.that((String)"Wrong number of elements in windows after GroupIntoBatches", (PCollection)countOutput).satisfies((SerializableFunction & Serializable)input -> {
            Iterator inputIterator = input.iterator();
            long count0 = (Long)((KV)inputIterator.next()).getValue();
            Assert.assertEquals((String)"Wrong number of elements in first window", (long)2L, (long)count0);
            long count1 = (Long)((KV)inputIterator.next()).getValue();
            Assert.assertEquals((String)"Wrong number of elements in second window", (long)1L, (long)count1);
            return null;
        });
        PAssert.that((String)"Incorrect output collection after GroupIntoBatches", (PCollection)outputCollection).satisfies((SerializableFunction & Serializable)input -> {
            Iterator inputIterator = input.iterator();
            int size0 = Iterables.size((Iterable)((KV)inputIterator.next()).getValue());
            Assert.assertEquals((String)"Wrong first element batch Size", (long)5L, (long)size0);
            int size1 = Iterables.size((Iterable)((KV)inputIterator.next()).getValue());
            Assert.assertEquals((String)"Wrong second element batch Size", (long)1L, (long)size1);
            int size2 = Iterables.size((Iterable)((KV)inputIterator.next()).getValue());
            Assert.assertEquals((String)"Wrong third element batch Size", (long)4L, (long)size2);
            return null;
        });
        this.pipeline.run().waitUntilFinish();
    }
}

