package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesSplittableParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest.class */
public class SplittableDoFnTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$OffsetRange.class */
    /**
     * Serializable pair of integer offsets used as the restriction type by the splittable
     * DoFns in this test. {@code OffsetRangeTracker.tryClaim} refuses any index that is
     * {@code >= to}, so {@code to} acts as an exclusive upper bound there.
     */
    public static class OffsetRange implements Serializable {
        /** First offset of the range. */
        public final int from;
        /** End offset of the range. */
        public final int to;

        OffsetRange(int from, int to) {
            this.from = from;
            this.to = to;
        }

        /** Renders the range as {@code OffsetRange{from=<from>, to=<to>}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("OffsetRange{from=");
            text.append(from);
            text.append(", to=");
            text.append(to);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$OffsetRangeTracker.class */
    private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
        private OffsetRange range;
        private Integer lastClaimedIndex = null;

        OffsetRangeTracker(OffsetRange offsetRange) {
            this.range = (OffsetRange) Preconditions.checkNotNull(offsetRange);
        }

        /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
        public OffsetRange m135currentRestriction() {
            return this.range;
        }

        /* renamed from: checkpoint, reason: merged with bridge method [inline-methods] */
        public OffsetRange m134checkpoint() {
            if (this.lastClaimedIndex == null) {
                OffsetRange offsetRange = this.range;
                this.range = new OffsetRange(this.range.from, this.range.from);
                return offsetRange;
            }
            OffsetRange offsetRange2 = new OffsetRange(this.lastClaimedIndex.intValue() + 1, this.range.to);
            this.range = new OffsetRange(this.range.from, this.lastClaimedIndex.intValue() + 1);
            return offsetRange2;
        }

        boolean tryClaim(int i) {
            Preconditions.checkState(this.lastClaimedIndex == null || i > this.lastClaimedIndex.intValue());
            if (i >= this.range.to) {
                return false;
            }
            this.lastClaimedIndex = Integer.valueOf(i);
            return true;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$PairStringWithIndexToLength.class */
    /**
     * Splittable DoFn that, for an input string, outputs {@code KV(string, i)} for each
     * index {@code i} it manages to claim from its {@link OffsetRange} restriction. The
     * initial restriction is {@code [0, string.length())}.
     */
    static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
        PairStringWithIndexToLength() {
        }

        /**
         * Claims consecutive indices starting at the restriction's {@code from} and outputs one
         * pair per claimed index. Returns {@code resume()} right after emitting an index that is
         * a multiple of 3, and {@code stop()} once a claim is refused.
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<String, KV<String, Integer>>.ProcessContext processContext, OffsetRangeTracker offsetRangeTracker) {
            for (int i = offsetRangeTracker.m135currentRestriction().from; offsetRangeTracker.tryClaim(i); i++) {
                processContext.output(KV.of(processContext.element(), Integer.valueOf(i)));
                if (i % 3 == 0) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Returns the range {@code [0, str.length())}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRange(String str) {
            return new OffsetRange(0, str.length());
        }

        /**
         * Splits {@code offsetRange} into two halves at its midpoint and emits both.
         *
         * <p>The midpoint is computed as {@code from + (to - from) / 2} rather than
         * {@code (from + to) / 2} so that the sum cannot overflow for large offsets. For the
         * non-negative ranges produced by {@link #getInitialRange(String)} both forms agree
         * whenever the latter does not overflow.
         */
        @DoFn.SplitRestriction
        public void splitRange(String str, OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver) {
            int midpoint = offsetRange.from + (offsetRange.to - offsetRange.from) / 2;
            outputReceiver.output(new OffsetRange(offsetRange.from, midpoint));
            outputReceiver.output(new OffsetRange(midpoint, offsetRange.to));
        }

        /** Creates a fresh tracker over {@code offsetRange}. */
        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$ReifyTimestampsFn.class */
    /** DoFn that wraps every input element together with its timestamp in a {@link TimestampedValue}. */
    private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
        private ReifyTimestampsFn() {
        }

        /** Emits the current element paired with the timestamp reported by the context. */
        @DoFn.ProcessElement
        public void process(DoFn<T, TimestampedValue<T>>.ProcessContext processContext) {
            T value = processContext.element();
            Instant stamp = processContext.timestamp();
            processContext.output(TimestampedValue.of(value, stamp));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$SDFWithLifecycle.class */
    /**
     * Splittable DoFn that records which lifecycle phase it is in and asserts, in every
     * lifecycle method, that the previous phase is the expected one. Each element is
     * passed through unchanged after claiming index 0 of a {@code [0, 1)} restriction.
     */
    private static class SDFWithLifecycle extends DoFn<String, String> {
        /** Lifecycle phases checked by the assertions in this class. */
        private enum State {
            BEFORE_SETUP,
            OUTSIDE_BUNDLE,
            INSIDE_BUNDLE,
            TORN_DOWN
        }

        /** Current phase; a new instance starts in {@link State#BEFORE_SETUP}. */
        private State state = State.BEFORE_SETUP;

        private SDFWithLifecycle() {
        }

        /** Expects {@code BEFORE_SETUP} and moves to {@code OUTSIDE_BUNDLE}. */
        @DoFn.Setup
        public void setUp() {
            Assert.assertEquals(State.BEFORE_SETUP, state);
            state = State.OUTSIDE_BUNDLE;
        }

        /** Expects {@code OUTSIDE_BUNDLE} and moves to {@code INSIDE_BUNDLE}. */
        @DoFn.StartBundle
        public void startBundle(DoFn<String, String>.Context context) {
            Assert.assertEquals(State.OUTSIDE_BUNDLE, state);
            state = State.INSIDE_BUNDLE;
        }

        /** Expects to be inside a bundle, claims index 0 and outputs the element as-is. */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext, OffsetRangeTracker offsetRangeTracker) {
            Assert.assertEquals(State.INSIDE_BUNDLE, state);
            boolean claimed = offsetRangeTracker.tryClaim(0);
            Assert.assertTrue(claimed);
            processContext.output(processContext.element());
        }

        /** Expects {@code INSIDE_BUNDLE} and moves back to {@code OUTSIDE_BUNDLE}. */
        @DoFn.FinishBundle
        public void finishBundle(DoFn<String, String>.Context context) {
            Assert.assertEquals(State.INSIDE_BUNDLE, state);
            state = State.OUTSIDE_BUNDLE;
        }

        /** Expects {@code OUTSIDE_BUNDLE} and moves to {@code TORN_DOWN}. */
        @DoFn.Teardown
        public void tearDown() {
            Assert.assertEquals(State.OUTSIDE_BUNDLE, state);
            state = State.TORN_DOWN;
        }

        /** Every element gets the single-index restriction {@code [0, 1)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(String str) {
            return new OffsetRange(0, 1);
        }

        /** Creates a fresh tracker over {@code offsetRange}. */
        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /**
     * Splittable DoFn that divides {@code [0, MAX_INDEX)} into blocks of uneven size. It
     * claims only the first index of each block, but after a successful claim it outputs
     * every integer of that block.
     */
    @DoFn.BoundedPerElement
    private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
        private static final int MAX_INDEX = 98765;

        private SDFWithMultipleOutputsPerBlock() {
        }

        /**
         * Finds the position {@code k >= 1} in {@code blockStarts} such that
         * {@code blockStarts[k - 1] < index <= blockStarts[k]}.
         *
         * @throws IllegalStateException if no such position exists
         */
        private static int snapToNextBlock(int index, int[] blockStarts) {
            int candidate = 1;
            while (candidate < blockStarts.length) {
                boolean aboveLower = blockStarts[candidate - 1] < index;
                boolean withinUpper = index <= blockStarts[candidate];
                if (aboveLower && withinUpper) {
                    return candidate;
                }
                candidate++;
            }
            throw new IllegalStateException("Shouldn't get here");
        }

        /**
         * Walks the blocks covered by the current restriction. For each block it tries to
         * claim the block's first index; on success it emits all integers of the block, on
         * failure it returns {@code resume()}. Returns {@code stop()} after the last block.
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(DoFn<String, Integer>.ProcessContext processContext, OffsetRangeTracker offsetRangeTracker) {
            // The leading -1 lets snapToNextBlock map offset 0 onto position 1.
            final int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
            final int firstBlock = snapToNextBlock(offsetRangeTracker.m135currentRestriction().from, blockStarts);
            final int endBlock = snapToNextBlock(offsetRangeTracker.m135currentRestriction().to, blockStarts);

            int block = firstBlock;
            while (block < endBlock) {
                if (!offsetRangeTracker.tryClaim(blockStarts[block])) {
                    return DoFn.ProcessContinuation.resume();
                }
                int value = blockStarts[block];
                while (value < blockStarts[block + 1]) {
                    processContext.output(Integer.valueOf(value));
                    value++;
                }
                block++;
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Every element gets the restriction {@code [0, MAX_INDEX)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRange(String str) {
            return new OffsetRange(0, MAX_INDEX);
        }

        /** Creates a fresh tracker over {@code offsetRange}. */
        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/SplittableDoFnTest$SDFWithSideInputsAndOutputs.class */
    /**
     * Splittable DoFn that reads a string side input and, for each integer element, writes
     * {@code "main:<side>:<element>"} to the main output and {@code "side:<side>:<element>"}
     * to the configured side output tag.
     */
    private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
        private final PCollectionView<String> sideInput;
        private final TupleTag<String> sideOutput;

        private SDFWithSideInputsAndOutputs(PCollectionView<String> pCollectionView, TupleTag<String> tupleTag) {
            this.sideInput = pCollectionView;
            this.sideOutput = tupleTag;
        }

        /**
         * Claims the first index of the restriction (failing with {@link IllegalStateException}
         * if the claim is refused) and then emits one string to each output.
         */
        @DoFn.ProcessElement
        public void process(DoFn<Integer, String>.ProcessContext processContext, OffsetRangeTracker offsetRangeTracker) {
            int firstIndex = offsetRangeTracker.m135currentRestriction().from;
            Preconditions.checkState(offsetRangeTracker.tryClaim(firstIndex));

            String sideValue = processContext.sideInput(sideInput);
            String mainRecord = "main:" + sideValue + ":" + processContext.element();
            processContext.output(mainRecord);
            String sideRecord = "side:" + sideValue + ":" + processContext.element();
            processContext.sideOutput(sideOutput, sideRecord);
        }

        /** Every element gets the single-index restriction {@code [0, 1)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(Integer num) {
            return new OffsetRange(0, 1);
        }

        /** Creates a fresh tracker over {@code offsetRange}. */
        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /**
     * Runs {@link PairStringWithIndexToLength} over three strings and expects one
     * {@code KV(string, index)} per character position of each string.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testPairWithIndexBasic() {
        TestPipeline pipeline = TestPipeline.create();

        PCollection<KV<String, Integer>> indexed =
                pipeline.apply(Create.of("a", "bb", "ccccc"))
                        .apply(ParDo.of(new PairStringWithIndexToLength()))
                        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        List<KV<String, Integer>> expected = Arrays.asList(
                KV.of("a", 0),
                KV.of("bb", 0),
                KV.of("bb", 1),
                KV.of("ccccc", 0),
                KV.of("ccccc", 1),
                KV.of("ccccc", 2),
                KV.of("ccccc", 3),
                KV.of("ccccc", 4));
        PAssert.that(indexed).containsInAnyOrder(expected);

        pipeline.run();
    }

    /**
     * Runs {@link PairStringWithIndexToLength} over timestamped input in sliding windows
     * (5 s long, every 1 s) and checks that the output keeps the input windowing and that
     * each output carries the timestamp of its input element.
     *
     * <p>Generic types are spelled out throughout: with the raw {@code TimestampedValue[]},
     * raw {@code PCollection} and un-witnessed {@code Window.into} the call chain was
     * erased/unchecked. The list of all expected values does not depend on the window, so
     * it is built once before the loop.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testPairWithIndexWindowedTimestamped() {
        TestPipeline pipeline = TestPipeline.create();

        // Truncate "now" to a whole second so that it lines up with the 1 s window period.
        MutableDateTime mutableNow = Instant.now().toMutableDateTime();
        mutableNow.setMillisOfSecond(0);
        Instant now = mutableNow.toInstant();
        Instant nowP1 = now.plus(Duration.standardSeconds(1L));
        Instant nowP2 = now.plus(Duration.standardSeconds(2L));

        SlidingWindows windowFn =
                SlidingWindows.of(Duration.standardSeconds(5L)).every(Duration.standardSeconds(1L));
        PCollection<KV<String, Integer>> res =
                pipeline.apply(Create.timestamped(
                                TimestampedValue.of("a", now),
                                TimestampedValue.of("bb", nowP1),
                                TimestampedValue.of("ccccc", nowP2)))
                        .apply(Window.<String>into(windowFn))
                        .apply(ParDo.of(new PairStringWithIndexToLength()))
                        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        Assert.assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());

        PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
                res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));

        List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered = Arrays.asList(
                TimestampedValue.of(KV.of("a", 0), now),
                TimestampedValue.of(KV.of("bb", 0), nowP1),
                TimestampedValue.of(KV.of("bb", 1), nowP1),
                TimestampedValue.of(KV.of("ccccc", 0), nowP2),
                TimestampedValue.of(KV.of("ccccc", 1), nowP2),
                TimestampedValue.of(KV.of("ccccc", 2), nowP2),
                TimestampedValue.of(KV.of("ccccc", 3), nowP2),
                TimestampedValue.of(KV.of("ccccc", 4), nowP2));

        // Check the four windows starting at now, now-1s, now-2s and now-3s.
        for (int i = 0; i < 4; i++) {
            Instant base = now.minus(Duration.standardSeconds(i));
            IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5L)));

            // Keep only the values whose timestamp lies within [window.start, window.maxTimestamp].
            List<TimestampedValue<KV<String, Integer>>> expected =
                    new ArrayList<TimestampedValue<KV<String, Integer>>>();
            for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
                if (!window.start().isAfter(tv.getTimestamp())
                        && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
                    expected.add(tv);
                }
            }
            Assert.assertFalse(expected.isEmpty());

            PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
        }
        pipeline.run();
    }

    /**
     * Runs {@link SDFWithMultipleOutputsPerBlock} on a single element and expects the total
     * number of outputs to equal {@code SDFWithMultipleOutputsPerBlock.MAX_INDEX}.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testOutputAfterCheckpoint() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        PCollection<Integer> outputs =
                pipeline.apply(Create.of("foo")).apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
        PCollection<Long> outputCount = outputs.apply(Count.<Integer>globally());

        PAssert.thatSingleton(outputCount).isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
        pipeline.run();
    }

    /**
     * Runs {@link SDFWithSideInputsAndOutputs} with a singleton side input {@code "foo"} over
     * the elements 0, 1, 2 and checks the contents of both the main and the side output.
     *
     * <p>The view and tags are declared with their type parameters: with raw
     * {@code TupleTag}/{@code PCollectionView} the {@code withOutputTags(...)} call is
     * unchecked, which erases the result type of the surrounding {@code apply(...)}.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testSideInputsAndOutputs() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        PCollectionView<String> sideInput =
                pipeline.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
        TupleTag<String> mainOutput = new TupleTag<String>("main");
        TupleTag<String> sideOutput = new TupleTag<String>("side");

        PCollectionTuple res =
                pipeline.apply("input", Create.of(0, 1, 2))
                        .apply(ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutput))
                                .withSideInputs(sideInput)
                                .withOutputTags(mainOutput, TupleTagList.of(sideOutput)));
        res.get(mainOutput).setCoder(StringUtf8Coder.of());
        res.get(sideOutput).setCoder(StringUtf8Coder.of());

        PAssert.that(res.get(mainOutput))
                .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
        PAssert.that(res.get(sideOutput))
                .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));

        pipeline.run();
    }

    /**
     * Feeds {@link PairStringWithIndexToLength} an on-time element {@code "aa"} and an element
     * {@code "bb"} stamped one hour before the starting watermark, using one-minute fixed
     * windows with one minute of allowed lateness.
     *
     * <p>The test expects the splittable DoFn to output pairs for both elements and to keep
     * the input's windowing strategy, while a subsequent GroupByKey is expected to yield
     * only the key {@code "aa"}.
     *
     * <p>{@code Window.<String>into(...)} carries an explicit type witness: it is the
     * receiver of {@code withAllowedLateness(...)}, so its type parameter cannot be
     * inferred from the enclosing {@code apply(...)} call.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testLateData() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        Instant base = Instant.now();

        TestStream<String> stream =
                TestStream.create(StringUtf8Coder.of())
                        .advanceWatermarkTo(base)
                        .addElements("aa")
                        .advanceWatermarkTo(base.plus(Duration.standardSeconds(5L)))
                        .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1L))))
                        .advanceProcessingTime(Duration.standardHours(1L))
                        .advanceWatermarkToInfinity();

        PCollection<String> input =
                pipeline.apply(stream)
                        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1L)))
                                .withAllowedLateness(Duration.standardMinutes(1L)));

        PCollection<KV<String, Integer>> afterSDF =
                input.apply(ParDo.of(new PairStringWithIndexToLength()))
                        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

        PCollection<String> nonLate =
                afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create());

        // Expected directly after the splittable DoFn: pairs for both "aa" and "bb".
        PAssert.that(afterSDF)
                .containsInAnyOrder(
                        Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
        Assert.assertEquals(afterSDF.getWindowingStrategy(), input.getWindowingStrategy());

        // Expected after the GroupByKey: only the key "aa".
        PAssert.that(nonLate).containsInAnyOrder("aa");

        pipeline.run();
    }

    /**
     * Runs {@link SDFWithLifecycle} over three elements and expects them to pass through
     * unchanged; the DoFn itself asserts on its lifecycle state while running.
     */
    @Test
    @Category({RunnableOnService.class, UsesSplittableParDo.class})
    public void testLifecycleMethods() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        PCollection<String> passedThrough =
                pipeline.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));

        PAssert.that(passedThrough).containsInAnyOrder("a", "b", "c");
        pipeline.run();
    }
}
