package org.apache.beam.runners.core;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest.class */
public class SplittableParDoProcessFnTest {
    // Output-count limit passed to ProcessFnTester by the tests that do not pick their own limit.
    private static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
    // Bundle-duration limit passed to ProcessFnTester by the tests that do not pick their own limit.
    private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(5);

    // Some tests call setDateTimeFixed(...) on this rule to pin the current time.
    // NOTE(review): presumably the rule restores the clock after each test; confirm against ResetDateTimeProvider.
    @Rule
    public final ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();

    // Not referenced by any test visible in this file.
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$CounterFn.class */
    /**
     * Splittable fn over an {@link OffsetRange}: for every offset it manages to claim, starting at
     * the restriction's lower bound, it outputs {@code element + offset} rendered as a string.
     *
     * <p>A single {@code process} call returns {@code resume()} right after producing
     * {@code numOutputsPerCall} outputs, and {@code stop()} as soon as a claim is refused.
     */
    private static class CounterFn extends DoFn<Integer, String> {
        private final int numOutputsPerCall;

        /**
         * @param i number of outputs after which one {@code process} call asks to be resumed
         */
        public CounterFn(int i) {
            this.numOutputsPerCall = i;
        }

        /**
         * Claims consecutive offsets and emits one output per successful claim.
         *
         * @return {@code resume()} once the per-call output quota is reached, {@code stop()} when
         *     the tracker refuses a claim
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            long offset = restrictionTracker.currentRestriction().getFrom();
            for (long emitted = 0; restrictionTracker.tryClaim(offset); emitted++, offset++) {
                processContext.output(String.valueOf(processContext.element() + offset));
                // The quota check comes after the output, so the quota-th output is still emitted.
                if (emitted == this.numOutputsPerCall - 1) {
                    return DoFn.ProcessContinuation.resume();
                }
            }
            return DoFn.ProcessContinuation.stop();
        }

        /** Always throws: the tests in this file hand the restriction to the fn explicitly. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(Integer num) {
            throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$LifecycleVerifyingFn.class */
    /**
     * Fn that tracks which lifecycle phase it is in and asserts, on every lifecycle callback, that
     * the callback arrives in the expected phase.
     */
    private static class LifecycleVerifyingFn extends DoFn<Integer, String> {
        /** Lifecycle phases this fn moves through. */
        private enum State {
            BEFORE_SETUP,
            OUTSIDE_BUNDLE,
            INSIDE_BUNDLE,
            TORN_DOWN
        }

        private State state;

        private LifecycleVerifyingFn() {
            this.state = State.BEFORE_SETUP;
        }

        /** Setup is only legal on a fresh instance; afterwards the fn is outside any bundle. */
        @DoFn.Setup
        public void setup() {
            transition(State.BEFORE_SETUP, State.OUTSIDE_BUNDLE);
        }

        /** A bundle may only start while no bundle is open. */
        @DoFn.StartBundle
        public void startBundle() {
            transition(State.OUTSIDE_BUNDLE, State.INSIDE_BUNDLE);
        }

        /** Elements must be processed inside a bundle; the phase is left unchanged. */
        @DoFn.ProcessElement
        public void process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<SomeRestriction, Void> restrictionTracker) {
            Assert.assertEquals(State.INSIDE_BUNDLE, this.state);
        }

        /** A bundle may only finish while one is open. */
        @DoFn.FinishBundle
        public void finishBundle() {
            transition(State.INSIDE_BUNDLE, State.OUTSIDE_BUNDLE);
        }

        /** Teardown is only legal outside a bundle. */
        @DoFn.Teardown
        public void tearDown() {
            transition(State.OUTSIDE_BUNDLE, State.TORN_DOWN);
        }

        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer num) {
            return new SomeRestriction();
        }

