/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class DoFnTesterTest {
    @Rule
    public final TestPipeline p = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void processElement() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
                tester.setCloningBehavior(cloning);
                tester.processElement((Object)1L);
                List take = tester.takeOutputElements();
                Assert.assertThat((Object)take, (Matcher)Matchers.hasItems((Object[])new String[]{"1"}));
                Assert.assertTrue((boolean)tester.takeOutputElements().isEmpty());
                Assert.assertTrue((boolean)tester.peekOutputElements().isEmpty());
            }
        }
    }

    @Test
    public void processElementsWithPeeks() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
                tester.setCloningBehavior(cloning);
                tester.startBundle();
                tester.processElement((Object)1L);
                tester.processElement((Object)2L);
                List peek = tester.peekOutputElements();
                Assert.assertThat((Object)peek, (Matcher)Matchers.hasItems((Object[])new String[]{"1", "2"}));
                tester.processElement((Object)3L);
                tester.processElement((Object)4L);
                peek = tester.peekOutputElements();
                Assert.assertThat((Object)peek, (Matcher)Matchers.hasItems((Object[])new String[]{"1", "2", "3", "4"}));
                List take = tester.takeOutputElements();
                Assert.assertThat((Object)take, (Matcher)Matchers.hasItems((Object[])new String[]{"1", "2", "3", "4"}));
                Assert.assertTrue((boolean)tester.peekOutputElements().isEmpty());
                Assert.assertTrue((boolean)tester.takeOutputElements().isEmpty());
                tester.processElement((Object)5L);
                tester.processElement((Object)6L);
                peek = tester.peekOutputElements();
                Assert.assertThat((Object)peek, (Matcher)Matchers.hasItems((Object[])new String[]{"5", "6"}));
                take = tester.takeOutputElements();
                Assert.assertThat((Object)take, (Matcher)Matchers.hasItems((Object[])new String[]{"5", "6"}));
                tester.finishBundle();
            }
        }
    }

    @Test
    public void processBundle() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
                tester.setCloningBehavior(cloning);
                Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{1L, 2L, 3L, 4L}), (Matcher)Matchers.hasItems((Object[])new String[]{"1", "2", "3", "4"}));
                Assert.assertTrue((boolean)tester.peekOutputElements().isEmpty());
            }
        }
    }

    @Test
    public void processMultipleBundles() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
                tester.setCloningBehavior(cloning);
                Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{1L, 2L, 3L, 4L}), (Matcher)Matchers.hasItems((Object[])new String[]{"1", "2", "3", "4"}));
                Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{5L, 6L, 7L}), (Matcher)Matchers.hasItems((Object[])new String[]{"5", "6", "7"}));
                Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{8L, 9L}), (Matcher)Matchers.hasItems((Object[])new String[]{"8", "9"}));
                Assert.assertTrue((boolean)tester.peekOutputElements().isEmpty());
            }
        }
    }

    @Test
    public void doNotClone() throws Exception {
        final AtomicInteger numSetupCalls = new AtomicInteger();
        final AtomicInteger numTeardownCalls = new AtomicInteger();
        DoFn<Long, String> fn = new DoFn<Long, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext context) {
            }

            @DoFn.Setup
            public void setup() {
                numSetupCalls.addAndGet(1);
            }

            @DoFn.Teardown
            public void teardown() {
                numTeardownCalls.addAndGet(1);
            }
        };
        try (DoFnTester tester = DoFnTester.of((DoFn)fn);){
            tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
            tester.processBundle((Object[])new Long[]{1L, 2L, 3L});
            tester.processBundle((Object[])new Long[]{4L, 5L});
            tester.processBundle((Object[])new Long[]{6L});
        }
        Assert.assertEquals((long)1L, (long)numSetupCalls.get());
        Assert.assertEquals((long)1L, (long)numTeardownCalls.get());
    }

    @Test
    public void cloneOnce() throws Exception {
        try (DoFnTester tester = DoFnTester.of((DoFn)new CountBundleCallsFn());){
            tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_ONCE);
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{1L, 2L, 3L}), (Matcher)Matchers.contains((Object[])new String[]{"1/0", "1/0", "1/0"}));
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{4L, 5L}), (Matcher)Matchers.contains((Object[])new String[]{"2/1", "2/1"}));
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{6L}), (Matcher)Matchers.contains((Object[])new String[]{"3/2"}));
        }
    }

    @Test
    public void clonePerBundle() throws Exception {
        try (DoFnTester tester = DoFnTester.of((DoFn)new CountBundleCallsFn());){
            tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_PER_BUNDLE);
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{1L, 2L, 3L}), (Matcher)Matchers.contains((Object[])new String[]{"1/0", "1/0", "1/0"}));
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{4L, 5L}), (Matcher)Matchers.contains((Object[])new String[]{"1/0", "1/0"}));
            Assert.assertThat((Object)tester.processBundle((Object[])new Long[]{6L}), (Matcher)Matchers.contains((Object[])new String[]{"1/0"}));
        }
    }

    @Test
    public void processTimestampedElement() throws Exception {
        try (DoFnTester tester = DoFnTester.of((DoFn)new ReifyTimestamps());){
            TimestampedValue input = TimestampedValue.of((Object)1L, (Instant)new Instant(100L));
            tester.processTimestampedElement(input);
            Assert.assertThat((Object)tester.takeOutputElements(), (Matcher)Matchers.contains((Object[])new TimestampedValue[]{input}));
        }
    }

    @Test
    public void processElementWithOutputTimestamp() throws Exception {
        try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
            tester.processElement((Object)1L);
            tester.processElement((Object)2L);
            List peek = tester.peekOutputElementsWithTimestamp();
            TimestampedValue one = TimestampedValue.of((Object)"1", (Instant)new Instant(1000L));
            TimestampedValue two = TimestampedValue.of((Object)"2", (Instant)new Instant(2000L));
            Assert.assertThat((Object)peek, (Matcher)Matchers.hasItems((Object[])new TimestampedValue[]{one, two}));
            tester.processElement((Object)3L);
            tester.processElement((Object)4L);
            TimestampedValue three = TimestampedValue.of((Object)"3", (Instant)new Instant(3000L));
            TimestampedValue four = TimestampedValue.of((Object)"4", (Instant)new Instant(4000L));
            peek = tester.peekOutputElementsWithTimestamp();
            Assert.assertThat((Object)peek, (Matcher)Matchers.hasItems((Object[])new TimestampedValue[]{one, two, three, four}));
            List take = tester.takeOutputElementsWithTimestamp();
            Assert.assertThat((Object)take, (Matcher)Matchers.hasItems((Object[])new TimestampedValue[]{one, two, three, four}));
            Assert.assertTrue((boolean)tester.takeOutputElementsWithTimestamp().isEmpty());
            Assert.assertTrue((boolean)tester.peekOutputElementsWithTimestamp().isEmpty());
            Assert.assertTrue((boolean)tester.peekOutputElements().isEmpty());
            Assert.assertTrue((boolean)tester.takeOutputElements().isEmpty());
        }
    }

    @Test
    public void peekValuesInWindow() throws Exception {
        try (DoFnTester tester = DoFnTester.of((DoFn)new CounterDoFn());){
            tester.startBundle();
            tester.processElement((Object)1L);
            tester.processElement((Object)2L);
            tester.finishBundle();
            Assert.assertThat((Object)tester.peekOutputElementsInWindow((BoundedWindow)GlobalWindow.INSTANCE), (Matcher)Matchers.containsInAnyOrder((Object[])new TimestampedValue[]{TimestampedValue.of((Object)"1", (Instant)new Instant(1000L)), TimestampedValue.of((Object)"2", (Instant)new Instant(2000L))}));
            Assert.assertThat((Object)tester.peekOutputElementsInWindow((BoundedWindow)new IntervalWindow(new Instant(0L), new Instant(10L))), (Matcher)Matchers.emptyIterable());
        }
    }

    @Test
    public void fnWithSideInputDefault() throws Exception {
        PCollection pCollection = (PCollection)this.p.apply((PTransform)Create.empty((Coder)VarIntCoder.of()));
        PCollectionView value = (PCollectionView)pCollection.apply((PTransform)View.asSingleton().withDefaultValue((Object)0));
        try (DoFnTester tester = DoFnTester.of((DoFn)new SideInputDoFn(value));){
            tester.processElement((Object)1);
            tester.processElement((Object)2);
            tester.processElement((Object)4);
            tester.processElement((Object)8);
            Assert.assertThat((Object)tester.peekOutputElements(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{0, 0, 0, 0}));
        }
    }

    @Test
    public void fnWithSideInputExplicit() throws Exception {
        PCollection pCollection = (PCollection)this.p.apply((PTransform)Create.of((Object)-2, (Object[])new Integer[0]));
        PCollectionView value = (PCollectionView)pCollection.apply((PTransform)View.asSingleton().withDefaultValue((Object)0));
        try (DoFnTester tester = DoFnTester.of((DoFn)new SideInputDoFn(value));){
            tester.setSideInput(value, (BoundedWindow)GlobalWindow.INSTANCE, (Object)-2);
            tester.processElement((Object)16);
            tester.processElement((Object)32);
            tester.processElement((Object)64);
            tester.processElement((Object)128);
            tester.finishBundle();
            Assert.assertThat((Object)tester.peekOutputElements(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{-2, -2, -2, -2}));
        }
    }

    @Test
    public void testSupportsWindowParameter() throws Exception {
        Instant now = Instant.now();
        try (DoFnTester tester = DoFnTester.of((DoFn)new DoFnWithWindowParameter());){
            IntervalWindow firstWindow = new IntervalWindow(now, now.plus((ReadableDuration)Duration.standardMinutes((long)1L)));
            tester.processWindowedElement((Object)1, now, (BoundedWindow)firstWindow);
            tester.processWindowedElement((Object)2, now, (BoundedWindow)firstWindow);
            IntervalWindow secondWindow = new IntervalWindow(now, now.plus((ReadableDuration)Duration.standardMinutes((long)4L)));
            tester.processWindowedElement((Object)3, now, (BoundedWindow)secondWindow);
            tester.finishBundle();
            Assert.assertThat((Object)tester.peekOutputElementsInWindow((BoundedWindow)firstWindow), (Matcher)Matchers.containsInAnyOrder((Object[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)1, (Object)firstWindow), (Instant)now), TimestampedValue.of((Object)KV.of((Object)2, (Object)firstWindow), (Instant)now)}));
            Assert.assertThat((Object)tester.peekOutputElementsInWindow((BoundedWindow)secondWindow), (Matcher)Matchers.containsInAnyOrder((Object[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)3, (Object)secondWindow), (Instant)now)}));
        }
    }

    @Test
    public void testSupportsFinishBundleOutput() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            try (DoFnTester tester = DoFnTester.of((DoFn)new BundleCounterDoFn());){
                tester.setCloningBehavior(cloning);
                Assert.assertThat((Object)tester.processBundle((Object[])new Integer[]{1, 2, 3, 4}), (Matcher)Matchers.contains((Object[])new Integer[]{4}));
                Assert.assertThat((Object)tester.processBundle((Object[])new Integer[]{5, 6, 7}), (Matcher)Matchers.contains((Object[])new Integer[]{3}));
                Assert.assertThat((Object)tester.processBundle((Object[])new Integer[]{8, 9}), (Matcher)Matchers.contains((Object[])new Integer[]{2}));
            }
        }
    }

    private static class CounterDoFn
    extends DoFn<Long, String> {
        Counter agg = Metrics.counter(CounterDoFn.class, (String)"ctr");
        Counter startBundleCalls = Metrics.counter(CounterDoFn.class, (String)"startBundleCalls");
        Counter finishBundleCalls = Metrics.counter(CounterDoFn.class, (String)"finishBundleCalls");
        private LifecycleState state = LifecycleState.UNINITIALIZED;

        private CounterDoFn() {
        }

        @DoFn.Setup
        public void setup() {
            Preconditions.checkState(this.state == LifecycleState.UNINITIALIZED, "Wrong state: %s", (Object)this.state);
            this.state = LifecycleState.SET_UP;
        }

        @DoFn.StartBundle
        public void startBundle() {
            Preconditions.checkState(this.state == LifecycleState.SET_UP, "Wrong state: %s", (Object)this.state);
            this.state = LifecycleState.INSIDE_BUNDLE;
            this.startBundleCalls.inc();
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            Preconditions.checkState(this.state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", (Object)this.state);
            this.agg.inc(((Long)c.element()).longValue());
            Instant instant = new Instant(1000L * (Long)c.element());
            c.outputWithTimestamp((Object)((Long)c.element()).toString(), instant);
        }

        @DoFn.FinishBundle
        public void finishBundle() {
            Preconditions.checkState(this.state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", (Object)this.state);
            this.state = LifecycleState.SET_UP;
            this.finishBundleCalls.inc();
        }

        @DoFn.Teardown
        public void teardown() {
            Preconditions.checkState(this.state == LifecycleState.SET_UP, "Wrong state: %s", (Object)this.state);
            this.state = LifecycleState.TORN_DOWN;
        }

        private static enum LifecycleState {
            UNINITIALIZED,
            SET_UP,
            INSIDE_BUNDLE,
            TORN_DOWN;

        }
    }

    private static class SideInputDoFn
    extends DoFn<Integer, Integer> {
        private final PCollectionView<Integer> value;

        private SideInputDoFn(PCollectionView<Integer> value) {
            this.value = value;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)((Integer)c.sideInput(this.value)));
        }
    }

    private static class BundleCounterDoFn
    extends DoFn<Integer, Integer> {
        private int elements;

        private BundleCounterDoFn() {
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.elements = 0;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            ++this.elements;
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext c) {
            c.output((Object)this.elements, Instant.now(), (BoundedWindow)GlobalWindow.INSTANCE);
        }
    }

    private static class DoFnWithWindowParameter
    extends DoFn<Integer, KV<Integer, BoundedWindow>> {
        private DoFnWithWindowParameter() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, BoundedWindow window) {
            c.output((Object)KV.of((Object)((Integer)c.element()), (Object)window));
        }
    }

    static class ReifyTimestamps
    extends DoFn<Long, TimestampedValue<Long>> {
        ReifyTimestamps() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)TimestampedValue.of((Object)((Long)c.element()), (Instant)c.timestamp()));
        }
    }

    private static class CountBundleCallsFn
    extends DoFn<Long, String> {
        private int numStartBundleCalls = 0;
        private int numFinishBundleCalls = 0;

        private CountBundleCallsFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext context) {
            context.output((Object)(this.numStartBundleCalls + "/" + this.numFinishBundleCalls));
        }

        @DoFn.StartBundle
        public void startBundle() {
            ++this.numStartBundleCalls;
        }

        @DoFn.FinishBundle
        public void finishBundle() {
            ++this.numFinishBundleCalls;
        }
    }
}

