package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest.class */
public class ParDoLifecycleTest implements Serializable {

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$CallSequenceEnforcingDoFn.class */
    /**
     * A {@link DoFn} whose lifecycle methods assert that they are invoked in a legal order on a
     * given instance: setup at most once and before anything else, startBundle/finishBundle in
     * matched pairs, processElement only inside an open bundle, and teardown at most once with no
     * bundle left open.
     *
     * <p>The call history is kept in plain instance fields, so each check only covers calls made on
     * the same instance. The fn emits no output.
     */
    private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> {
        private boolean setupCalled = false;
        private int startBundleCalls = 0;
        private int finishBundleCalls = 0;
        private boolean teardownCalled = false;

        private CallSequenceEnforcingDoFn() {
        }

        /** Asserts that no other lifecycle method has run yet, then records the setup call. */
        @DoFn.Setup
        public void setup() {
            Assert.assertThat("setup should not be called twice", this.setupCalled, Matchers.is(false));
            Assert.assertThat("setup should be called before startBundle", this.startBundleCalls, Matchers.equalTo(0));
            Assert.assertThat("setup should be called before finishBundle", this.finishBundleCalls, Matchers.equalTo(0));
            Assert.assertThat("setup should be called before teardown", this.teardownCalled, Matchers.is(false));
            this.setupCalled = true;
        }

        /** Asserts that setup has run and no bundle is currently open, then opens a bundle. */
        @DoFn.StartBundle
        public void startBundle() {
            Assert.assertThat("setup should have been called", this.setupCalled, Matchers.is(true));
            Assert.assertThat("Even number of startBundle and finishBundle calls in startBundle", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.startBundleCalls++;
        }

        /** Asserts that exactly one bundle is open; the element itself is ignored. */
        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
            Assert.assertThat("startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat("there should be one startBundle call with no call to finishBundle", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
        }

        /** Asserts that exactly one bundle is open, then closes it. */
        @DoFn.FinishBundle
        public void finishBundle() {
            Assert.assertThat("startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat("there should be one bundle that has been started but not finished", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.finishBundleCalls++;
        }

        /** Asserts that setup ran, every bundle was closed, and teardown has not already run. */
        @DoFn.Teardown
        public void teardown() {
            Assert.assertThat("setup should have been called before teardown", this.setupCalled, Matchers.is(true));
            Assert.assertThat("every started bundle should have been finished before teardown", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat("teardown should not be called twice", this.teardownCalled, Matchers.is(false));
            this.teardownCalled = true;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$CallSequenceEnforcingFn.class */
    /**
     * A {@link DoFn} that enforces, via assertions inside each lifecycle method, that calls on one
     * instance arrive in the order: setup, then zero or more (startBundle, processElement...,
     * finishBundle) rounds, then teardown.
     *
     * <p>State is tracked in instance fields only, so the checks apply per instance. No output is
     * produced.
     */
    private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> {
        private boolean setupCalled = false;
        private int startBundleCalls = 0;
        private int finishBundleCalls = 0;
        private boolean teardownCalled = false;

        private CallSequenceEnforcingFn() {
        }

        /** Checks that nothing else has run on this instance yet and marks setup as done. */
        @DoFn.Setup
        public void before() {
            Assert.assertThat("setup should not be called twice", this.setupCalled, Matchers.is(false));
            Assert.assertThat("setup should be called before startBundle", this.startBundleCalls, Matchers.equalTo(0));
            Assert.assertThat("setup should be called before finishBundle", this.finishBundleCalls, Matchers.equalTo(0));
            Assert.assertThat("setup should be called before teardown", this.teardownCalled, Matchers.is(false));
            this.setupCalled = true;
        }

        /** Checks that setup ran and all previous bundles were finished, then counts a new bundle. */
        @DoFn.StartBundle
        public void begin() {
            Assert.assertThat("setup should have been called", this.setupCalled, Matchers.is(true));
            Assert.assertThat("Even number of startBundle and finishBundle calls in startBundle", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.startBundleCalls++;
        }

        /** Checks that exactly one bundle is in progress; does not read or emit the element. */
        @DoFn.ProcessElement
        public void process(DoFn<T, T>.ProcessContext processContext) throws Exception {
            Assert.assertThat("startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat("there should be one startBundle call with no call to finishBundle", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
        }

        /** Checks that exactly one bundle is in progress, then counts it as finished. */
        @DoFn.FinishBundle
        public void end() {
            Assert.assertThat("startBundle should have been called", this.startBundleCalls, Matchers.greaterThan(0));
            Assert.assertThat("there should be one bundle that has been started but not finished", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls + 1));
            Assert.assertThat("teardown should not have been called", this.teardownCalled, Matchers.is(false));
            this.finishBundleCalls++;
        }

        /** Checks that setup ran, no bundle is left open, and teardown is running for the first time. */
        @DoFn.Teardown
        public void after() {
            Assert.assertThat("setup should have been called before teardown", this.setupCalled, Matchers.is(true));
            Assert.assertThat("every started bundle should have been finished before teardown", this.startBundleCalls, Matchers.equalTo(this.finishBundleCalls));
            Assert.assertThat("teardown should not be called twice", this.teardownCalled, Matchers.is(false));
            this.teardownCalled = true;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$CallSequenceEnforcingStatefulFn.class */
    /**
     * Variant of {@link CallSequenceEnforcingDoFn} over {@link KV} elements that additionally
     * declares a {@link ValueState} cell. The state is only declared; nothing in this class reads
     * or writes it. All lifecycle checks are inherited from the superclass.
     */
    private static class CallSequenceEnforcingStatefulFn<K, V> extends CallSequenceEnforcingDoFn<KV<K, V>> {
        private static final String STATE_ID = "foo";

        // Reference the constant rather than repeating the literal so the id is defined once.
        @DoFn.StateId(STATE_ID)
        private final StateSpec<ValueState<String>> valueSpec = StateSpecs.value();

        private CallSequenceEnforcingStatefulFn() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$ExceptionThrowingFn.class */
    /**
     * A {@link DoFn} that throws a single exception from the lifecycle method chosen at
     * construction time and records in {@link #teardownCalled} that teardown ran afterwards.
     */
    private static class ExceptionThrowingFn extends DoFn<Object, Object> {
        /**
         * Set to {@code true} by {@link #after()}. The field is static, so it is shared by all
         * instances and keeps its value until something explicitly resets it.
         */
        static final AtomicBoolean teardownCalled = new AtomicBoolean(false);
        /** The lifecycle method from which this instance throws. */
        private final MethodForException toThrow;
        /** Whether this instance has already thrown its exception. */
        private boolean thrown;

        private ExceptionThrowingFn(MethodForException methodForException) {
            this.toThrow = methodForException;
        }

        @DoFn.Setup
        public void before() throws Exception {
            throwIfNecessary(MethodForException.SETUP);
        }

        @DoFn.StartBundle
        public void preBundle() throws Exception {
            throwIfNecessary(MethodForException.START_BUNDLE);
        }

        @DoFn.ProcessElement
        public void perElement(DoFn<Object, Object>.ProcessContext processContext) throws Exception {
            throwIfNecessary(MethodForException.PROCESS_ELEMENT);
        }

        @DoFn.FinishBundle
        public void postBundle() throws Exception {
            throwIfNecessary(MethodForException.FINISH_BUNDLE);
        }

        /**
         * Throws if {@code methodForException} is the configured method and this instance has not
         * thrown before; otherwise does nothing.
         *
         * @param methodForException the lifecycle method currently executing
         * @throws Exception the first time the configured method is reached
         */
        private void throwIfNecessary(MethodForException methodForException) throws Exception {
            if (this.toThrow == methodForException && !this.thrown) {
                this.thrown = true;
                throw new Exception("Hasn't yet thrown");
            }
        }

        /** Fails if this instance never threw; otherwise records that teardown was reached. */
        @DoFn.Teardown
        public void after() {
            if (!this.thrown) {
                Assert.fail("Expected to have a processing method throw an exception");
            }
            teardownCalled.set(true);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$ExceptionThrowingOldFn.class */
    /**
     * A {@link DoFn} that throws exactly one exception, from the lifecycle method selected by the
     * constructor argument, and flips {@link #teardownCalled} when teardown subsequently runs.
     */
    private static class ExceptionThrowingOldFn extends DoFn<Object, Object> {
        /**
         * Becomes {@code true} once {@link #teardown()} completes. Being static, the flag is shared
         * by every instance and is not cleared automatically; callers that depend on it must reset
         * it themselves.
         */
        static final AtomicBoolean teardownCalled = new AtomicBoolean(false);
        /** The lifecycle method from which this instance throws. */
        private final MethodForException toThrow;
        /** Whether the exception has already been thrown by this instance. */
        private boolean thrown;

        private ExceptionThrowingOldFn(MethodForException methodForException) {
            this.toThrow = methodForException;
        }

        @DoFn.Setup
        public void setup() throws Exception {
            throwIfNecessary(MethodForException.SETUP);
        }

        @DoFn.StartBundle
        public void startBundle() throws Exception {
            throwIfNecessary(MethodForException.START_BUNDLE);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Object, Object>.ProcessContext processContext) throws Exception {
            throwIfNecessary(MethodForException.PROCESS_ELEMENT);
        }

        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            throwIfNecessary(MethodForException.FINISH_BUNDLE);
        }

        /**
         * Throws when called for the configured method, but only the first time.
         *
         * @param methodForException the lifecycle method currently executing
         * @throws Exception if this is the configured method and nothing has been thrown yet
         */
        private void throwIfNecessary(MethodForException methodForException) throws Exception {
            if (this.toThrow == methodForException && !this.thrown) {
                this.thrown = true;
                throw new Exception("Hasn't yet thrown");
            }
        }

        /** Fails if no exception was thrown by this instance; otherwise records the teardown. */
        @DoFn.Teardown
        public void teardown() {
            if (!this.thrown) {
                Assert.fail("Expected to have a processing method throw an exception");
            }
            teardownCalled.set(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoLifecycleTest$MethodForException.class */
    /** Identifies the lifecycle method from which an exception-throwing fn should throw. */
    public enum MethodForException {
        /** Throw from the method annotated as setup. */
        SETUP,

        /** Throw from the method annotated as start-bundle. */
        START_BUNDLE,

        /** Throw from the method annotated as process-element. */
        PROCESS_ELEMENT,

        /** Throw from the method annotated as finish-bundle. */
        FINISH_BUNDLE;
    }

    /**
     * Runs {@link CallSequenceEnforcingFn} over the flattened union of two integer collections;
     * the fn's own assertions fail the pipeline if its lifecycle methods are called out of order.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testFnCallSequence() {
        PCollectionList.of(this.p.apply("Impolite", Create.of(1, 2, 4)))
                .and(this.p.apply("Polite", Create.of(3, 5, 6, 7)))
                .apply(Flatten.<Integer>pCollections())
                // Explicit type argument instead of a raw type keeps the pipeline type-checked.
                .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));
        this.p.run();
    }

    /**
     * Same check as the single-output call-sequence test, but applies
     * {@link CallSequenceEnforcingFn} through the multi-output form of {@code ParDo}, with a main
     * output tag and an empty list of additional tags.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testFnCallSequenceMulti() {
        PCollectionList.of(this.p.apply("Impolite", Create.of(1, 2, 4)))
                .and(this.p.apply("Polite", Create.of(3, 5, 6, 7)))
                .apply(Flatten.<Integer>pCollections())
                .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())
                        .withOutputTags(new TupleTag<Integer>() {
                        }, TupleTagList.empty()));
        this.p.run();
    }

    /**
     * Runs {@link CallSequenceEnforcingStatefulFn}, which declares a state cell, over keyed input
     * through the multi-output form of {@code ParDo}; the fn's inherited assertions fail the
     * pipeline if its lifecycle methods are called out of order.
     */
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
    public void testFnCallSequenceStateful() {
        PCollectionList.of(this.p.apply("Impolite", Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 4))))
                .and(this.p.apply("Polite", Create.of(KV.of("b", 3), KV.of("a", 5), KV.of("c", 6), KV.of("c", 7))))
                .apply(Flatten.<KV<String, Integer>>pCollections())
                // Explicit type arguments replace the raw fn and raw KV[] arrays.
                .apply(ParDo.of(new CallSequenceEnforcingStatefulFn<String, Integer>())
                        .withOutputTags(new TupleTag<KV<String, Integer>>() {
                        }, TupleTagList.empty()));
        this.p.run();
    }

    /**
     * Verifies that the fn's teardown flag is set after its start-bundle method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInStartBundle() {
        // The flag is static and shared by every test using this fn; clear it so a teardown
        // recorded by an earlier test cannot satisfy the assertion below.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.START_BUNDLE)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its process-element method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInProcessElement() {
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its finish-bundle method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testTeardownCalledAfterExceptionInFinishBundle() {
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its setup method throws and the pipeline
     * run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInSetup() {
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.SETUP)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its start-bundle method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
        // NOTE(review): this exercises the same fn and method as
        // testTeardownCalledAfterExceptionInStartBundle, and ExceptionThrowingFn is never used by
        // any test in this class - confirm which fn each variant was meant to cover.
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.START_BUNDLE)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its process-element method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
        // NOTE(review): this exercises the same fn and method as
        // testTeardownCalledAfterExceptionInProcessElement, and ExceptionThrowingFn is never used
        // by any test in this class - confirm which fn each variant was meant to cover.
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }

    /**
     * Verifies that the fn's teardown flag is set after its finish-bundle method throws and the
     * pipeline run fails.
     */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
    public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
        // NOTE(review): this exercises the same fn and method as
        // testTeardownCalledAfterExceptionInFinishBundle, and ExceptionThrowingFn is never used by
        // any test in this class - confirm which fn each variant was meant to cover.
        // Reset the shared static flag so the result of a previous test cannot leak into this one.
        ExceptionThrowingOldFn.teardownCalled.set(false);
        this.p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE)));
        try {
            this.p.run();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Pipeline should have failed with an exception");
        } catch (Exception e) {
            Assert.assertThat("Function should have been torn down after exception", ExceptionThrowingOldFn.teardownCalled.get(), Matchers.is(true));
        }
    }
}
