/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class ParDoLifecycleTest
implements Serializable {
    /**
     * Pipeline that every test in this class builds and runs, supplied as a JUnit rule.
     *
     * <p>Declared {@code transient} so it is skipped if an instance of this
     * {@link Serializable} test class is serialized.
     * NOTE(review): presumably the class is serializable because the anonymous
     * {@code TupleTag} subclasses created in the tests capture the enclosing instance — confirm.
     */
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /**
     * Flattens two small integer collections and feeds them to a
     * {@link CallSequenceEnforcingFn}, whose lifecycle methods assert that they are invoked in a
     * valid order. The test passes when the pipeline runs without any of those assertions firing.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testFnCallSequence() {
        PCollection<Integer> impolite = this.p.apply("Impolite", Create.of(1, 2, 4));
        PCollection<Integer> polite = this.p.apply("Polite", Create.of(3, 5, 6, 7));

        PCollectionList.of(impolite)
            .and(polite)
            .apply(Flatten.pCollections())
            .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));

        this.p.run();
    }

    /**
     * Same scenario as the single-output call-sequence test, but the
     * {@link CallSequenceEnforcingFn} is applied through {@code withOutputTags} with a main tag
     * and an empty list of additional tags, so the multi-output form of ParDo is exercised.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testFnCallSequenceMulti() {
        PCollection<Integer> impolite = this.p.apply("Impolite", Create.of(1, 2, 4));
        PCollection<Integer> polite = this.p.apply("Polite", Create.of(3, 5, 6, 7));

        PCollectionList.of(impolite)
            .and(polite)
            .apply(Flatten.pCollections())
            .apply(
                ParDo.of(new CallSequenceEnforcingFn<Integer>())
                    .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));

        this.p.run();
    }

    /**
     * Runs the call-sequence checks over keyed input using
     * {@link CallSequenceEnforcingStatefulFn}, which declares a state cell in addition to the
     * inherited lifecycle assertions. Applied through {@code withOutputTags} with no additional
     * output tags.
     */
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
    public void testFnCallSequenceStateful() {
        PCollection<KV<String, Integer>> impolite =
            this.p.apply("Impolite", Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 4)));
        PCollection<KV<String, Integer>> polite =
            this.p.apply(
                "Polite", Create.of(KV.of("b", 3), KV.of("a", 5), KV.of("c", 6), KV.of("c", 7)));

        PCollectionList.of(impolite)
            .and(polite)
            .apply(Flatten.pCollections())
            .apply(
                ParDo.of(new CallSequenceEnforcingStatefulFn<String, Integer>())
                    .withOutputTags(new TupleTag<KV<String, Integer>>() {}, TupleTagList.empty()));

        this.p.run();
    }

    /**
     * Verifies that a DoFn which throws from its start-bundle method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInStartBundle() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its process-element method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInProcessElement() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its finish-bundle method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInFinishBundle() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its setup method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInSetup() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its start-bundle method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     *
     * <p>NOTE(review): this exercises the same function class and failure point as
     * {@code testTeardownCalledAfterExceptionInStartBundle}; confirm whether the "WithContext"
     * variant was meant to use a different DoFn.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its process-element method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     *
     * <p>NOTE(review): this exercises the same function class and failure point as
     * {@code testTeardownCalledAfterExceptionInProcessElement}; confirm whether the "WithContext"
     * variant was meant to use a different DoFn.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    /**
     * Verifies that a DoFn which throws from its finish-bundle method is still torn down.
     *
     * <p>The teardown flag is a static shared by all teardown tests in this class, so it is
     * cleared first; otherwise a teardown recorded by an earlier test would satisfy the assertion
     * here even if this pipeline never tore the function down.
     *
     * <p>NOTE(review): this exercises the same function class and failure point as
     * {@code testTeardownCalledAfterExceptionInFinishBundle}; confirm whether the "WithContext"
     * variant was meant to use a different DoFn.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
        // Reset shared static state so this test cannot pass on a previous test's teardown.
        ExceptionThrowingOldFn.teardownCalled.set(false);

        ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
        try {
            this.p.run();
            // fail() raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception expected) {
            Assert.assertThat(
                "Function should have been torn down after exception",
                ExceptionThrowingOldFn.teardownCalled.get(),
                Matchers.is(true));
        }
    }

    private static enum MethodForException {
        SETUP,
        START_BUNDLE,
        PROCESS_ELEMENT,
        FINISH_BUNDLE;

    }

    /**
     * A {@link DoFn} that throws once from the lifecycle method chosen at construction and
     * records, in a static flag, that its teardown method ran.
     *
     * <p>Each lifecycle method delegates to {@link #throwIfNecessary}; only the first call that
     * matches the configured method throws, later calls on the same instance are no-ops.
     */
    private static class ExceptionThrowingFn extends DoFn<Object, Object> {
        /**
         * Set to {@code true} by {@link #after()}. Static, so the value is shared by every
         * instance and is never cleared by this class.
         */
        static final AtomicBoolean teardownCalled = new AtomicBoolean(false);

        /** Lifecycle method from which this instance throws. */
        private final MethodForException toThrow;

        /** Whether this instance has already thrown its exception. */
        private boolean thrown;

        private ExceptionThrowingFn(MethodForException toThrow) {
            this.toThrow = toThrow;
        }

        @DoFn.Setup
        public void before() throws Exception {
            throwIfNecessary(MethodForException.SETUP);
        }

        @DoFn.StartBundle
        public void preBundle() throws Exception {
            throwIfNecessary(MethodForException.START_BUNDLE);
        }

        @DoFn.ProcessElement
        public void perElement(ProcessContext c) throws Exception {
            throwIfNecessary(MethodForException.PROCESS_ELEMENT);
        }

        @DoFn.FinishBundle
        public void postBundle() throws Exception {
            throwIfNecessary(MethodForException.FINISH_BUNDLE);
        }

        /**
         * Throws if {@code method} is the configured failure point and nothing has been thrown
         * by this instance yet.
         *
         * @param method the lifecycle method currently executing
         * @throws Exception on the first call made from the configured lifecycle method
         */
        private void throwIfNecessary(MethodForException method) throws Exception {
            if (this.thrown || this.toThrow != method) {
                return;
            }
            // Mark before throwing so a repeated call does not throw a second time.
            this.thrown = true;
            throw new Exception("Hasn't yet thrown");
        }

        /**
         * Fails if no lifecycle method of this instance threw beforehand; otherwise records that
         * teardown ran.
         */
        @DoFn.Teardown
        public void after() {
            if (!this.thrown) {
                Assert.fail("Expected to have a processing method throw an exception");
            }
            teardownCalled.set(true);
        }
    }

    /**
     * A {@link DoFn} that throws once from the lifecycle method chosen at construction and
     * records, in a static flag, that its teardown method ran.
     *
     * <p>Each lifecycle method delegates to {@link #throwIfNecessary}; only the first call that
     * matches the configured method throws, later calls on the same instance are no-ops.
     */
    private static class ExceptionThrowingOldFn extends DoFn<Object, Object> {
        /**
         * Set to {@code true} by {@link #teardown()}. Static, so the value is shared by every
         * instance and is never cleared by this class; callers that assert on it must reset it
         * themselves.
         */
        static final AtomicBoolean teardownCalled = new AtomicBoolean(false);

        /** Lifecycle method from which this instance throws. */
        private final MethodForException toThrow;

        /** Whether this instance has already thrown its exception. */
        private boolean thrown;

        private ExceptionThrowingOldFn(MethodForException toThrow) {
            this.toThrow = toThrow;
        }

        @DoFn.Setup
        public void setup() throws Exception {
            throwIfNecessary(MethodForException.SETUP);
        }

        @DoFn.StartBundle
        public void startBundle() throws Exception {
            throwIfNecessary(MethodForException.START_BUNDLE);
        }

        @DoFn.ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            throwIfNecessary(MethodForException.PROCESS_ELEMENT);
        }

        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            throwIfNecessary(MethodForException.FINISH_BUNDLE);
        }

        /**
         * Throws if {@code method} is the configured failure point and nothing has been thrown
         * by this instance yet.
         *
         * @param method the lifecycle method currently executing
         * @throws Exception on the first call made from the configured lifecycle method
         */
        private void throwIfNecessary(MethodForException method) throws Exception {
            if (this.thrown || this.toThrow != method) {
                return;
            }
            // Mark before throwing so a repeated call does not throw a second time.
            this.thrown = true;
            throw new Exception("Hasn't yet thrown");
        }

        /**
         * Fails if no lifecycle method of this instance threw beforehand; otherwise records that
         * teardown ran.
         */
        @DoFn.Teardown
        public void teardown() {
            if (!this.thrown) {
                Assert.fail("Expected to have a processing method throw an exception");
            }
            teardownCalled.set(true);
        }
    }

    /**
     * Keyed variant of {@link CallSequenceEnforcingDoFn} that additionally declares a single
     * value-state cell. No method in this class reads or writes the state.
     * NOTE(review): presumably the declaration alone is what makes the ParDo stateful for the
     * runner — confirm.
     *
     * @param <K> key type of the processed {@link KV} elements
     * @param <V> value type of the processed {@link KV} elements
     */
    private static class CallSequenceEnforcingStatefulFn<K, V>
        extends CallSequenceEnforcingDoFn<KV<K, V>> {

        /** Identifier under which the state cell is declared. */
        private static final String STATE_ID = "foo";

        @DoFn.StateId(STATE_ID)
        private final StateSpec<ValueState<String>> valueSpec = StateSpecs.value();

        private CallSequenceEnforcingStatefulFn() {}
    }

    private static class CallSequenceEnforcingFn<T>
    extends DoFn<T, T> {
        private boolean setupCalled = false;
        private int startBundleCalls = 0;
        private int finishBundleCalls = 0;
        private boolean teardownCalled = false;

        private CallSequenceEnforcingFn() {
        }

        @DoFn.Setup
        public void before() {
            Assert.assertThat((String)"setup should not be called twice", (Object)this.setupCalled, (Matcher)Matchers.is((Object)false));
            Assert.assertThat((String)"setup should be called before startBundle", (Object)this.startBundleCalls, (Matcher)Matchers.equalTo((Object)0));
            Assert.assertThat((String)"setup should be called before finishBundle", (Object)this.finishBundleCalls, (Matcher)Matchers.equalTo((Object)0));
            Assert.assertThat((String)"setup should be called before teardown", (Object)this.teardownCalled, (Matcher)Matchers.is((Object)false));
            this.setupCalled = true;
        }

        @DoFn.StartBundle
        public void begin() {
            Assert.assertThat((String)"setup should have been called", (Object)this.setupCalled, (Matcher)Matchers.is((Object)true));
            Assert.assertThat((String)"Even number of startBundle and finishBundle calls in startBundle", (Object)this.startBundleCalls, (Matcher)Matchers.equalTo((Object)this.finishBundleCalls));
            Assert.assertThat((String)"teardown should not have been called", (Object)this.teardownCalled, (Matcher)Matchers.is((Object)false));
            ++this.startBundleCalls;
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c) throws Exception {
            Assert.assertThat((String)"startBundle should have been called", (Object)this.startBundleCalls, (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));
            Assert.assertThat((String)"there should be one startBundle call with no call to finishBundle", (Object)this.startBundleCalls, (Matcher)Matchers.equalTo((Object)(this.finishBundleCalls + 1)));
            Assert.assertThat((String)"teardown should not have been called", (Object)this.teardownCalled, (Matcher)Matchers.is((Object)false));
        }

        @DoFn.FinishBundle
        public void end() {
            Assert.assertThat((String)"startBundle should have been called", (Object)this.startBundleCalls, (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));
            Assert.assertThat((String)"there should be one bundle that has been started but not finished", (Object)this.startBundleCalls, (Matcher)Matchers.equalTo((Object)(this.finishBundleCalls + 1)));
            Assert.assertThat((String)"teardown should not have been called", (Object)this.teardownCalled, (Matcher)Matchers.is((Object)false));
            ++this.finishBundleCalls;
        }

        @DoFn.Teardown
        public void after() {
            Assert.assertThat((Object)this.setupCalled, (Matcher)Matchers.is((Object)true));
            Assert.assertThat((Object)this.startBundleCalls, (Matcher)Matchers.anyOf((Matcher[])new Matcher[]{Matchers.equalTo((Object)this.finishBundleCalls)}));
            Assert.assertThat((Object)this.teardownCalled, (Matcher)Matchers.is((Object)false));
            this.teardownCalled = true;
        }
    }

    /**
     * A {@link DoFn} whose lifecycle methods assert that they are invoked in a valid order:
     * setup once and first, start/finish bundle calls strictly paired, elements processed only
     * inside an open bundle, and teardown once and last.
     *
     * <p>All bookkeeping lives in instance fields, so the checks apply per instance.
     *
     * @param <T> element type, used for both input and output
     */
    private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> {
        private boolean setupCalled = false;
        private int startBundleCalls = 0;
        private int finishBundleCalls = 0;
        private boolean teardownCalled = false;

        private CallSequenceEnforcingDoFn() {}

        /** Asserts that no other lifecycle method has run yet, then records the setup. */
        @DoFn.Setup
        public void setup() {
            Assert.assertThat("setup should not be called twice", this.setupCalled, Matchers.is(false));
            Assert.assertThat(
                "setup should be called before startBundle", this.startBundleCalls, Matchers.equalTo(0));
            Assert.assertThat(
                "setup should be called before finishBundle",
                this.finishBundleCalls,
                Matchers.equalTo(0));
            Assert.assertThat(
                "setup should be called before teardown", this.teardownCalled, Matchers.is(false));
            this.setupCalled = true;
        }

        /** Asserts that no bundle is currently open, then counts the newly started bundle. */
        @DoFn.StartBundle
        public void startBundle() {
            Assert.assertThat("setup should have been called", this.setupCalled, Matchers.is(true));
            Assert.assertThat(
                "Even number of startBundle and finishBundle calls in startBundle",
                this.startBundleCalls,
                Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat(
                "teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.startBundleCalls++;
        }

        /** Asserts that exactly one bundle is open; the element itself is not inspected. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            Assert.assertThat(
                "startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat(
                "there should be one startBundle call with no call to finishBundle",
                this.startBundleCalls,
                Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat(
                "teardown should not have been called", this.teardownCalled, Matchers.is(false));
        }

        /** Asserts that exactly one bundle is open, then counts it as finished. */
        @DoFn.FinishBundle
        public void finishBundle() {
            Assert.assertThat(
                "startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat(
                "there should be one bundle that has been started but not finished",
                this.startBundleCalls,
                Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat(
                "teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.finishBundleCalls++;
        }

        /**
         * Asserts that setup ran, every started bundle was finished, and teardown has not run
         * before; then records the teardown.
         */
        @DoFn.Teardown
        public void teardown() {
            Assert.assertThat(
                "setup should have been called before teardown", this.setupCalled, Matchers.is(true));
            Assert.assertThat(
                "every started bundle should have been finished before teardown",
                this.startBundleCalls,
                Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat(
                "teardown should not be called twice", this.teardownCalled, Matchers.is(false));
            this.teardownCalled = true;
        }
    }
}

