package org.apache.beam.sdk.util.construction;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValues;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest.class */
public class SplittableParDoTest {
    private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() { // from class: org.apache.beam.sdk.util.construction.SplittableParDoTest.1
    };

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest$BoundedFakeFn.class */
    private static class BoundedFakeFn extends DoFn<Integer, String> {
        private BoundedFakeFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<SomeRestriction, Void> restrictionTracker) {
        }

        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(@DoFn.Element Integer num) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest$FakeBoundedSource.class */
    private static class FakeBoundedSource extends BoundedSource<String> {
        private FakeBoundedSource() {
        }

        public List<? extends BoundedSource<String>> split(long j, PipelineOptions pipelineOptions) throws Exception {
            return Collections.singletonList(this);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            return 0L;
        }

        public BoundedSource.BoundedReader<String> createReader(PipelineOptions pipelineOptions) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Coder<String> getOutputCoder() {
            return StringUtf8Coder.of();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest$SomeRestriction.class */
    public static class SomeRestriction implements Serializable, HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {
        private SomeRestriction() {
        }

        /* renamed from: newTracker, reason: merged with bridge method [inline-methods] */
        public SomeRestrictionTracker m616newTracker() {
            return new SomeRestrictionTracker(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest$SomeRestrictionTracker.class */
    public static class SomeRestrictionTracker extends RestrictionTracker<SomeRestriction, Void> {
        private final SomeRestriction someRestriction;

        public SomeRestrictionTracker(SomeRestriction someRestriction) {
            this.someRestriction = someRestriction;
        }

        public boolean tryClaim(Void r3) {
            return false;
        }

        /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
        public SomeRestriction m617currentRestriction() {
            return this.someRestriction;
        }

        public SplitResult<SomeRestriction> trySplit(double d) {
            return SplitResult.of((Object) null, this.someRestriction);
        }

        public void checkDone() {
        }

        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.BOUNDED;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/util/construction/SplittableParDoTest$UnboundedFakeFn.class */
    private static class UnboundedFakeFn extends DoFn<Integer, String> {
        private UnboundedFakeFn() {
        }

        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<SomeRestriction, Void> restrictionTracker) {
            return DoFn.ProcessContinuation.stop();
        }

        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(@DoFn.Element Integer num) {
            return null;
        }
    }

    private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
        return pipeline.apply("unbounded", Create.of(1, new Integer[]{2, 3})).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
    }

    private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) {
        return pipeline.apply("bounded", Create.of(1, new Integer[]{2, 3})).setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
    }

    private PCollection<String> applySplittableParDo(String str, PCollection<Integer> pCollection, DoFn<Integer, String> doFn) {
        ParDo.MultiOutput withOutputTags = ParDo.of(doFn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
        PCollectionTuple expand = withOutputTags.expand(pCollection);
        expand.get(MAIN_OUTPUT_TAG).setName("main");
        return pCollection.apply(str, SplittableParDo.forAppliedParDo(AppliedPTransform.of("ParDo", PValues.expandInput(pCollection), PValues.expandOutput(expand), withOutputTags, ResourceHints.create(), this.pipeline))).get(MAIN_OUTPUT_TAG);
    }

    @Test
    public void testBoundednessForBoundedFn() {
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BoundedFakeFn boundedFakeFn = new BoundedFakeFn();
        Assert.assertEquals("Applying a bounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.BOUNDED, applySplittableParDo("bounded to bounded", makeBoundedCollection(this.pipeline), boundedFakeFn).isBounded());
        Assert.assertEquals("Applying a bounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, applySplittableParDo("bounded to unbounded", makeUnboundedCollection(this.pipeline), boundedFakeFn).isBounded());
    }

    @Test
    public void testBoundednessForUnboundedFn() {
        this.pipeline.enableAbandonedNodeEnforcement(false);
        UnboundedFakeFn unboundedFakeFn = new UnboundedFakeFn();
        Assert.assertEquals("Applying an unbounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.UNBOUNDED, applySplittableParDo("unbounded to bounded", makeBoundedCollection(this.pipeline), unboundedFakeFn).isBounded());
        Assert.assertEquals("Applying an unbounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, applySplittableParDo("unbounded to unbounded", makeUnboundedCollection(this.pipeline), unboundedFakeFn).isBounded());
    }

    @Test
    public void testConvertIsSkippedWhenUsingUseSDFRead() {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        ExperimentalOptions.addExperiment(create.as(ExperimentalOptions.class), "use_sdf_read");
        Pipeline create2 = Pipeline.create(create);
        create2.apply(Read.from(new FakeBoundedSource()));
        create2.apply(Read.from(new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(new FakeBoundedSource())));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(create2);
        create2.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() { // from class: org.apache.beam.sdk.util.construction.SplittableParDoTest.2
            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(SplittableParDo.PrimitiveBoundedRead.class)));
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(SplittableParDo.PrimitiveUnboundedRead.class)));
            }
        });
    }

    @Test
    public void testConvertIsSkippedWhenUsingUseUnboundedSDFWrapper() {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        ExperimentalOptions.addExperiment(create.as(ExperimentalOptions.class), "use_unbounded_sdf_wrapper");
        Pipeline create2 = Pipeline.create(create);
        create2.apply(Read.from(new FakeBoundedSource()));
        create2.apply(Read.from(new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(new FakeBoundedSource())));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(create2);
        create2.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() { // from class: org.apache.beam.sdk.util.construction.SplittableParDoTest.3
            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(SplittableParDo.PrimitiveBoundedRead.class)));
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(SplittableParDo.PrimitiveUnboundedRead.class)));
            }
        });
    }

    @Test
    public void testConvertToPrimitiveReadsHappen() {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        ExperimentalOptions.addExperiment(create.as(ExperimentalOptions.class), "use_deprecated_read");
        Pipeline create2 = Pipeline.create(create);
        create2.apply(Read.from(new FakeBoundedSource()));
        create2.apply(Read.from(new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(new FakeBoundedSource())));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(create2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        create2.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() { // from class: org.apache.beam.sdk.util.construction.SplittableParDoTest.4
            public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(Read.Bounded.class)));
                MatcherAssert.assertThat(node.getTransform(), Matchers.not(Matchers.instanceOf(Read.Unbounded.class)));
                return super.enterCompositeTransform(node);
            }

            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                if (node.getTransform() instanceof SplittableParDo.PrimitiveBoundedRead) {
                    atomicBoolean.set(true);
                } else if (node.getTransform() instanceof SplittableParDo.PrimitiveUnboundedRead) {
                    atomicBoolean2.set(true);
                }
            }
        });
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(atomicBoolean2.get());
    }
}
