package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo;
import org.apache.beam.sdk.testing.UsesBundleFinalizer;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDoTest;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest.class */
public class SplittableDoFnTest implements Serializable {

    /**
     * JUnit rule supplying the pipeline used by the tests in this class. Declared {@code transient},
     * so it is not part of this {@link Serializable} class's serialized form.
     */
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$BundleFinalizingSplittableDoFn.class */
    public static class BundleFinalizingSplittableDoFn extends DoFn<String, String> {
        private static final long MAX_ATTEMPTS = 3000;
        private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
        private final UUID uuid = UUID.randomUUID();

        @DoFn.NewTracker
        public RestrictionTracker<OffsetRange, Long> newTracker(@DoFn.Restriction OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange) { // from class: org.apache.beam.sdk.transforms.SplittableDoFnTest.BundleFinalizingSplittableDoFn.1
                public SplitResult<OffsetRange> trySplit(double d) {
                    return super.trySplit(0.0d);
                }
            };
        }

        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(@DoFn.Element String str, DoFn.OutputReceiver<String> outputReceiver, RestrictionTracker<OffsetRange, Long> restrictionTracker, DoFn.BundleFinalizer bundleFinalizer) throws InterruptedException {
            if (WAS_FINALIZED.computeIfAbsent(this.uuid, uuid -> {
                return new AtomicBoolean();
            }).get()) {
                restrictionTracker.tryClaim(Long.valueOf(((OffsetRange) restrictionTracker.currentRestriction()).getFrom() + 1));
                outputReceiver.output(str);
                restrictionTracker.tryClaim(Long.MAX_VALUE);
                return DoFn.ProcessContinuation.stop();
            }
            if (!restrictionTracker.tryClaim(Long.valueOf(((OffsetRange) restrictionTracker.currentRestriction()).getFrom() + 1))) {
                return DoFn.ProcessContinuation.stop();
            }
            bundleFinalizer.afterBundleCommit(Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)), () -> {
                WAS_FINALIZED.computeIfAbsent(this.uuid, uuid2 -> {
                    return new AtomicBoolean();
                }).set(true);
            });
            Thread.sleep(100L);
            return DoFn.ProcessContinuation.resume();
        }

        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction() {
            return new OffsetRange(0L, MAX_ATTEMPTS);
        }
    }

    /**
     * Splittable DoFn that pairs each input string with every index in {@code [0, length)}, using an
     * {@link OffsetRange} over the string's length as the restriction.
     */
    public static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, Integer>> {
        PairStringWithIndexToLengthBase() {
        }

        /**
         * Emits {@code (element, index)} for each index it manages to claim, starting at the beginning
         * of the current restriction.
         *
         * <p>A resume is requested whenever the per-call iteration counter is a multiple of three;
         * since the counter starts at zero, that already holds after the first successful claim.
         *
         * @return {@code resume()} after an output on such an iteration, {@code stop()} when a claim fails
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<String, KV<String, Integer>>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            long index = restrictionTracker.currentRestriction().getFrom();
            for (long iteration = 0; restrictionTracker.tryClaim(index); index++, iteration++) {
                processContext.output(KV.of(processContext.element(), (int) index));
                if (iteration % 3 == 0) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Returns the restriction {@code [0, str.length())}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRange(@DoFn.Element String str) {
            long length = str.length();
            return new OffsetRange(0L, length);
        }

        /** Splits the given range into two halves at its midpoint and outputs both. */
        @DoFn.SplitRestriction
        public void splitRange(@DoFn.Restriction OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver) {
            long start = offsetRange.getFrom();
            long end = offsetRange.getTo();
            long middle = (start + end) / 2;
            outputReceiver.output(new OffsetRange(start, middle));
            outputReceiver.output(new OffsetRange(middle, end));
        }
    }

    /** {@link PairStringWithIndexToLengthBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class PairStringWithIndexToLengthBounded extends PairStringWithIndexToLengthBase {
        PairStringWithIndexToLengthBounded() {
        }
    }

    /** {@link PairStringWithIndexToLengthBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class PairStringWithIndexToLengthUnbounded extends PairStringWithIndexToLengthBase {
        PairStringWithIndexToLengthUnbounded() {
        }
    }

    /**
     * Splittable DoFn that writes each element once to the main output and once to an additional
     * tagged output.
     */
    public static class SDFWithAdditionalOutputBase extends DoFn<Integer, String> {
        /** Tag of the additional output written by {@link #process}. */
        private final TupleTag<String> additionalOutput;

        private SDFWithAdditionalOutputBase(TupleTag<String> tupleTag) {
            this.additionalOutput = tupleTag;
        }

        /**
         * Claims the start of the current restriction (failing with an {@code IllegalStateException}
         * if the claim is refused), then emits {@code "main:<element>"} to the main output and
         * {@code "additional:<element>"} to the additional output.
         */
        @DoFn.ProcessElement
        public void process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            long start = restrictionTracker.currentRestriction().getFrom();
            boolean claimed = restrictionTracker.tryClaim(start);
            Preconditions.checkState(claimed);
            processContext.output("main:" + processContext.element());
            processContext.output(this.additionalOutput, "additional:" + processContext.element());
        }

        /** Returns the single-offset restriction {@code [0, 1)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction() {
            return new OffsetRange(0L, 1L);
        }
    }

    /** {@link SDFWithAdditionalOutputBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class SDFWithAdditionalOutputBounded extends SDFWithAdditionalOutputBase {
        private SDFWithAdditionalOutputBounded(TupleTag<String> tupleTag) {
            super(tupleTag);
        }
    }

    /** {@link SDFWithAdditionalOutputBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class SDFWithAdditionalOutputUnbounded extends SDFWithAdditionalOutputBase {
        private SDFWithAdditionalOutputUnbounded(TupleTag<String> tupleTag) {
            super(tupleTag);
        }
    }

    /**
     * Splittable DoFn that asserts its lifecycle methods are invoked in a consistent order by tracking
     * a small state machine and checking the expected state on entry to every method.
     */
    public static class SDFWithLifecycleBase extends DoFn<String, String> {
        /** Lifecycle phases tracked by this DoFn. */
        private enum State {
            OUTSIDE_BUNDLE,
            INSIDE_BUNDLE,
            TORN_DOWN
        }

        /** Current phase; {@code null} until {@link #setUp} has run. Not serialized. */
        private transient State state;

        private SDFWithLifecycleBase() {
        }

        /** Asserts that the DoFn is currently in {@code expected}. */
        private void expectState(State expected) {
            Assert.assertEquals(expected, this.state);
        }

        /** Asserts that the DoFn is in {@code expected}, then moves it to {@code next}. */
        private void transition(State expected, State next) {
            expectState(expected);
            this.state = next;
        }

        /** Must be called outside a bundle; returns the restriction {@code [0, 1)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction() {
            expectState(State.OUTSIDE_BUNDLE);
            return new OffsetRange(0L, 1L);
        }

        /** Must be called outside a bundle; outputs the restriction unchanged. */
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver) {
            expectState(State.OUTSIDE_BUNDLE);
            outputReceiver.output(offsetRange);
        }

        /** Expects no prior state and enters {@code OUTSIDE_BUNDLE}. */
        @DoFn.Setup
        public void setUp() {
            transition(null, State.OUTSIDE_BUNDLE);
        }

        /** Moves from {@code OUTSIDE_BUNDLE} to {@code INSIDE_BUNDLE}. */
        @DoFn.StartBundle
        public void startBundle() {
            transition(State.OUTSIDE_BUNDLE, State.INSIDE_BUNDLE);
        }

        /** Must be called inside a bundle; claims offset 0 and echoes the element. */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            expectState(State.INSIDE_BUNDLE);
            Assert.assertTrue(restrictionTracker.tryClaim(0L));
            processContext.output(processContext.element());
        }

        /** Moves from {@code INSIDE_BUNDLE} back to {@code OUTSIDE_BUNDLE}. */
        @DoFn.FinishBundle
        public void finishBundle() {
            transition(State.INSIDE_BUNDLE, State.OUTSIDE_BUNDLE);
        }

        /** Moves from {@code OUTSIDE_BUNDLE} to {@code TORN_DOWN}. */
        @DoFn.Teardown
        public void tearDown() {
            transition(State.OUTSIDE_BUNDLE, State.TORN_DOWN);
        }
    }

    /** {@link SDFWithLifecycleBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class SDFWithLifecycleBounded extends SDFWithLifecycleBase {
        private SDFWithLifecycleBounded() {
            super();
        }
    }

    /** {@link SDFWithLifecycleBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class SDFWithLifecycleUnbounded extends SDFWithLifecycleBase {
        private SDFWithLifecycleUnbounded() {
            super();
        }
    }

    /**
     * Splittable DoFn that emits many outputs per claimed block and, in every splittable-DoFn method
     * that receives the side input, asserts that the side input value matches the one expected for
     * the element's timestamp.
     */
    public static class SDFWithMultipleOutputsPerBlockAndSideInputBase extends DoFn<Integer, KV<String, Integer>> {
        /** Exclusive end of the initial restriction and last block boundary. */
        private static final int MAX_INDEX = 98765;
        /** Expected side input value for each element timestamp. */
        private final Map<Instant, String> expectedSideInputValues;
        /** Number of successful claims after which {@link #processElement} asks to be resumed. */
        private final int numClaimsPerCall;

        SDFWithMultipleOutputsPerBlockAndSideInputBase(Map<Instant, String> map, int i) {
            this.expectedSideInputValues = map;
            this.numClaimsPerCall = i;
        }

        /**
         * Returns the position in {@code iArr} of the first boundary that is at least {@code i},
         * searching from position 1.
         *
         * @throws IllegalStateException if {@code i} lies outside every {@code (prev, next]} interval
         */
        private static int snapToNextBlock(int i, int[] iArr) {
            for (int pos = 1; pos < iArr.length; pos++) {
                boolean aboveLower = i > iArr[pos - 1];
                if (aboveLower && i <= iArr[pos]) {
                    return pos;
                }
            }
            throw new IllegalStateException("Shouldn't get here");
        }

        /** Asserts that the expected-values map contains the entry {@code timestamp -> sideInput}. */
        private void verifySideInput(Instant timestamp, String sideInput) {
            MatcherAssert.assertThat(this.expectedSideInputValues, Matchers.hasEntry(timestamp, sideInput));
        }

        /**
         * Claims block boundaries one at a time and, for each claimed block, emits
         * {@code ("<sideInput>:<element>", index)} for every index in the block.
         *
         * @return {@code resume()} after {@code numClaimsPerCall} successful claims in this call,
         *     {@code stop()} when a claim fails
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(DoFn<Integer, KV<String, Integer>>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker, @DoFn.SideInput("sideInput") String str) {
            int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
            int restrictionStart = (int) restrictionTracker.currentRestriction().getFrom();
            int block = snapToNextBlock(restrictionStart, blockStarts);
            for (int claims = 1; restrictionTracker.tryClaim((long) blockStarts[block]); claims++, block++) {
                int blockEnd = blockStarts[block + 1];
                for (int index = blockStarts[block]; index < blockEnd; index++) {
                    processContext.output(KV.of(str + ":" + processContext.element(), index));
                }
                if (claims == this.numClaimsPerCall) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Checks the side input and returns the restriction {@code [0, MAX_INDEX)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(@DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return new OffsetRange(0L, MAX_INDEX);
        }

        /** Checks the side input and uses the element timestamp as the initial estimator state. */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState(@DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return instant;
        }

        /** Checks the side input and returns the length of the restriction. */
        @DoFn.GetSize
        public double getSize(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            long length = offsetRange.getTo() - offsetRange.getFrom();
            return length;
        }

        /** Checks the side input and outputs the restriction unsplit. */
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            outputReceiver.output(offsetRange);
        }

        /** Checks the side input and returns the restriction untruncated. */
        @DoFn.TruncateRestriction
        public RestrictionTracker.TruncateResult<OffsetRange> truncate(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return RestrictionTracker.TruncateResult.of(offsetRange);
        }

        /** Checks the side input and creates a plain {@link OffsetRangeTracker}. */
        @DoFn.NewTracker
        public RestrictionTracker<OffsetRange, Long> newTracker(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return new OffsetRangeTracker(offsetRange);
        }

        /**
         * Checks the side input against the element timestamp ({@code instant2}) and creates a
         * monotonically increasing estimator seeded with the estimator state ({@code instant}).
         */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant2) {
            verifySideInput(instant2, str);
            return new WatermarkEstimators.MonotonicallyIncreasing(instant);
        }
    }

    /** {@link SDFWithMultipleOutputsPerBlockAndSideInputBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class SDFWithMultipleOutputsPerBlockAndSideInputBounded extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
        private SDFWithMultipleOutputsPerBlockAndSideInputBounded(Map<Instant, String> map, int i) {
            super(map, i);
        }
    }

    /** {@link SDFWithMultipleOutputsPerBlockAndSideInputBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class SDFWithMultipleOutputsPerBlockAndSideInputUnbounded extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
        private SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(Map<Instant, String> map, int i) {
            super(map, i);
        }
    }

    /**
     * Splittable DoFn that claims block boundaries from a fixed table and emits every index of each
     * claimed block, asking to be resumed after a configurable number of claims.
     */
    public static class SDFWithMultipleOutputsPerBlockBase extends DoFn<String, Integer> {
        /** Exclusive end of the initial restriction and last block boundary. */
        private static final int MAX_INDEX = 98765;
        /** Number of successful claims after which {@link #processElement} asks to be resumed. */
        private final int numClaimsPerCall;

        private SDFWithMultipleOutputsPerBlockBase(int i) {
            this.numClaimsPerCall = i;
        }

        /**
         * Returns the position in {@code iArr} of the first boundary that is at least {@code i},
         * searching from position 1.
         *
         * @throws IllegalStateException if {@code i} lies outside every {@code (prev, next]} interval
         */
        private static int snapToNextBlock(int i, int[] iArr) {
            for (int pos = 1; pos < iArr.length; pos++) {
                boolean aboveLower = i > iArr[pos - 1];
                if (aboveLower && i <= iArr[pos]) {
                    return pos;
                }
            }
            throw new IllegalStateException("Shouldn't get here");
        }

        /**
         * Claims block boundaries one at a time, emitting every index of each claimed block.
         *
         * @return {@code resume()} after {@code numClaimsPerCall} successful claims in this call,
         *     {@code stop()} when a claim fails
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(DoFn<String, Integer>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
            int restrictionStart = (int) restrictionTracker.currentRestriction().getFrom();
            int block = snapToNextBlock(restrictionStart, blockStarts);
            for (int claims = 1; restrictionTracker.tryClaim((long) blockStarts[block]); claims++, block++) {
                int blockEnd = blockStarts[block + 1];
                for (int index = blockStarts[block]; index < blockEnd; index++) {
                    processContext.output(index);
                }
                if (claims == this.numClaimsPerCall) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Returns the restriction {@code [0, MAX_INDEX)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRange() {
            return new OffsetRange(0L, MAX_INDEX);
        }
    }

    /** {@link SDFWithMultipleOutputsPerBlockBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class SDFWithMultipleOutputsPerBlockBounded extends SDFWithMultipleOutputsPerBlockBase {
        SDFWithMultipleOutputsPerBlockBounded(int i) {
            super(i);
        }
    }

    /** {@link SDFWithMultipleOutputsPerBlockBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class SDFWithMultipleOutputsPerBlockUnbounded extends SDFWithMultipleOutputsPerBlockBase {
        SDFWithMultipleOutputsPerBlockUnbounded(int i) {
            super(i);
        }
    }

    /**
     * Splittable DoFn that prefixes each element with the side input value and, in every
     * splittable-DoFn method that receives the side input, asserts that the value matches the one
     * expected for the element's timestamp.
     */
    public static class SDFWithSideInputBase extends DoFn<Integer, String> {
        /** Expected side input value for each element timestamp. */
        private final Map<Instant, String> expectedSideInputValues;

        SDFWithSideInputBase(Map<Instant, String> map) {
            this.expectedSideInputValues = map;
        }

        /** Asserts that the expected-values map contains the entry {@code timestamp -> sideInput}. */
        private void verifySideInput(Instant timestamp, String sideInput) {
            MatcherAssert.assertThat(this.expectedSideInputValues, Matchers.hasEntry(timestamp, sideInput));
        }

        /**
         * Claims the start of the current restriction (failing with an {@code IllegalStateException}
         * if the claim is refused) and emits {@code "<sideInput>:<element>"}.
         */
        @DoFn.ProcessElement
        public void process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker, @DoFn.SideInput("sideInput") String str) {
            long start = restrictionTracker.currentRestriction().getFrom();
            boolean claimed = restrictionTracker.tryClaim(start);
            Preconditions.checkState(claimed);
            processContext.output(str + ":" + processContext.element());
        }

        /** Checks the side input and returns the single-offset restriction {@code [0, 1)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(@DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return new OffsetRange(0L, 1L);
        }

        /** Checks the side input and uses the element timestamp as the initial estimator state. */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState(@DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return instant;
        }

        /** Checks the side input and returns the length of the restriction. */
        @DoFn.GetSize
        public double getSize(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            long length = offsetRange.getTo() - offsetRange.getFrom();
            return length;
        }

        /** Checks the side input and outputs the restriction unsplit. */
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            outputReceiver.output(offsetRange);
        }

        /** Checks the side input and returns the restriction untruncated. */
        @DoFn.TruncateRestriction
        public RestrictionTracker.TruncateResult<OffsetRange> truncate(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return RestrictionTracker.TruncateResult.of(offsetRange);
        }

        /** Checks the side input and creates a plain {@link OffsetRangeTracker}. */
        @DoFn.NewTracker
        public RestrictionTracker<OffsetRange, Long> newTracker(@DoFn.Restriction OffsetRange offsetRange, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant) {
            verifySideInput(instant, str);
            return new OffsetRangeTracker(offsetRange);
        }

        /**
         * Checks the side input against the element timestamp ({@code instant2}) and creates a
         * monotonically increasing estimator seeded with the estimator state ({@code instant}).
         */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant, @DoFn.SideInput("sideInput") String str, @DoFn.Timestamp Instant instant2) {
            verifySideInput(instant2, str);
            return new WatermarkEstimators.MonotonicallyIncreasing(instant);
        }
    }

    /** {@link SDFWithSideInputBase} annotated with {@code @BoundedPerElement}; adds no behavior. */
    @DoFn.BoundedPerElement
    public static class SDFWithSideInputBounded extends SDFWithSideInputBase {
        SDFWithSideInputBounded(Map<Instant, String> map) {
            super(map);
        }
    }

    /** {@link SDFWithSideInputBase} annotated with {@code @UnboundedPerElement}; adds no behavior. */
    @DoFn.UnboundedPerElement
    public static class SDFWithSideInputUnbounded extends SDFWithSideInputBase {
        SDFWithSideInputUnbounded(Map<Instant, String> map) {
            super(map);
        }
    }

    /** Returns the bounded variant for {@code BOUNDED}, and the unbounded variant for any other value. */
    private static PairStringWithIndexToLengthBase pairStringWithIndexToLengthFn(PCollection.IsBounded isBounded) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new PairStringWithIndexToLengthBounded();
        }
        return new PairStringWithIndexToLengthUnbounded();
    }

    /** Runs {@link #testPairWithIndexBasic} with the bounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
    public void testPairWithIndexBasicBounded() {
        testPairWithIndexBasic(PCollection.IsBounded.BOUNDED);
    }

    /** Runs {@link #testPairWithIndexBasic} with the unbounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
    public void testPairWithIndexBasicUnbounded() {
        testPairWithIndexBasic(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Applies the pair-with-index DoFn to {@code "a"}, {@code "bb"} and {@code "ccccc"} and expects
     * one {@code (string, index)} pair for every index of every string.
     */
    private void testPairWithIndexBasic(PCollection.IsBounded isBounded) {
        PCollection<KV<String, Integer>> res = this.p
                .apply(Create.of("a", "bb", "ccccc"))
                .apply(ParDo.of(pairStringWithIndexToLengthFn(isBounded)))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        PAssert.that(res).containsInAnyOrder(Arrays.asList(
                KV.of("a", 0),
                KV.of("bb", 0),
                KV.of("bb", 1),
                KV.of("ccccc", 0),
                KV.of("ccccc", 1),
                KV.of("ccccc", 2),
                KV.of("ccccc", 3),
                KV.of("ccccc", 4)));

        this.p.run();
    }

    /** Runs {@link #testPairWithIndexWindowedTimestamped} with the bounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
    public void testPairWithIndexWindowedTimestampedBounded() {
        testPairWithIndexWindowedTimestamped(PCollection.IsBounded.BOUNDED);
    }

    /** Runs {@link #testPairWithIndexWindowedTimestamped} with the unbounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
    public void testPairWithIndexWindowedTimestampedUnbounded() {
        testPairWithIndexWindowedTimestamped(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Applies the pair-with-index DoFn to timestamped input placed in sliding windows (5s size, 1s
     * period) and checks that the output keeps the windowing function and that each inspected window
     * contains exactly the outputs whose timestamps fall inside it.
     *
     * <p>Collections are fully parameterized: with raw {@code PCollection}/{@code TimestampedValue}
     * types the generic {@code apply} calls lose their result type and the chain does not type-check.
     */
    private void testPairWithIndexWindowedTimestamped(PCollection.IsBounded isBounded) {
        // Work from a base instant with the millisecond field zeroed.
        MutableDateTime mutableNow = Instant.now().toMutableDateTime();
        mutableNow.setMillisOfSecond(0);
        Instant now = mutableNow.toInstant();
        Instant nowP1 = now.plus(Duration.standardSeconds(1L));
        Instant nowP2 = now.plus(Duration.standardSeconds(2L));

        SlidingWindows windowFn = SlidingWindows.of(Duration.standardSeconds(5L)).every(Duration.standardSeconds(1L));
        PCollection<KV<String, Integer>> res = this.p
                .apply(Create.timestamped(
                        TimestampedValue.of("a", now),
                        TimestampedValue.of("bb", nowP1),
                        TimestampedValue.of("ccccc", nowP2)))
                .apply(Window.into(windowFn))
                .apply(ParDo.of(pairStringWithIndexToLengthFn(isBounded)))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
        Assert.assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());

        PCollection<TimestampedValue<KV<String, Integer>>> timestamped = res.apply(Reify.timestamps());

        // Every expected output with its timestamp; identical for all windows, so built once.
        List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered = Arrays.asList(
                TimestampedValue.of(KV.of("a", 0), now),
                TimestampedValue.of(KV.of("bb", 0), nowP1),
                TimestampedValue.of(KV.of("bb", 1), nowP1),
                TimestampedValue.of(KV.of("ccccc", 0), nowP2),
                TimestampedValue.of(KV.of("ccccc", 1), nowP2),
                TimestampedValue.of(KV.of("ccccc", 2), nowP2),
                TimestampedValue.of(KV.of("ccccc", 3), nowP2),
                TimestampedValue.of(KV.of("ccccc", 4), nowP2));

        // Inspect the four windows starting 0..3 seconds before the base instant.
        for (int i = 0; i < 4; i++) {
            Instant base = now.minus(Duration.standardSeconds(i));
            IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5L)));

            List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
            for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
                boolean notBeforeStart = !window.start().isAfter(tv.getTimestamp());
                boolean notAfterEnd = !tv.getTimestamp().isAfter(window.maxTimestamp());
                if (notBeforeStart && notAfterEnd) {
                    expected.add(tv);
                }
            }
            Assert.assertFalse(expected.isEmpty());
            PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
        }
        this.p.run();
    }

    /**
     * Returns the bounded variant for {@code BOUNDED}, and the unbounded variant for any other value,
     * configured with {@code i} claims per call.
     */
    private static SDFWithMultipleOutputsPerBlockBase sdfWithMultipleOutputsPerBlock(PCollection.IsBounded isBounded, int i) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new SDFWithMultipleOutputsPerBlockBounded(i);
        }
        return new SDFWithMultipleOutputsPerBlockUnbounded(i);
    }

    /** Runs {@link #testOutputAfterCheckpoint} with the bounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
    public void testOutputAfterCheckpointBounded() {
        testOutputAfterCheckpoint(PCollection.IsBounded.BOUNDED);
    }

    /** Runs {@link #testOutputAfterCheckpoint} with the unbounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
    public void testOutputAfterCheckpointUnbounded() {
        testOutputAfterCheckpoint(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Feeds a single element through the multiple-outputs-per-block DoFn (three claims per call) and
     * expects the global count of outputs to equal {@code MAX_INDEX}.
     *
     * <p>The input is a plain string literal instead of a constant borrowed from an unrelated test's
     * anonymous class, and {@code Window.configure()} carries an explicit element type because a
     * chained call would otherwise infer {@code Window<Object>}.
     */
    private void testOutputAfterCheckpoint(PCollection.IsBounded isBounded) {
        PCollection<Integer> outputs = this.p
                .apply(Create.of("foo"))
                .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(isBounded, 3)));
        PCollection<Long> count = outputs
                .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes())
                .apply(Count.globally());
        PAssert.thatSingleton(count).isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
        this.p.run();
    }

    /**
     * Returns the bounded variant for {@code BOUNDED}, and the unbounded variant for any other value,
     * configured with the expected side input values.
     */
    private static SDFWithSideInputBase sdfWithSideInput(PCollection.IsBounded isBounded, Map<Instant, String> map) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new SDFWithSideInputBounded(map);
        }
        return new SDFWithSideInputUnbounded(map);
    }

    /** Runs {@link #testSideInput} with the bounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
    public void testSideInputBounded() {
        testSideInput(PCollection.IsBounded.BOUNDED);
    }

    /** Runs {@link #testSideInput} with the unbounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class, UsesSideInputs.class})
    public void testSideInputUnbounded() {
        testSideInput(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Applies the side-input DoFn to {@code 0, 1, 2} with a singleton side input {@code "foo"} and
     * expects {@code "foo:<element>"} for each element.
     *
     * <p>The side input value is spelled as the literal {@code "foo"} that the expected outputs
     * require, rather than a constant borrowed from an unrelated test's anonymous class. The DoFn is
     * told to expect that value at {@code TIMESTAMP_MIN_VALUE}.
     */
    private void testSideInput(PCollection.IsBounded isBounded) {
        PCollection<String> res = this.p
                .apply("input", Create.of(0, 1, 2))
                .apply(ParDo.of(sdfWithSideInput(isBounded, Collections.singletonMap(GlobalWindow.TIMESTAMP_MIN_VALUE, "foo")))
                        .withSideInput("sideInput", this.p
                                .apply("side input", Create.of("foo"))
                                .apply(View.asSingleton())));
        PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2"));
        this.p.run();
    }

    /** Runs {@link #testWindowedSideInput} with the bounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
    public void testWindowedSideInputBounded() {
        testWindowedSideInput(PCollection.IsBounded.BOUNDED);
    }

    /** Runs {@link #testWindowedSideInput} with the unbounded DoFn variant. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class, UsesSideInputs.class})
    public void testWindowedSideInputUnbounded() {
        testWindowedSideInput(PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Windowed variant of the side-input test. The main input holds the integers 0..7, each
     * stamped with an equal-valued millisecond instant and placed into 2 ms fixed windows. The
     * side input holds {@code "a"} at instant 0 and {@code "b"} at instant 4 in 4 ms fixed
     * windows, exposed as a singleton view named {@code "sideInput"}. The test asserts the
     * outputs {@code "a:0"}..{@code "a:3"} and {@code "b:4"}..{@code "b:7"} in any order.
     *
     * @param isBounded forwarded to {@code sdfWithSideInput} to pick the DoFn variant
     */
    private void testWindowedSideInput(PCollection.IsBounded isBounded) {
        // Map handed to the DoFn: instants 0..3 are paired with "a", instants 4..7 with "b".
        Map<Instant, String> sideInputByTimestamp =
                ImmutableMap.<Instant, String>builder()
                        .put(new Instant(0L), "a")
                        .put(new Instant(1L), "a")
                        .put(new Instant(2L), "a")
                        .put(new Instant(3L), "a")
                        .put(new Instant(4L), "b")
                        .put(new Instant(5L), "b")
                        .put(new Instant(6L), "b")
                        .put(new Instant(7L), "b")
                        .build();

        PCollection<String> outputs =
                this.p
                        .apply(
                                "main",
                                Create.timestamped(
                                        TimestampedValue.of(0, new Instant(0L)),
                                        TimestampedValue.of(1, new Instant(1L)),
                                        TimestampedValue.of(2, new Instant(2L)),
                                        TimestampedValue.of(3, new Instant(3L)),
                                        TimestampedValue.of(4, new Instant(4L)),
                                        TimestampedValue.of(5, new Instant(5L)),
                                        TimestampedValue.of(6, new Instant(6L)),
                                        TimestampedValue.of(7, new Instant(7L))))
                        .apply("window 2", Window.into(FixedWindows.of(Duration.millis(2L))))
                        .apply(
                                ParDo.of(sdfWithSideInput(isBounded, sideInputByTimestamp))
                                        .withSideInput(
                                                "sideInput",
                                                this.p
                                                        .apply(
                                                                "side",
                                                                Create.timestamped(
                                                                        TimestampedValue.of("a", new Instant(0L)),
                                                                        TimestampedValue.of("b", new Instant(4L))))
                                                        .apply("window 4", Window.into(FixedWindows.of(Duration.millis(4L))))
                                                        .apply("singleton", View.asSingleton())));

        PAssert.that(outputs).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
        this.p.run();
    }

    /**
     * Creates the multiple-outputs-per-block, side-input splittable DoFn matching the requested
     * boundedness.
     *
     * @param isBounded {@code BOUNDED} selects the {@code ...Bounded} class; any other value
     *     selects the {@code ...Unbounded} class
     * @param map passed unchanged to the constructor of the selected DoFn
     * @param i passed unchanged to the constructor of the selected DoFn
     * @return a new DoFn instance
     */
    private static SDFWithMultipleOutputsPerBlockAndSideInputBase sdfWithMultipleOutputsPerBlockAndSideInput(PCollection.IsBounded isBounded, Map<Instant, String> map, int i) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new SDFWithMultipleOutputsPerBlockAndSideInputBounded(map, i);
        }
        return new SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(map, i);
    }

    /** Runs {@link #testWindowedSideInputWithCheckpoints} with {@code IsBounded.BOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
    public void testWindowedSideInputWithCheckpointsBounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;
        testWindowedSideInputWithCheckpoints(boundedness);
    }

    /** Runs {@link #testWindowedSideInputWithCheckpoints} with {@code IsBounded.UNBOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class, UsesSideInputs.class})
    public void testWindowedSideInputWithCheckpointsUnbounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.UNBOUNDED;
        testWindowedSideInputWithCheckpoints(boundedness);
    }

    /**
     * Windowed side-input test for the multiple-outputs-per-block splittable DoFn.
     *
     * <p>The main input holds the integers 0..3, each stamped with an equal-valued millisecond
     * instant and placed into 1 ms fixed windows. The side input holds {@code "a"} at instant 0
     * and {@code "b"} at instant 2 in 2 ms fixed windows, exposed as a singleton view named
     * {@code "sideInput"}. The DoFn output is grouped by key and two assertions are made: the
     * keys are {@code "a:0"}, {@code "a:1"}, {@code "b:2"} and {@code "b:3"}, and every key's
     * values, once sorted, are exactly the integers 0..98764.
     *
     * @param isBounded forwarded to {@code sdfWithMultipleOutputsPerBlockAndSideInput} to pick
     *     the DoFn variant
     */
    private void testWindowedSideInputWithCheckpoints(PCollection.IsBounded isBounded) {
        // Map handed to the DoFn: instants 0 and 1 are paired with "a", instants 2 and 3 with "b".
        Map<Instant, String> sideInputByTimestamp =
                ImmutableMap.<Instant, String>builder()
                        .put(new Instant(0L), "a")
                        .put(new Instant(1L), "a")
                        .put(new Instant(2L), "b")
                        .put(new Instant(3L), "b")
                        .build();

        PCollection<KV<String, Iterable<Integer>>> grouped =
                this.p
                        .apply(
                                "main",
                                Create.timestamped(
                                        TimestampedValue.of(0, new Instant(0L)),
                                        TimestampedValue.of(1, new Instant(1L)),
                                        TimestampedValue.of(2, new Instant(2L)),
                                        TimestampedValue.of(3, new Instant(3L))))
                        .apply("window 1", Window.into(FixedWindows.of(Duration.millis(1L))))
                        .apply(
                                ParDo.of(sdfWithMultipleOutputsPerBlockAndSideInput(isBounded, sideInputByTimestamp, 3))
                                        .withSideInput(
                                                "sideInput",
                                                this.p
                                                        .apply(
                                                                "side",
                                                                Create.timestamped(
                                                                        TimestampedValue.of("a", new Instant(0L)),
                                                                        TimestampedValue.of("b", new Instant(2L))))
                                                        .apply("window 2", Window.into(FixedWindows.of(Duration.millis(2L))))
                                                        .apply("singleton", View.asSingleton())))
                        .apply(GroupByKey.create());

        PAssert.that(grouped.apply(Keys.create())).containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
        PAssert.that(grouped).satisfies(groups -> {
            // NOTE(review): 98765 presumably mirrors the DoFn's maximum output index - confirm
            // against the DoFn definition, which is outside this section of the file.
            List<Integer> expectedValues = new ArrayList<>();
            for (int i = 0; i < 98765; i++) {
                expectedValues.add(i);
            }
            // Grouping gives no ordering guarantee for values, so sort before comparing.
            for (KV<String, Iterable<Integer>> group : groups) {
                Assert.assertEquals(expectedValues, Ordering.<Integer>natural().sortedCopy(group.getValue()));
            }
            return null;
        });
        this.p.run();
    }

    /**
     * Creates the additional-output splittable DoFn matching the requested boundedness.
     *
     * @param isBounded {@code BOUNDED} selects {@code SDFWithAdditionalOutputBounded}; any other
     *     value selects {@code SDFWithAdditionalOutputUnbounded}
     * @param tupleTag passed unchanged to the constructor of the selected DoFn
     * @return a new DoFn instance
     */
    private static SDFWithAdditionalOutputBase sdfWithAdditionalOutput(PCollection.IsBounded isBounded, TupleTag<String> tupleTag) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new SDFWithAdditionalOutputBounded(tupleTag);
        }
        return new SDFWithAdditionalOutputUnbounded(tupleTag);
    }

    /** Runs {@link #testAdditionalOutput} with {@code IsBounded.BOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
    public void testAdditionalOutputBounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;
        testAdditionalOutput(boundedness);
    }

    /** Runs {@link #testAdditionalOutput} with {@code IsBounded.UNBOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
    public void testAdditionalOutputUnbounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.UNBOUNDED;
        testAdditionalOutput(boundedness);
    }

    /**
     * Feeds the integers 0, 1 and 2 through the additional-output splittable DoFn and asserts
     * that the main output holds {@code "main:0"}..{@code "main:2"} and the additional output
     * holds {@code "additional:0"}..{@code "additional:2"}, each in any order.
     *
     * @param isBounded forwarded to {@code sdfWithAdditionalOutput} to pick the DoFn variant
     */
    private void testAdditionalOutput(PCollection.IsBounded isBounded) {
        // Both tags are anonymous subclasses of TupleTag, declared in this order.
        TupleTag<String> mainOutputTag = new TupleTag<String>("main") {
        };
        TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {
        };

        PCollection<Integer> input = this.p.apply("input", Create.of(0, 1, 2));
        PCollectionTuple outputs =
                input.apply(
                        ParDo.of(sdfWithAdditionalOutput(isBounded, additionalOutputTag))
                                .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

        PAssert.that(outputs.get(mainOutputTag))
                .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
        PAssert.that(outputs.get(additionalOutputTag))
                .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));
        this.p.run();
    }

    /**
     * Checks how a splittable DoFn treats late data (currently ignored, see the linked issue).
     *
     * <p>A test stream emits {@code "aa"} at the current watermark, advances the watermark by five
     * seconds, then emits {@code "bb"} stamped one hour in the past. The input is put into
     * one-minute fixed windows with one minute of allowed lateness. The test asserts that:
     *
     * <ul>
     *   <li>the DoFn output contains the pairs for both {@code "aa"} and {@code "bb"};
     *   <li>the DoFn output's windowing strategy equals that of its input;
     *   <li>after a downstream group-by-key only the key {@code "aa"} remains.
     * </ul>
     */
    @Test(timeout = 15000)
    @Ignore("https://issues.apache.org/jira/browse/BEAM-6354")
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesTestStream.class})
    public void testLateData() {
        Instant base = Instant.now();

        TestStream<String> stream =
                TestStream.create(StringUtf8Coder.of())
                        .advanceWatermarkTo(base)
                        .addElements("aa")
                        .advanceWatermarkTo(base.plus(Duration.standardSeconds(5L)))
                        // Stamped an hour before the base instant, i.e. behind the watermark.
                        .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1L))))
                        .advanceProcessingTime(Duration.standardHours(1L))
                        .advanceWatermarkToInfinity();

        PCollection<String> input =
                this.p
                        .apply(stream)
                        .apply(
                                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1L)))
                                        .withAllowedLateness(Duration.standardMinutes(1L))
                                        .discardingFiredPanes());

        PCollection<KV<String, Integer>> afterSdf =
                input
                        .apply(ParDo.of(pairStringWithIndexToLengthFn(PCollection.IsBounded.UNBOUNDED)))
                        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        PCollection<String> keysAfterGrouping = afterSdf.apply(GroupByKey.create()).apply(Keys.create());

        // The DoFn output is expected to still contain the late element's pairs.
        PAssert.that(afterSdf)
                .containsInAnyOrder(
                        Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
        Assert.assertEquals(afterSdf.getWindowingStrategy(), input.getWindowingStrategy());
        // After grouping, only the on-time key is expected.
        PAssert.that(keysAfterGrouping).containsInAnyOrder("aa");
        this.p.run();
    }

    /**
     * Creates the lifecycle-checking splittable DoFn matching the requested boundedness.
     *
     * @param isBounded {@code BOUNDED} selects {@code SDFWithLifecycleBounded}; any other value
     *     selects {@code SDFWithLifecycleUnbounded}
     * @return a new DoFn instance
     */
    private static SDFWithLifecycleBase sdfWithLifecycle(PCollection.IsBounded isBounded) {
        if (isBounded == PCollection.IsBounded.BOUNDED) {
            return new SDFWithLifecycleBounded();
        }
        return new SDFWithLifecycleUnbounded();
    }

    /** Runs {@link #testLifecycleMethods} with {@code IsBounded.BOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesBoundedSplittableParDo.class})
    public void testLifecycleMethodsBounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;
        testLifecycleMethods(boundedness);
    }

    /** Runs {@link #testLifecycleMethods} with {@code IsBounded.UNBOUNDED}. */
    @Test
    @Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesUnboundedSplittableParDo.class})
    public void testLifecycleMethodsUnbounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.UNBOUNDED;
        testLifecycleMethods(boundedness);
    }

    /**
     * Feeds {@code "a"}, {@code "b"} and {@code "c"} through the DoFn returned by
     * {@code sdfWithLifecycle} and asserts that the same three strings come out, in any order.
     *
     * @param isBounded forwarded to {@code sdfWithLifecycle} to pick the DoFn variant
     */
    private void testLifecycleMethods(PCollection.IsBounded isBounded) {
        PCollection<String> input = this.p.apply(Create.of("a", "b", "c"));
        PCollection<String> output = input.apply(ParDo.of(sdfWithLifecycle(isBounded)));
        PAssert.that(output).containsInAnyOrder("a", "b", "c");
        this.p.run();
    }

    /**
     * Checks the boundedness reported for the output of two splittable DoFns applied to the same
     * input. The DoFn whose {@code @ProcessElement} method returns {@code void} is expected to
     * yield a {@code BOUNDED} output; the one returning a {@code ProcessContinuation} is expected
     * to yield an {@code UNBOUNDED} output.
     *
     * <p>The test builds its own pipeline with {@code TestPipeline.create()} and never runs it;
     * only the constructed collections are inspected.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testBoundedness() {
        PCollection<String> input =
                TestPipeline.create().apply(Create.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID));

        PCollection.IsBounded voidReturningBoundedness =
                input.apply(ParDo.of(new DoFn<String, String>() {
                    @DoFn.ProcessElement
                    public void process(RestrictionTracker<OffsetRange, Long> restrictionTracker) {
                    }

                    @DoFn.GetInitialRestriction
                    public OffsetRange getInitialRestriction() {
                        return new OffsetRange(0L, 1L);
                    }
                })).isBounded();
        Assert.assertEquals(PCollection.IsBounded.BOUNDED, voidReturningBoundedness);

        PCollection.IsBounded continuationReturningBoundedness =
                input.apply(ParDo.of(new DoFn<String, String>() {
                    @DoFn.ProcessElement
                    public DoFn.ProcessContinuation process(RestrictionTracker<OffsetRange, Long> restrictionTracker) {
                        return DoFn.ProcessContinuation.stop();
                    }

                    @DoFn.GetInitialRestriction
                    public OffsetRange getInitialRestriction() {
                        return new OffsetRange(0L, 1L);
                    }
                })).isBounded();
        Assert.assertEquals(PCollection.IsBounded.UNBOUNDED, continuationReturningBoundedness);
    }

    /**
     * Passes a single element through an anonymous subclass of
     * {@code BundleFinalizingSplittableDoFn} and asserts that the same element is the only output.
     *
     * @throws Exception declared by the test signature; nothing in the body throws it explicitly
     */
    @Test
    @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesBundleFinalizer.class})
    public void testBundleFinalizationOccursOnBoundedSplittableDoFn() throws Exception {
        PCollection<String> input =
                this.p.apply(Create.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID));
        // NOTE(review): the subclass body is empty here; the decompiler tagged it as local class
        // "1BoundedBundleFinalizingSplittableDoFn", whose members may not have been recovered - confirm.
        PCollection<String> output =
                input.apply(ParDo.of(new BundleFinalizingSplittableDoFn() {
                }));
        PAssert.that(output).containsInAnyOrder(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID);
        this.p.run();
    }

    /**
     * Passes a single element through an anonymous subclass of
     * {@code BundleFinalizingSplittableDoFn} and asserts that the same element is the only output.
     *
     * @throws Exception declared by the test signature; nothing in the body throws it explicitly
     */
    @Test
    @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class, UsesBundleFinalizer.class})
    public void testBundleFinalizationOccursOnUnboundedSplittableDoFn() throws Exception {
        PCollection<String> input =
                this.p.apply(Create.of(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID));
        // NOTE(review): the subclass body is empty here; the decompiler tagged it as local class
        // "1UnboundedBundleFinalizingSplittableDoFn", whose members may not have been recovered - confirm.
        PCollection<String> output =
                input.apply(ParDo.of(new BundleFinalizingSplittableDoFn() {
                }));
        PAssert.that(output).containsInAnyOrder(ParDoTest.TimerTests.AnonymousClass4.TIMER_ID);
        this.p.run();
    }

    /**
     * Lambda deserialization hook. Given a {@link SerializedLambda}, returns a fresh instance of
     * the one lambda this class knows about (the assertion lambda used by
     * {@code testWindowedSideInputWithCheckpoints}) when the serialized description matches;
     * otherwise throws.
     *
     * <p>NOTE(review): the method is marked synthetic and looks like decompiler output of the
     * compiler-generated hook rather than hand-written code. {@code boolean z = -1} and a
     * {@code switch} over a boolean are not valid Java, and javac normally generates this method
     * itself for classes with serializable lambdas - confirm whether it should exist in source
     * at all.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First switch: hash of the implementation method name, confirmed with equals().
        switch (implMethodName.hashCode()) {
            case -1367367564:
                if (implMethodName.equals("lambda$testWindowedSideInputWithCheckpoints$abf6f5a4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: dispatch on the selector chosen above.
        switch (z) {
            case false:
                // Every part of the serialized description must match before the lambda is rebuilt.
                // Method kind 6 is MethodHandleInfo.REF_invokeStatic.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/SplittableDoFnTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    // Same body as the lambda passed to satisfies(...) in
                    // testWindowedSideInputWithCheckpoints: each value iterable, once sorted,
                    // must equal the integers 0..98764.
                    return iterable -> {
                        ArrayList arrayList = new ArrayList();
                        for (int i = 0; i < 98765; i++) {
                            arrayList.add(Integer.valueOf(i));
                        }
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            Assert.assertEquals(arrayList, Ordering.natural().sortedCopy((Iterable) ((KV) it.next()).getValue()));
                        }
                        return null;
                    };
                }
                break;
        }
        // Reached when the name is unknown or any part of the description does not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
