package org.apache.beam.runners.core;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}.
 *
 * <p>Each test drives a splittable {@link DoFn} over an {@link OffsetRange} through the invoker and
 * then inspects the returned continuation and residual restriction.
 */
public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {

    /** Output limit passed to the invoker; the output-bounded test expects the residual to start here. */
    private static final int MAX_OUTPUTS_PER_BUNDLE = 1000;

    /** Duration limit passed to the invoker under test. */
    private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(3L);

    @Rule
    public transient ExpectedException e = ExpectedException.none();

    /**
     * Splittable {@link DoFn} that claims consecutive offsets from its restriction and outputs each
     * claimed offset as a string, with configurable sleeps so the tests can exercise the invoker's
     * limits.
     */
    public static class SomeFn extends DoFn<Void, String> {
        private final Duration sleepBeforeFirstClaim;
        private final int numOutputsPerProcessCall;
        private final Duration sleepBeforeEachOutput;

        /**
         * @param sleepBeforeFirstClaim how long to sleep before the first {@code tryClaim}
         * @param numOutputsPerProcessCall number of outputs after which the call returns {@code resume()}
         * @param sleepBeforeEachOutput how long to sleep between a successful claim and its output
         */
        private SomeFn(Duration sleepBeforeFirstClaim, int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) {
            this.sleepBeforeFirstClaim = sleepBeforeFirstClaim;
            this.numOutputsPerProcessCall = numOutputsPerProcessCall;
            this.sleepBeforeEachOutput = sleepBeforeEachOutput;
        }

        /**
         * Claims offsets one at a time starting from the restriction's lower bound.
         *
         * @return {@code resume()} once {@code numOutputsPerProcessCall} outputs were produced in this
         *     call, or {@code stop()} as soon as a claim fails
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<Void, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            Uninterruptibles.sleepUninterruptibly(this.sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
            long position = restrictionTracker.currentRestriction().getFrom();
            // outputsSoFar counts from 1 so that it equals the number of outputs emitted after each output.
            for (long outputsSoFar = 1; restrictionTracker.tryClaim(position); position++, outputsSoFar++) {
                Uninterruptibles.sleepUninterruptibly(this.sleepBeforeEachOutput.getMillis(), TimeUnit.MILLISECONDS);
                processContext.output("" + position);
                if (outputsSoFar == this.numOutputsPerProcessCall) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Never used: the tests hand the restriction to the invoker directly. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(@DoFn.Element Void element) {
            throw new UnsupportedOperationException("Should not be called in this test");
        }
    }

    /**
     * Runs a {@link SomeFn} over the restriction {@code [0, i)}.
     *
     * @param i exclusive upper bound of the restriction
     * @param duration sleep before the first claim
     * @param i2 number of outputs per process call before voluntarily resuming
     * @param duration2 sleep before each output
     */
    private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result runTest(int i, Duration duration, int i2, Duration duration2) throws Exception {
        return runTest(new SomeFn(duration, i2, duration2), new OffsetRange(0L, i));
    }

    /**
     * Invokes {@code doFn} on a single null element over {@code offsetRange} using an invoker
     * configured with {@link #MAX_OUTPUTS_PER_BUNDLE} and {@link #MAX_BUNDLE_DURATION}; all outputs
     * are discarded.
     *
     * @return the invoker's result (continuation plus residual restriction)
     */
    private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result runTest(DoFn<Void, String> doFn, OffsetRange offsetRange) throws Exception {
        // Own the executor locally so its worker thread can be released after every invocation.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Typed local (rather than a chained call) so the diamond infers all five type arguments.
            SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void> invoker =
                    new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
                            doFn,
                            PipelineOptionsFactory.create(),
                            new OutputWindowedValue<String>() {
                                @Override
                                public void outputWindowedValue(String str, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                                    // Outputs are intentionally dropped; tests only inspect the result.
                                }

                                @Override
                                public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionalOutput, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                                    // Additional outputs are intentionally dropped as well.
                                }
                            },
                            NullSideInputReader.empty(),
                            executor,
                            MAX_OUTPUTS_PER_BUNDLE,
                            MAX_BUNDLE_DURATION,
                            () -> {
                                throw new UnsupportedOperationException("BundleFinalizer not configured for test.");
                            });

            return invoker.invokeProcessElement(
                    DoFnInvokers.invokerFor(doFn),
                    WindowedValue.<Void>of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
                    new OffsetRangeTracker(offsetRange),
                    new WatermarkEstimator<Void>() {
                        @Override
                        public Instant currentWatermark() {
                            return GlobalWindow.TIMESTAMP_MIN_VALUE;
                        }

                        @Override
                        public Void getState() {
                            return null;
                        }
                    },
                    Collections.emptyMap());
        } finally {
            // Also runs when the invocation throws, cancelling anything still scheduled.
            executor.shutdownNow();
        }
    }

    /**
     * Asserts the residual shape shared by the time-bounded tests: processing stopped somewhere
     * strictly between offsets 10 and 100, and the upper bound of the range is untouched.
     */
    private static void assertCutOffByTime(SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result result) {
        Assert.assertFalse(result.getContinuation().shouldResume());
        OffsetRange residual = result.getResidualRestriction();
        MatcherAssert.assertThat(residual.getFrom(), Matchers.greaterThan(10L));
        MatcherAssert.assertThat(residual.getFrom(), Matchers.lessThan(100L));
        Assert.assertEquals(10000L, residual.getTo());
    }

    /** With no sleeps, processing is cut off after exactly {@link #MAX_OUTPUTS_PER_BUNDLE} outputs. */
    @Test
    public void testInvokeProcessElementOutputBounded() throws Exception {
        SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result result = runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO);
        Assert.assertFalse(result.getContinuation().shouldResume());
        OffsetRange residual = result.getResidualRestriction();
        Assert.assertEquals((long) MAX_OUTPUTS_PER_BUNDLE, residual.getFrom());
        Assert.assertEquals(10000L, residual.getTo());
    }

    /** With 100 ms per output, processing is cut off well before the range is exhausted. */
    @Test
    public void testInvokeProcessElementTimeBounded() throws Exception {
        assertCutOffByTime(runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100L)));
    }

    /** Same expectation as the time-bounded test, but with a 3 s sleep before the first claim. */
    @Test
    public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Exception {
        assertCutOffByTime(runTest(10000, Duration.standardSeconds(3L), Integer.MAX_VALUE, Duration.millis(100L)));
    }

    /** A function that exhausts its small range returns stop with no residual restriction. */
    @Test
    public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
        SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result result = runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100L));
        Assert.assertFalse(result.getContinuation().shouldResume());
        Assert.assertNull(result.getResidualRestriction());
    }

    /** A function that resumes after 5 of 10 offsets leaves {@code [5, 10)} as the residual. */
    @Test
    public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
        SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result result = runTest(10, Duration.ZERO, 5, Duration.millis(100L));
        Assert.assertTrue(result.getContinuation().shouldResume());
        Assert.assertEquals(new OffsetRange(5L, 10L), result.getResidualRestriction());
    }

    /** Emitting output without having claimed anything must fail with a descriptive message. */
    @Test
    public void testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exception {
        DoFn<Void, String> outputsWithoutClaim = new DoFn<Void, String>() {
            @DoFn.ProcessElement
            public void process(DoFn<Void, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
                processContext.output("foo");
            }

            @DoFn.GetInitialRestriction
            public OffsetRange getInitialRestriction(@DoFn.Element Void element) {
                throw new UnsupportedOperationException("Should not be called in this test");
            }
        };
        this.e.expectMessage("Output is not allowed before tryClaim()");
        runTest(outputsWithoutClaim, new OffsetRange(0L, 5L));
    }

    /** Emitting output after a claim outside the range was rejected must fail with a descriptive message. */
    @Test
    public void testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws Exception {
        DoFn<Void, String> outputsAfterFailedClaim = new DoFn<Void, String>() {
            @DoFn.ProcessElement
            public void process(DoFn<Void, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
                // 6 lies outside [0, 5), so the claim is expected to be rejected.
                Assert.assertFalse(restrictionTracker.tryClaim(6L));
                processContext.output("foo");
            }

            @DoFn.GetInitialRestriction
            public OffsetRange getInitialRestriction(@DoFn.Element Void element) {
                throw new UnsupportedOperationException("Should not be called in this test");
            }
        };
        this.e.expectMessage("Output is not allowed after a failed tryClaim()");
        runTest(outputsAfterFailedClaim, new OffsetRange(0L, 5L));
    }
}
