package org.apache.beam.sdk.transforms;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* Unit tests for DoFnTester; source recovered from org/apache/beam/sdk/transforms/DoFnTesterTest.class. */
public class DoFnTesterTest {

    // Pipeline rule. The side-input tests in this class use it to build the PCollectionView that is
    // handed to SideInputDoFn.
    @Rule
    public final TestPipeline p = TestPipeline.create();

    // Expected-exception rule, initialised to expect no exception. None of the tests in this class
    // configure it further.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Counts the elements processed since the last {@code @StartBundle} and emits that count from
     * {@code @FinishBundle} into {@link GlobalWindow#INSTANCE}, timestamped with the current instant.
     * Individual elements produce no output.
     */
    private static class BundleCounterDoFn extends DoFn<Integer, Integer> {
        /** Number of elements seen in the bundle currently being processed. */
        private int elements;

        @StartBundle
        public void startBundle() {
            // Every bundle starts counting from zero.
            elements = 0;
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
            elements += 1;
        }

        @FinishBundle
        public void finishBundle(FinishBundleContext context) {
            context.output(elements, Instant.now(), GlobalWindow.INSTANCE);
        }
    }

    /**
     * Emits, for every element, the string {@code "<start>/<finish>"} where the two numbers are how
     * many times {@code @StartBundle} and {@code @FinishBundle} have run on this instance so far.
     */
    private static class CountBundleCallsFn extends DoFn<Long, String> {
        private int numStartBundleCalls = 0;
        private int numFinishBundleCalls = 0;

        @StartBundle
        public void startBundle() {
            numStartBundleCalls += 1;
        }

        @ProcessElement
        public void process(ProcessContext context) {
            String callCounts = numStartBundleCalls + "/" + numFinishBundleCalls;
            context.output(callCounts);
        }

        @FinishBundle
        public void finishBundle() {
            numFinishBundleCalls += 1;
        }
    }

    /**
     * Outputs each {@code Long} element as its decimal string, timestamped at {@code 1000 * element}
     * milliseconds, while updating metrics counters. Every lifecycle method first checks that it is
     * being invoked from the expected {@link LifecycleState} and fails with
     * {@link IllegalStateException} (via {@code Preconditions.checkState}) otherwise.
     */
    private static class CounterDoFn extends DoFn<Long, String> {
        /** Phases this fn moves through as its lifecycle methods are invoked. */
        private enum LifecycleState {
            UNINITIALIZED,
            SET_UP,
            INSIDE_BUNDLE,
            TORN_DOWN
        }

        Counter agg = Metrics.counter(CounterDoFn.class, "ctr");
        Counter startBundleCalls = Metrics.counter(CounterDoFn.class, "startBundleCalls");
        Counter finishBundleCalls = Metrics.counter(CounterDoFn.class, "finishBundleCalls");
        private LifecycleState state = LifecycleState.UNINITIALIZED;

        /** Fails unless the fn is currently in {@code expected}. */
        private void requireState(LifecycleState expected) {
            Preconditions.checkState(state == expected, "Wrong state: %s", state);
        }

        @Setup
        public void setup() {
            requireState(LifecycleState.UNINITIALIZED);
            state = LifecycleState.SET_UP;
        }

        @StartBundle
        public void startBundle() {
            requireState(LifecycleState.SET_UP);
            state = LifecycleState.INSIDE_BUNDLE;
            startBundleCalls.inc();
        }

        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            requireState(LifecycleState.INSIDE_BUNDLE);
            long value = context.element();
            agg.inc(value);
            // The output timestamp is derived from the element itself: element * 1000 ms.
            context.outputWithTimestamp(Long.toString(value), new Instant(1000 * value));
        }

        @FinishBundle
        public void finishBundle() {
            requireState(LifecycleState.INSIDE_BUNDLE);
            // Back to SET_UP so that another bundle (or teardown) may follow.
            state = LifecycleState.SET_UP;
            finishBundleCalls.inc();
        }

        @Teardown
        public void teardown() {
            requireState(LifecycleState.SET_UP);
            state = LifecycleState.TORN_DOWN;
        }
    }

    /**
     * Declares a {@link BoundedWindow} parameter on its process method and outputs each element
     * paired with the window it was given.
     */
    private static class DoFnWithWindowParameter extends DoFn<Integer, KV<Integer, BoundedWindow>> {
        @ProcessElement
        public void processElement(ProcessContext context, BoundedWindow window) {
            Integer element = context.element();
            context.output(KV.of(element, window));
        }
    }