        /** Asserts the fn is currently in {@code expected}, then moves it to {@code next}. */
        private void transition(State expected, State next) {
            Assert.assertEquals(expected, this.state);
            this.state = next;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$OutputWindowedValueToDoFnTester.class */
    /**
     * {@link OutputWindowedValue} adapter that records every emitted value in the mutable output
     * lists of a {@link DoFnTester}, one entry per window.
     */
    private static class OutputWindowedValueToDoFnTester<OutputT> implements OutputWindowedValue<OutputT> {
        private final DoFnTester<?, OutputT> tester;

        private OutputWindowedValueToDoFnTester(DoFnTester<?, OutputT> doFnTester) {
            this.tester = doFnTester;
        }

        /** Records a main-output value by delegating to the tagged overload with the main tag. */
        public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            TupleTag<OutputT> mainTag = this.tester.getMainOutputTag();
            outputWindowedValue(mainTag, outputt, instant, collection, paneInfo);
        }

        /** Appends one {@link ValueInSingleWindow} to the tag's output list for each given window. */
        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            for (BoundedWindow window : collection) {
                // The output list is looked up per window, so nothing is touched for an empty collection.
                this.tester.getMutableOutput(tupleTag).add(ValueInSingleWindow.of(additionaloutputt, instant, window, paneInfo));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$ProcessFnTester.class */
    /**
     * Test harness that drives a {@code SplittableParDoViaKeyedWorkItems.ProcessFn} wrapping the
     * given splittable fn through a {@link DoFnTester}, backed by in-memory timer and state
     * internals.
     *
     * <p>The harness owns the scheduled executor it hands to the process-element invoker and shuts
     * it down in {@link #close()}, so the executor's worker thread does not outlive the harness.
     */
    private static class ProcessFnTester<InputT, OutputT, RestrictionT, PositionT> implements AutoCloseable {
        private final DoFnTester<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> tester;
        // Kept in a field so close() can stop the worker thread.
        private final ScheduledExecutorService executor;
        private Instant currentProcessingTime;
        private InMemoryTimerInternals timerInternals;
        private TestInMemoryStateInternals<String> stateInternals;

        /**
         * Builds the ProcessFn under test, starts a bundle and advances processing time to
         * {@code instant}.
         *
         * @param instant initial processing time
         * @param doFn the splittable fn to wrap
         * @param coder coder for input elements
         * @param coder2 coder for restrictions
         * @param i output-count limit forwarded to the process-element invoker
         * @param duration duration limit forwarded to the process-element invoker
         * @throws Exception if setting up the tester or starting the bundle fails
         */
        ProcessFnTester(Instant instant, DoFn<InputT, OutputT> doFn, Coder<InputT> coder, Coder<RestrictionT> coder2, int i, Duration duration) throws Exception {
            SplittableParDoViaKeyedWorkItems.ProcessFn processFn = new SplittableParDoViaKeyedWorkItems.ProcessFn(doFn, coder, coder2, WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1L))));
            this.tester = DoFnTester.of(processFn);
            this.timerInternals = new InMemoryTimerInternals();
            this.stateInternals = new TestInMemoryStateInternals<>("dummy");
            processFn.setStateInternalsFactory(bArr -> {
                return this.stateInternals;
            });
            processFn.setTimerInternalsFactory(bArr2 -> {
                return this.timerInternals;
            });
            this.executor = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
            try {
                processFn.setProcessElementInvoker(new OutputAndTimeBoundedSplittableProcessElementInvoker(doFn, this.tester.getPipelineOptions(), new OutputWindowedValueToDoFnTester<>(this.tester), new SideInputReader() {
                    // This harness supplies no side inputs.
                    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
                        throw new NoSuchElementException();
                    }

                    public <T> boolean contains(PCollectionView<T> pCollectionView) {
                        return false;
                    }

                    public boolean isEmpty() {
                        return true;
                    }
                }, this.executor, i, duration));
                this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
                this.tester.startBundle();
                this.timerInternals.advanceProcessingTime(instant);
                this.currentProcessingTime = instant;
            } catch (Exception | Error failure) {
                // close() will never run for a half-built harness, so release the thread here.
                this.executor.shutdownNow();
                throw failure;
            }
        }

        /**
         * Closes the underlying tester and then stops the executor, even if closing the tester
         * throws.
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            try {
                this.tester.close();
            } finally {
                this.executor.shutdownNow();
            }
        }

        /**
         * Feeds one element/restriction pair, stamped with the current processing time, in the
         * global window.
         */
        void startElement(InputT inputt, RestrictionT restrictiont) throws Exception {
            startElement(WindowedValue.of(KV.of(inputt, restrictiont), this.currentProcessingTime, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        }

        /** Feeds the given windowed element/restriction pair as a single-element work item. */
        void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) throws Exception {
            this.tester.processElement(KeyedWorkItems.elementsWorkItem("key".getBytes(StandardCharsets.UTF_8), Collections.singletonList(windowedValue)));
        }

        /**
         * Advances processing time by {@code duration} and delivers all processing-time timers that
         * became removable as one timers work item.
         *
         * @return {@code true} if at least one timer was delivered, {@code false} otherwise
         */
        boolean advanceProcessingTimeBy(Duration duration) throws Exception {
            this.currentProcessingTime = this.currentProcessingTime.plus(duration);
            this.timerInternals.advanceProcessingTime(this.currentProcessingTime);
            List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
            TimerInternals.TimerData timer;
            while ((timer = this.timerInternals.removeNextProcessingTimer()) != null) {
                firedTimers.add(timer);
            }
            if (firedTimers.isEmpty()) {
                return false;
            }
            this.tester.processElement(KeyedWorkItems.timersWorkItem("key".getBytes(StandardCharsets.UTF_8), firedTimers));
            return true;
        }

        /** Returns the tester's outputs in the given window without consuming them. */
        List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow boundedWindow) {
            return this.tester.peekOutputElementsInWindow(boundedWindow);
        }

        /** Returns the tester's main outputs via {@code takeOutputElements()}. */
        List<OutputT> takeOutputElements() {
            return this.tester.takeOutputElements();
        }

        /** Returns the earliest watermark hold reported by the in-memory state internals. */
        public Instant getWatermarkHold() {
            return this.stateInternals.earliestWatermarkHold();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$SelfInitiatedResumeFn.class */
    /**
     * Splittable fn that, on every call, claims once, outputs its element as a string and asks to
     * be resumed after a five-second delay.
     */
    private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
        private SelfInitiatedResumeFn() {
        }

        /**
         * @return a {@code resume()} continuation carrying a 5 second resume delay
         * @throws IllegalStateException if the tracker refuses the claim
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<SomeRestriction, Void> restrictionTracker) {
            // The position type is Void, so null is the only claimable position.
            Preconditions.checkState(restrictionTracker.tryClaim(null));
            processContext.output(processContext.element().toString());
            return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(5L));
        }

        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer num) {
            return new SomeRestriction();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$SomeRestriction.class */
    /**
     * Stateless placeholder restriction whose default tracker is a {@link SomeRestrictionTracker}
     * over this instance.
     */
    public static class SomeRestriction implements Serializable, HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {
        private SomeRestriction() {
        }

        /**
         * Creates the default tracker for this restriction. The method must be named
         * {@code newTracker} to satisfy {@link HasDefaultTracker}.
         */
        public SomeRestrictionTracker newTracker() {
            return new SomeRestrictionTracker(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$SomeRestrictionTracker.class */
    /**
     * Trivial tracker for {@link SomeRestriction}: every claim succeeds, a split yields no primary
     * and the whole restriction as residual, and {@code checkDone} never complains.
     */
    public static class SomeRestrictionTracker extends RestrictionTracker<SomeRestriction, Void> {
        private final SomeRestriction someRestriction;

        public SomeRestrictionTracker(SomeRestriction someRestriction) {
            this.someRestriction = someRestriction;
        }

        /** Accepts every claim; the position argument is ignored. */
        public boolean tryClaim(Void r3) {
            return true;
        }

        /** Returns the restriction this tracker was created with. */
        public SomeRestriction currentRestriction() {
            return this.someRestriction;
        }

        /**
         * Splits into a {@code null} primary and the full restriction as residual; the requested
         * fraction is ignored.
         */
        public SplitResult<SomeRestriction> trySplit(double d) {
            return SplitResult.of(null, this.someRestriction);
        }

        /** No-op: this tracker performs no completeness check. */
        public void checkDone() {
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$ToStringFn.class */
    /**
     * Splittable fn that claims once and outputs its element's string form suffixed with
     * {@code "a"}, {@code "b"} and {@code "c"}, in that order.
     */
    private static class ToStringFn extends DoFn<Integer, String> {
        private ToStringFn() {
        }

        /**
         * @throws IllegalStateException if the tracker refuses the claim
         */
        @DoFn.ProcessElement
        public void process(DoFn<Integer, String>.ProcessContext processContext, RestrictionTracker<SomeRestriction, Void> restrictionTracker) {
            // The position type is Void, so null is the only claimable position.
            Preconditions.checkState(restrictionTracker.tryClaim(null));
            String text = processContext.element().toString();
            processContext.output(text + "a");
            processContext.output(text + "b");
            processContext.output(text + "c");
        }

        @DoFn.GetInitialRestriction
        public SomeRestriction getInitialRestriction(Integer num) {
            return new SomeRestriction();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDoProcessFnTest$WatermarkUpdateFn.class */
    /**
     * Splittable fn over an {@link OffsetRange}: for each offset it claims, it updates the
     * watermark to {@code element + offset seconds} and then outputs the offset as a string.
     */
    private static class WatermarkUpdateFn extends DoFn<Instant, String> {
        private WatermarkUpdateFn() {
        }

        /** Claims consecutive offsets from the restriction's lower bound until a claim is refused. */
        @DoFn.ProcessElement
        public void process(DoFn<Instant, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            for (long offset = restrictionTracker.currentRestriction().getFrom(); restrictionTracker.tryClaim(offset); offset++) {
                Instant watermark = processContext.element().plus(Duration.standardSeconds(offset));
                processContext.updateWatermark(watermark);
                processContext.output(String.valueOf(offset));
            }
        }

        /** Always throws: the tests in this file hand the restriction to the fn explicitly. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(Instant instant) {
            throw new IllegalStateException("Expected to be supplied explicitly in this test");
        }

        /** Wraps the given range in an {@link OffsetRangeTracker}. */
        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(OffsetRange offsetRange) {
            return new OffsetRangeTracker(offsetRange);
        }
    }

    /**
     * Feeds one element in an explicit interval window and checks that all three outputs of
     * {@link ToStringFn} appear in that window with the element's timestamp.
     */
    @Test
    public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
        DoFn<Integer, String> fn = new ToStringFn();
        Instant base = Instant.now();
        IntervalWindow window = new IntervalWindow(base.minus(Duration.standardMinutes(1L)), base.plus(Duration.standardMinutes(1L)));
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester = new ProcessFnTester<>(base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class), MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION)) {
            tester.startElement(WindowedValue.of(KV.of(42, new SomeRestriction()), base, Collections.singletonList(window), PaneInfo.ON_TIME_AND_ONLY_FIRING));
            Assert.assertEquals(Arrays.asList(TimestampedValue.of("42a", base), TimestampedValue.of("42b", base), TimestampedValue.of("42c", base)), tester.peekOutputElementsInWindow(window));
        }
    }

    /**
     * With an output limit of 3 per bundle over the range [0, 8), checks that each bundle emits
     * the next batch of offsets and that the watermark hold follows the last emitted offset,
     * disappearing once the range is exhausted.
     */
    @Test
    public void testUpdatesWatermark() throws Exception {
        DoFn<Instant, String> fn = new WatermarkUpdateFn();
        Instant base = Instant.now();
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Instant, String, OffsetRange, Long> tester = new ProcessFnTester<>(base, fn, InstantCoder.of(), SerializableCoder.of(OffsetRange.class), 3, MAX_BUNDLE_DURATION)) {
            tester.startElement(base, new OffsetRange(0L, 8L));
            Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("0", "1", "2"));
            Assert.assertEquals(base.plus(Duration.standardSeconds(2L)), tester.getWatermarkHold());

            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("3", "4", "5"));
            Assert.assertEquals(base.plus(Duration.standardSeconds(5L)), tester.getWatermarkHold());

            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.hasItems("6", "7"));
            // No hold is expected once the whole range has been processed.
            Assert.assertNull(tester.getWatermarkHold());
        }
    }

    /**
     * Checks that {@link SelfInitiatedResumeFn}, which asks for a 5 second resume delay, is not
     * re-invoked after 3 seconds but is re-invoked after a further 3 seconds, twice in a row.
     */
    @Test
    public void testResumeSetsTimer() throws Exception {
        DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
        Instant base = Instant.now();
        this.dateTimeProvider.setDateTimeFixed(base.getMillis());
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester = new ProcessFnTester<>(base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class), MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION)) {
            tester.startElement(42, new SomeRestriction());
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("42"));

            // 3 seconds in: no timer delivered, no output.
            Assert.assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3L)));
            Assert.assertTrue(tester.takeOutputElements().isEmpty());

            // 6 seconds in: the fn is resumed.
            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("42"));

            // Same pattern again for the second resume.
            Assert.assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3L)));
            Assert.assertTrue(tester.takeOutputElements().isEmpty());

            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("42"));
        }
    }

    /**
     * Checks that a {@link CounterFn} emitting one output per call continues from where it left
     * off on each resume over the range [0, 3), and that no timer remains after the final call.
     */
    @Test
    public void testResumeCarriesOverState() throws Exception {
        DoFn<Integer, String> fn = new CounterFn(1);
        Instant base = Instant.now();
        this.dateTimeProvider.setDateTimeFixed(base.getMillis());
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Integer, String, OffsetRange, Long> tester = new ProcessFnTester<>(base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION)) {
            tester.startElement(42, new OffsetRange(0L, 3L));
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("42"));

            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("43"));

            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            Assert.assertThat(tester.takeOutputElements(), Matchers.contains("44"));

            // One more timer delivery happens, but it produces no output.
            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            Assert.assertEquals(0, tester.takeOutputElements().size());

            // After that, no timers remain.
            Assert.assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
        }
    }

    /**
     * With an output limit of {@code max} per bundle and a range of 2.5 * {@code max} offsets,
     * checks that the outputs arrive in bundles of {@code max}, {@code max} and {@code max / 2}
     * elements with the expected first and last value in each.
     */
    @Test
    public void testCheckpointsAfterNumOutputs() throws Exception {
        int max = 100;
        int baseIndex = 42;
        DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Integer, String, OffsetRange, Long> tester = new ProcessFnTester<>(Instant.now(), fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), max, MAX_BUNDLE_DURATION)) {
            tester.startElement(baseIndex, new OffsetRange(0, 2 * max + max / 2));

            // First bundle: offsets [0, max).
            List<String> elements = tester.takeOutputElements();
            Assert.assertEquals(max, elements.size());
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex)));
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex + max - 1)));

            // Second bundle: offsets [max, 2 * max).
            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            elements = tester.takeOutputElements();
            Assert.assertEquals(max, elements.size());
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex + max)));
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex + 2 * max - 1)));

            // Third bundle: the remaining max / 2 offsets, and nothing past the end of the range.
            Assert.assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1L)));
            elements = tester.takeOutputElements();
            Assert.assertEquals(max / 2, elements.size());
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex + 2 * max)));
            Assert.assertThat(elements, Matchers.hasItem(String.valueOf(baseIndex + 2 * max + max / 2 - 1)));
            Assert.assertThat(elements, Matchers.not(Matchers.hasItem(String.valueOf(baseIndex + 2 * max + max / 2))));
        }
    }

    /**
     * With no effective output limit and a one-second bundle duration limit over a practically
     * unbounded range, checks that the call returns with some output and that at least the
     * configured duration has elapsed on the wall clock.
     */
    @Test
    public void testCheckpointsAfterDuration() throws Exception {
        Duration maxBundleDuration = Duration.standardSeconds(1L);
        DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
        Instant base = Instant.now();
        // try-with-resources so the tester is closed even when an assertion fails.
        try (ProcessFnTester<Integer, String, OffsetRange, Long> tester = new ProcessFnTester<>(base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), Integer.MAX_VALUE, maxBundleDuration)) {
            tester.startElement(42, new OffsetRange(0L, Long.MAX_VALUE));
            Assert.assertFalse(tester.takeOutputElements().isEmpty());
            // Elapsed time is measured from before the tester was built, so it can only overestimate.
            Assert.assertThat(Instant.now().getMillis() - base.getMillis(), Matchers.greaterThanOrEqualTo(maxBundleDuration.getMillis()));
        }
    }

    /**
     * Runs one element through {@link LifecycleVerifyingFn} and closes the tester, letting the
     * fn's own assertions verify that its lifecycle callbacks arrive in a legal order.
     */
    @Test
    public void testInvokesLifecycleMethods() throws Exception {
        DoFn<Integer, String> fn = new LifecycleVerifyingFn();
        // Closing the tester is part of the test; try-with-resources also keeps any close()
        // failure as a suppressed exception when the body itself fails.
        try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester = new ProcessFnTester<>(Instant.now(), fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class), MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION)) {
            tester.startElement(42, new SomeRestriction());
        }
    }
}