    /** Outputs each element wrapped together with the timestamp reported by its process context. */
    static class ReifyTimestamps extends DoFn<Long, TimestampedValue<Long>> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            Long element = context.element();
            Instant timestamp = context.timestamp();
            context.output(TimestampedValue.of(element, timestamp));
        }
    }

    /**
     * Ignores the value of each input element and outputs, once per element, whatever the
     * configured side input view currently yields.
     */
    private static class SideInputDoFn extends DoFn<Integer, Integer> {
        /** View that is read for every processed element. */
        private final PCollectionView<Integer> value;

        private SideInputDoFn(PCollectionView<Integer> view) {
            value = view;
        }

        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            Integer sideInputValue = context.sideInput(value);
            context.output(sideInputValue);
        }
    }

    /**
     * For every cloning behavior: after processing one element its string form is present in the
     * taken output, and taking the output drains it so later take/peek calls return empty lists.
     */
    @Test
    public void processElement() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            // try-with-resources closes the tester exactly once and records a close() failure as
            // suppressed instead of letting it replace an assertion failure from the body.
            try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
                tester.setCloningBehavior(cloning);
                tester.processElement(1L);
                Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("1"));
                Assert.assertTrue(tester.takeOutputElements().isEmpty());
                Assert.assertTrue(tester.peekOutputElements().isEmpty());
            }
        }
    }

    /**
     * For every cloning behavior: peeking leaves accumulated output in place, taking removes it,
     * and output produced after a take is visible on its own.
     */
    @Test
    public void processElementsWithPeeks() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            // try-with-resources closes the tester exactly once and records a close() failure as
            // suppressed instead of letting it replace an assertion failure from the body.
            try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
                tester.setCloningBehavior(cloning);
                tester.startBundle();

                tester.processElement(1L);
                tester.processElement(2L);
                Assert.assertThat(tester.peekOutputElements(), Matchers.hasItems("1", "2"));

                // Peeking must not have consumed "1" and "2".
                tester.processElement(3L);
                tester.processElement(4L);
                Assert.assertThat(tester.peekOutputElements(), Matchers.hasItems("1", "2", "3", "4"));
                Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("1", "2", "3", "4"));

                // Taking drains the buffer.
                Assert.assertTrue(tester.peekOutputElements().isEmpty());
                Assert.assertTrue(tester.takeOutputElements().isEmpty());

                tester.processElement(5L);
                tester.processElement(6L);
                Assert.assertThat(tester.peekOutputElements(), Matchers.hasItems("5", "6"));
                Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("5", "6"));

                tester.finishBundle();
            }
        }
    }

    /**
     * For every cloning behavior: {@code processBundle} returns the outputs for its inputs and
     * leaves nothing behind to peek at.
     */
    @Test
    public void processBundle() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            // try-with-resources closes the tester exactly once and records a close() failure as
            // suppressed instead of letting it replace an assertion failure from the body.
            try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
                tester.setCloningBehavior(cloning);
                Assert.assertThat(
                        tester.processBundle(1L, 2L, 3L, 4L), Matchers.hasItems("1", "2", "3", "4"));
                Assert.assertTrue(tester.peekOutputElements().isEmpty());
            }
        }
    }

    /**
     * For every cloning behavior: several bundles can be processed back to back on one tester, each
     * returning the outputs for its own inputs, with nothing left to peek at afterwards.
     */
    @Test
    public void processMultipleBundles() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            // try-with-resources closes the tester exactly once and records a close() failure as
            // suppressed instead of letting it replace an assertion failure from the body.
            try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
                tester.setCloningBehavior(cloning);
                Assert.assertThat(
                        tester.processBundle(1L, 2L, 3L, 4L), Matchers.hasItems("1", "2", "3", "4"));
                Assert.assertThat(tester.processBundle(5L, 6L, 7L), Matchers.hasItems("5", "6", "7"));
                Assert.assertThat(tester.processBundle(8L, 9L), Matchers.hasItems("8", "9"));
                Assert.assertTrue(tester.peekOutputElements().isEmpty());
            }
        }
    }

    /**
     * With {@code DO_NOT_CLONE}, processing three bundles and then closing the tester runs the
     * fn's {@code @Setup} and {@code @Teardown} exactly once each.
     */
    @Test
    public void doNotClone() throws Exception {
        final AtomicInteger setupCalls = new AtomicInteger();
        final AtomicInteger teardownCalls = new AtomicInteger();
        DoFn<Long, String> fn =
                new DoFn<Long, String>() {
                    @ProcessElement
                    public void process(ProcessContext context) {}

                    @Setup
                    public void setup() {
                        setupCalls.addAndGet(1);
                    }

                    @Teardown
                    public void teardown() {
                        teardownCalls.addAndGet(1);
                    }
                };

        // try-with-resources closes the tester exactly once, even if a bundle fails.
        try (DoFnTester<Long, String> tester = DoFnTester.of(fn)) {
            tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
            tester.processBundle(1L, 2L, 3L);
            tester.processBundle(4L, 5L);
            tester.processBundle(6L);
        }

        // Checked only after the tester has been closed, and outside the try block so that a failed
        // assertion cannot trigger another close().
        Assert.assertEquals(1, setupCalls.get());
        Assert.assertEquals(1, teardownCalls.get());
    }

    /**
     * With {@code CLONE_ONCE}, the start/finish-bundle call counts reported by
     * {@link CountBundleCallsFn} keep growing from bundle to bundle.
     */
    @Test
    public void cloneOnce() throws Exception {
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Long, String> tester = DoFnTester.of(new CountBundleCallsFn())) {
            tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_ONCE);

            Assert.assertThat(tester.processBundle(1L, 2L, 3L), Matchers.contains("1/0", "1/0", "1/0"));
            Assert.assertThat(tester.processBundle(4L, 5L), Matchers.contains("2/1", "2/1"));
            Assert.assertThat(tester.processBundle(6L), Matchers.contains("3/2"));
        }
    }

    /**
     * With {@code CLONE_PER_BUNDLE}, every bundle reports {@code "1/0"} from
     * {@link CountBundleCallsFn}: one start-bundle call and no finish-bundle call seen yet.
     */
    @Test
    public void clonePerBundle() throws Exception {
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Long, String> tester = DoFnTester.of(new CountBundleCallsFn())) {
            tester.setCloningBehavior(DoFnTester.CloningBehavior.CLONE_PER_BUNDLE);

            Assert.assertThat(tester.processBundle(1L, 2L, 3L), Matchers.contains("1/0", "1/0", "1/0"));
            Assert.assertThat(tester.processBundle(4L, 5L), Matchers.contains("1/0", "1/0"));
            Assert.assertThat(tester.processBundle(6L), Matchers.contains("1/0"));
        }
    }

    /**
     * A timestamped input fed through {@link ReifyTimestamps} comes back out as an equal
     * {@link TimestampedValue}, i.e. the fn observed the timestamp that was supplied.
     */
    @Test
    public void processTimestampedElement() throws Exception {
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(new ReifyTimestamps())) {
            TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100L));
            tester.processTimestampedElement(input);
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains(input));
        }
    }

    /**
     * Outputs carry the timestamps assigned by {@link CounterDoFn} (element * 1000 ms). Peeking
     * with timestamps leaves them in place, and taking with timestamps drains both the timestamped
     * and the plain views of the output.
     */
    @Test
    public void processElementWithOutputTimestamp() throws Exception {
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
            tester.processElement(1L);
            tester.processElement(2L);

            List<TimestampedValue<String>> peeked = tester.peekOutputElementsWithTimestamp();
            TimestampedValue<String> one = TimestampedValue.of("1", new Instant(1000L));
            TimestampedValue<String> two = TimestampedValue.of("2", new Instant(2000L));
            Assert.assertThat(peeked, Matchers.hasItems(one, two));

            tester.processElement(3L);
            tester.processElement(4L);

            TimestampedValue<String> three = TimestampedValue.of("3", new Instant(3000L));
            TimestampedValue<String> four = TimestampedValue.of("4", new Instant(4000L));
            // The earlier peek must not have consumed the first two outputs.
            Assert.assertThat(
                    tester.peekOutputElementsWithTimestamp(), Matchers.hasItems(one, two, three, four));
            Assert.assertThat(
                    tester.takeOutputElementsWithTimestamp(), Matchers.hasItems(one, two, three, four));

            // After the take, every accessor reports an empty output.
            Assert.assertTrue(tester.takeOutputElementsWithTimestamp().isEmpty());
            Assert.assertTrue(tester.peekOutputElementsWithTimestamp().isEmpty());
            Assert.assertTrue(tester.peekOutputElements().isEmpty());
            Assert.assertTrue(tester.takeOutputElements().isEmpty());
        }
    }

    /**
     * Elements processed without an explicit window are visible when peeking into the global
     * window, and not when peeking into an unrelated {@link IntervalWindow}.
     */
    @Test
    public void peekValuesInWindow() throws Exception {
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
            tester.startBundle();
            tester.processElement(1L);
            tester.processElement(2L);
            tester.finishBundle();

            Assert.assertThat(
                    tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
                    Matchers.containsInAnyOrder(
                            TimestampedValue.of("1", new Instant(1000L)),
                            TimestampedValue.of("2", new Instant(2000L))));
            Assert.assertThat(
                    tester.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))),
                    Matchers.emptyIterable());
        }
    }

    /**
     * When no value is set on the tester for a singleton view declared with default {@code 0},
     * {@link SideInputDoFn} outputs {@code 0} for every processed element.
     */
    @Test
    public void fnWithSideInputDefault() throws Exception {
        PCollectionView<Integer> view =
                p.apply(Create.empty(VarIntCoder.of()))
                        .apply(View.<Integer>asSingleton().withDefaultValue(0));

        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(view))) {
            tester.processElement(1);
            tester.processElement(2);
            tester.processElement(4);
            tester.processElement(8);
            Assert.assertThat(tester.peekOutputElements(), Matchers.containsInAnyOrder(0, 0, 0, 0));
        }
    }

    /**
     * When {@code -2} is set on the tester for the view in the global window,
     * {@link SideInputDoFn} outputs {@code -2} for every processed element.
     */
    @Test
    public void fnWithSideInputExplicit() throws Exception {
        PCollectionView<Integer> view =
                p.apply(Create.of(-2)).apply(View.<Integer>asSingleton().withDefaultValue(0));

        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(view))) {
            tester.setSideInput(view, GlobalWindow.INSTANCE, -2);
            tester.processElement(16);
            tester.processElement(32);
            tester.processElement(64);
            tester.processElement(128);
            tester.finishBundle();

            Assert.assertThat(tester.peekOutputElements(), Matchers.containsInAnyOrder(-2, -2, -2, -2));
        }
    }

    /**
     * A fn that declares a {@link BoundedWindow} parameter receives the window passed to
     * {@code processWindowedElement}, and its outputs can be peeked per window.
     */
    @Test
    public void testSupportsWindowParameter() throws Exception {
        Instant now = Instant.now();
        // try-with-resources closes the tester exactly once and records a close() failure as
        // suppressed instead of letting it replace an assertion failure from the body.
        try (DoFnTester<Integer, KV<Integer, BoundedWindow>> tester =
                DoFnTester.of(new DoFnWithWindowParameter())) {
            // Typed as BoundedWindow so the expected KV values match the fn's declared output type.
            BoundedWindow firstWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(1L)));
            tester.processWindowedElement(1, now, firstWindow);
            tester.processWindowedElement(2, now, firstWindow);

            BoundedWindow secondWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(4L)));
            tester.processWindowedElement(3, now, secondWindow);
            tester.finishBundle();

            Assert.assertThat(
                    tester.peekOutputElementsInWindow(firstWindow),
                    Matchers.containsInAnyOrder(
                            TimestampedValue.of(KV.of(1, firstWindow), now),
                            TimestampedValue.of(KV.of(2, firstWindow), now)));
            Assert.assertThat(
                    tester.peekOutputElementsInWindow(secondWindow),
                    Matchers.containsInAnyOrder(TimestampedValue.of(KV.of(3, secondWindow), now)));
        }
    }

    /**
     * For every cloning behavior: output emitted from {@code @FinishBundle} is returned by
     * {@code processBundle}; {@link BundleCounterDoFn} yields exactly the size of each bundle.
     */
    @Test
    public void testSupportsFinishBundleOutput() throws Exception {
        for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
            // try-with-resources closes the tester exactly once and records a close() failure as
            // suppressed instead of letting it replace an assertion failure from the body.
            try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) {
                tester.setCloningBehavior(cloning);

                Assert.assertThat(tester.processBundle(1, 2, 3, 4), Matchers.contains(4));
                Assert.assertThat(tester.processBundle(5, 6, 7), Matchers.contains(3));
                Assert.assertThat(tester.processBundle(8, 9), Matchers.contains(2));
            }
        }
    }

    /**
     * Closes a resource the way the end of a try-with-resources statement does.
     *
     * <p>If {@code th} is {@code null} the resource is simply closed and any failure from
     * {@code close()} propagates to the caller. If {@code th} is non-null it is treated as the
     * primary failure: anything thrown by {@code close()} is attached to it as a suppressed
     * exception and nothing is thrown from here. A {@code null} resource is skipped.
     *
     * @param th the exception already in flight, or {@code null} if the guarded body completed normally
     * @param autoCloseable the resource to close; may be {@code null}
     * @throws Exception whatever {@code close()} throws when {@code th} is {@code null}
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (autoCloseable == null) {
            // Nothing was acquired, so there is nothing to release.
            return;
        }
        if (th == null) {
            // AutoCloseable.close() is declared to throw Exception, hence the throws clause.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable closeFailure) {
            // Keep the primary failure as the one that propagates; record the close failure on it.
            th.addSuppressed(closeFailure);
        }
    }
}
