package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/SimplePushbackSideInputDoFnRunnerTest.class */
public class SimplePushbackSideInputDoFnRunnerTest {

    // Mocked step context passed to the SimpleDoFnRunner built in getDoFnRunner(); stubbed in setup().
    @Mock
    StepContext mockStepContext;

    // Mocked side-input reader given to every runner under test; tests stub isReady(...) on it
    // to decide which windows count as "ready".
    @Mock
    private ReadyCheckingSideInputReader reader;
    // Recording delegate wrapped by the non-stateful runners; captures elements and fired timers.
    private TestDoFnRunner<Integer, Integer> underlying;
    // Singleton side-input view created in setup() from the test pipeline.
    private PCollectionView<Integer> singletonView;
    // Stateful runner built in setup() around MyDoFn; wrapped by the stateful tests.
    private DoFnRunner<KV<String, Integer>, Integer> statefulRunner;
    // NOTE(review): presumably milliseconds, matching the Duration.millis(1) allowed lateness
    // configured on WINDOWING_STRATEGY below — confirm.
    private static final long ALLOWED_LATENESS = 1;
    // In-memory state/timer backends, re-created in setup() for every test.
    private InMemoryStateInternals<String> stateInternals;
    private InMemoryTimerInternals timerInternals;

    // Pipeline used only to construct the side-input view; abandoned-node enforcement is disabled.
    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
    // Size of the fixed windows, passed to Duration.millis(...) in WINDOWING_STRATEGY.
    private static final long WINDOW_SIZE = 10;
    // Two consecutive windows: [0, WINDOW_SIZE) and [WINDOW_SIZE, 20).
    private static final IntervalWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(WINDOW_SIZE));
    private static final IntervalWindow WINDOW_2 = new IntervalWindow(new Instant(WINDOW_SIZE), new Instant(20));
    // Fixed windows of WINDOW_SIZE milliseconds with one millisecond of allowed lateness.
    private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY = WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_SIZE))).withAllowedLateness(Duration.millis(1));

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/SimplePushbackSideInputDoFnRunnerTest$MyDoFn.class */
    /**
     * Stateful {@link DoFn} used by the stateful-runner tests: every processed element increments
     * an integer counter stored in the {@code "foo"} value state.
     */
    private static class MyDoFn extends DoFn<KV<String, Integer>, Integer> {
        /** Identifier of the counter state; equal to the literal used in the annotations below. */
        public final String stateId = "foo";

        /** Spec of the counter state, encoded with {@link VarIntCoder}. */
        @DoFn.StateId("foo")
        public final StateSpec<ValueState<Integer>> intState;

        private MyDoFn() {
            // stateId is already initialised at its declaration; assigning the final field a second
            // time here (as the decompiled code did) is rejected by the compiler.
            this.intState = StateSpecs.value(VarIntCoder.of());
        }

        /**
         * Reads the current counter (treating an unset state as 0) and writes back the value plus one.
         *
         * @param processContext the element context; not used by this function
         * @param valueState the {@code "foo"} counter state
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext, @DoFn.StateId("foo") ValueState<Integer> valueState) {
            Integer current = MoreObjects.firstNonNull(valueState.read(), 0);
            valueState.write(current + 1);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/SimplePushbackSideInputDoFnRunnerTest$TestDoFnRunner.class */
    /**
     * Recording {@link DoFnRunner} test double: it remembers whether a bundle was started/finished
     * and collects every element and timer it is handed, without running any user code.
     *
     * <p>{@link #inputElems} and {@link #firedTimers} are only allocated by {@link #startBundle()},
     * so they are {@code null} until a bundle has been started.
     */
    public static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
        private final Coder<InputT> inputCoder;
        /** Elements received through {@link #processElement}, in arrival order. */
        List<WindowedValue<InputT>> inputElems;
        /** Timers received through {@link #onTimer}, in arrival order. */
        List<TimerInternals.TimerData> firedTimers;
        private boolean started = false;
        private boolean finished = false;

        TestDoFnRunner(Coder<InputT> coder) {
            this.inputCoder = coder;
        }

        /** This double wraps no user function, so there is nothing to return. */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public DoFn<InputT, OutputT> getFn() {
            return null;
        }

        /** Marks the bundle as started and resets both recording lists. */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public void startBundle() {
            this.started = true;
            // Diamond instead of the raw ArrayList so the element types are checked.
            this.inputElems = new ArrayList<>();
            this.firedTimers = new ArrayList<>();
        }

        /** Records the element. */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public void processElement(WindowedValue<InputT> windowedValue) {
            this.inputElems.add(windowedValue);
        }

        /**
         * Records the timer as a {@link TimerInternals.TimerData} in the window's namespace.
         * The window is cast to {@link IntervalWindow}, so any other window type fails with a
         * {@link ClassCastException}. The key argument is ignored.
         */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public <KeyT> void onTimer(String str, String str2, KeyT keyt, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
            StateNamespace namespace = StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) boundedWindow);
            this.firedTimers.add(TimerInternals.TimerData.of(str, str2, namespace, instant, instant2, timeDomain));
        }

        /** Marks the bundle as finished. */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public void finishBundle() {
            this.finished = true;
        }

        /** Intentionally a no-op: window expiration is not recorded. */
        @Override // org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner
        public <KeyT> void onWindowExpiration(BoundedWindow boundedWindow, Instant instant, KeyT keyt) {
        }
    }

    /**
     * Builds the fixtures shared by all tests: the mocks, the singleton side-input view, the
     * recording {@code underlying} runner, fresh in-memory state/timer internals and the stateful
     * runner wrapping {@link MyDoFn}.
     */
    @Before
    public void setup() {
        // One initialisation is enough; a second call only replaces the mocks with new instances.
        MockitoAnnotations.initMocks(this);

        this.singletonView = this.p.apply(Create.of(1, 2, 3))
                .apply(Window.into(new IdentitySideInputWindowFn()))
                .apply(Sum.integersGlobally().asSingletonView());
        this.underlying = new TestDoFnRunner<>(VarIntCoder.of());

        MyDoFn myDoFn = new MyDoFn();

        // Create the internals BEFORE stubbing the step context, so the mock is never stubbed
        // with a still-null timerInternals field.
        this.stateInternals = new InMemoryStateInternals<>("hello");
        this.timerInternals = new InMemoryTimerInternals();
        Mockito.when(this.mockStepContext.stateInternals()).thenReturn(this.stateInternals);
        Mockito.when(this.mockStepContext.timerInternals()).thenReturn(this.timerInternals);

        this.statefulRunner = DoFnRunners.defaultStatefulDoFnRunner(
                myDoFn,
                KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
                getDoFnRunner(myDoFn),
                asStepContext(this.stateInternals, this.timerInternals),
                WINDOWING_STRATEGY,
                new StatefulDoFnRunner.TimeInternalsCleanupTimer(this.timerInternals, WINDOWING_STRATEGY),
                new StatefulDoFnRunner.StateInternalsStateCleaner(myDoFn, this.stateInternals, WINDOWING_STRATEGY.getWindowFn().windowCoder()));
    }

    /**
     * Wraps the given state and timer internals in a minimal {@link StepContext} that simply hands
     * them back.
     *
     * @param stateInternals value returned from {@code stateInternals()}
     * @param timerInternals value returned from {@code timerInternals()}
     * @return a step context backed by the two arguments
     */
    private StepContext asStepContext(final StateInternals stateInternals, final TimerInternals timerInternals) {
        StepContext context = new StepContext() {
            @Override
            public TimerInternals timerInternals() {
                return timerInternals;
            }

            @Override
            public StateInternals stateInternals() {
                return stateInternals;
            }
        };
        return context;
    }

    /**
     * Creates a pushback runner around the recording {@code underlying} runner and the mocked
     * reader, and starts its bundle before returning it.
     *
     * @param immutableList side-input views the runner checks for readiness
     * @return the runner, with {@code startBundle()} already called
     */
    private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner(ImmutableList<PCollectionView<?>> immutableList) {
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
                SimplePushbackSideInputDoFnRunner.create(this.underlying, immutableList, this.reader);
        runner.startBundle();
        return runner;
    }

    /** startBundle()/finishBundle() on the pushback runner must reach the wrapped runner. */
    @Test
    public void startFinishBundleDelegates() {
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of(this.singletonView));

        // createRunner(...) has already started the bundle, but nothing has finished it yet.
        MatcherAssert.assertThat(this.underlying.started, Matchers.is(true));
        MatcherAssert.assertThat(this.underlying.finished, Matchers.is(false));

        runner.finishBundle();
        MatcherAssert.assertThat(this.underlying.finished, Matchers.is(true));
    }

    /** With the side input not ready, the element is pushed back and never reaches the delegate. */
    @Test
    public void processElementSideInputNotReady() {
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.any(BoundedWindow.class)))
                .thenReturn(false);
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of(this.singletonView));

        IntervalWindow window = new IntervalWindow(new Instant(-500L), new Instant(0L));
        WindowedValue<Integer> element = WindowedValue.of(2, new Instant(-2L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
        Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(element);

        MatcherAssert.assertThat(pushedBack, Matchers.containsInAnyOrder(element));
        MatcherAssert.assertThat(this.underlying.inputElems, Matchers.emptyIterable());
    }

    /**
     * With the side input not ready in any window, a multi-window element is pushed back exploded
     * into one value per window and nothing reaches the delegate.
     */
    @Test
    public void processElementSideInputNotReadyMultipleWindows() {
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.any(BoundedWindow.class)))
                .thenReturn(false);
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of(this.singletonView));

        WindowedValue<Integer> multiWindow = WindowedValue.of(
                2,
                new Instant(-2L),
                ImmutableList.of(
                        new IntervalWindow(new Instant(-500L), new Instant(0L)),
                        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
                        GlobalWindow.INSTANCE),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(multiWindow);

        MatcherAssert.assertThat(pushedBack, Matchers.equalTo(multiWindow.explodeWindows()));
        MatcherAssert.assertThat(this.underlying.inputElems, Matchers.emptyIterable());
    }

    /**
     * When only the global window is not ready, that window's value is pushed back while the
     * values for the other windows are delivered to the delegate.
     */
    @Test
    public void processElementSideInputNotReadySomeWindows() {
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
                .thenReturn(false);
        Mockito.when(this.reader.isReady(
                        Mockito.eq(this.singletonView),
                        AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
                .thenReturn(true);
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of(this.singletonView));

        Instant timestamp = new Instant(-2L);
        IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
        IntervalWindow bigWindow = new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
        WindowedValue<Integer> multiWindow = WindowedValue.of(
                2, timestamp, ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), PaneInfo.NO_FIRING);

        Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(multiWindow);

        WindowedValue<Integer> expectedPushedBack = WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L));
        MatcherAssert.assertThat(pushedBack, Matchers.containsInAnyOrder(expectedPushedBack));

        WindowedValue<Integer> inLittleWindow = WindowedValue.of(2, timestamp, ImmutableList.of(littleWindow), PaneInfo.NO_FIRING);
        WindowedValue<Integer> inBigWindow = WindowedValue.of(2, timestamp, ImmutableList.of(bigWindow), PaneInfo.NO_FIRING);
        MatcherAssert.assertThat(this.underlying.inputElems, Matchers.containsInAnyOrder(inLittleWindow, inBigWindow));
    }

    /**
     * With the side input ready everywhere, nothing is pushed back and the delegate receives the
     * element exploded into one value per window.
     */
    @Test
    public void processElementSideInputReadyAllWindows() {
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.any(BoundedWindow.class)))
                .thenReturn(true);
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of(this.singletonView));

        WindowedValue<Integer> multiWindow = WindowedValue.of(
                2,
                new Instant(-2L),
                ImmutableList.of(
                        new IntervalWindow(new Instant(-500L), new Instant(0L)),
                        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
                        GlobalWindow.INSTANCE),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(multiWindow);

        MatcherAssert.assertThat(pushedBack, Matchers.emptyIterable());
        Object[] expectedDelivered = ImmutableList.copyOf(multiWindow.explodeWindows()).toArray();
        MatcherAssert.assertThat(this.underlying.inputElems, Matchers.containsInAnyOrder(expectedDelivered));
    }

    /**
     * Without any side inputs the element is forwarded to the delegate as-is (not exploded) and
     * nothing is pushed back.
     */
    @Test
    public void processElementNoSideInputs() {
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of());

        WindowedValue<Integer> multiWindow = WindowedValue.of(
                2,
                new Instant(-2L),
                ImmutableList.of(
                        new IntervalWindow(new Instant(-500L), new Instant(0L)),
                        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
                        GlobalWindow.INSTANCE),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        Iterable<WindowedValue<Integer>> pushedBack = runner.processElementInReadyWindows(multiWindow);

        MatcherAssert.assertThat(pushedBack, Matchers.emptyIterable());
        MatcherAssert.assertThat(this.underlying.inputElems, Matchers.containsInAnyOrder(multiWindow));
    }

    /** onTimer(...) on the pushback runner must be forwarded to the delegate unchanged. */
    @Test
    public void testOnTimerCalled() {
        SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(ImmutableList.of());

        String timerId = "fooTimer";
        IntervalWindow window = new IntervalWindow(new Instant(4L), new Instant(16L));
        Instant timestamp = new Instant(72L);
        runner.onTimer(timerId, "", null, window, timestamp, timestamp, TimeDomain.EVENT_TIME);

        // The delegate records the timer in the namespace of the window it fired for.
        TimerInternals.TimerData expected = TimerInternals.TimerData.of(
                timerId,
                StateNamespaces.window(IntervalWindow.getCoder(), window),
                timestamp,
                timestamp,
                TimeDomain.EVENT_TIME);
        MatcherAssert.assertThat(this.underlying.firedTimers, Matchers.contains(expected));
    }

    /**
     * Creates a pushback runner around an arbitrary keyed delegate and the mocked reader, and
     * starts its bundle before returning it.
     *
     * @param doFnRunner delegate that receives elements whose side inputs are ready
     * @param immutableList side-input views the runner checks for readiness
     * @return the runner, with {@code startBundle()} already called
     */
    private SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> createRunner(DoFnRunner<KV<String, Integer>, Integer> doFnRunner, ImmutableList<PCollectionView<?>> immutableList) {
        SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> runner =
                SimplePushbackSideInputDoFnRunner.create(doFnRunner, immutableList, this.reader);
        runner.startBundle();
        return runner;
    }

    /**
     * Advances both watermarks to {@code TIMESTAMP_MAX_VALUE}, sends one element through the
     * stateful runner, and expects the dropped-due-to-lateness counter to read exactly 1.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testLateDroppingForStatefulDoFnRunner() throws Exception {
        MetricsContainerImpl container = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(container);

        this.timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.timerInternals.advanceOutputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

        // createRunner(...) already calls startBundle(); starting the same bundle again is redundant.
        SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> runner =
                createRunner(this.statefulRunner, ImmutableList.of(this.singletonView));
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.any(BoundedWindow.class)))
                .thenReturn(true);

        // The runner is typed on KV<String, Integer>, so the element must be a KV as well;
        // a bare Integer does not match the runner's declared input type.
        WindowedValue<KV<String, Integer>> lateElement = WindowedValue.of(
                KV.of("hello", 1),
                new Instant(0L),
                ImmutableList.of(new IntervalWindow(new Instant(0L), new Instant(WINDOW_SIZE))),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        runner.processElementInReadyWindows(lateElement);

        // getCounter is the real accessor; "m133getCounter" was a decompiler-generated rename.
        long dropped = container
                .getCounter(MetricName.named(StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER))
                .getCumulative()
                .longValue();
        Assert.assertEquals(1L, dropped);

        runner.finishBundle();
    }

    /**
     * Writes counter state in two windows through the stateful runner, then advances the input
     * watermark past each window in turn and checks that only that window's state has been cleared.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testGarbageCollectForStatefulDoFnRunner() throws Exception {
        this.timerInternals.advanceInputWatermark(new Instant(1L));

        MyDoFn fn = new MyDoFn();
        // Typed tag (instead of a raw StateTag) so reads return Integer without casts.
        StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);

        SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> runner =
                createRunner(this.statefulRunner, ImmutableList.of(this.singletonView));
        Mockito.when(this.reader.isReady(Mockito.eq(this.singletonView), Mockito.any(BoundedWindow.class)))
                .thenReturn(true);

        Instant elementTime = new Instant(1L);
        Instant nextWindowTime = elementTime.plus(Duration.millis(WINDOW_SIZE));

        // One element in the first window -> counter is 1.
        runner.processElementInReadyWindows(WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
        Assert.assertEquals(1, (int) this.stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());

        // Two elements in the second window -> counter is 2.
        runner.processElementInReadyWindows(WindowedValue.of(KV.of("hello", 1), nextWindowTime, WINDOW_2, PaneInfo.NO_FIRING));
        runner.processElementInReadyWindows(WindowedValue.of(KV.of("hello", 1), nextWindowTime, WINDOW_2, PaneInfo.NO_FIRING));
        Assert.assertEquals(2, (int) this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());

        // Total offset past a window's max timestamp: allowed lateness plus two further
        // milliseconds. NOTE(review): presumably the cleanup timer's GC delay plus one extra
        // millisecond — confirm against StatefulDoFnRunner.
        Duration pastCleanup = Duration.millis(ALLOWED_LATENESS)
                .plus(Duration.millis(1L))
                .plus(Duration.millis(1L));

        // Past the first window: its state is gone, the second window's state is untouched.
        advanceInputWatermark(this.timerInternals, WINDOW_1.maxTimestamp().plus(pastCleanup), runner);
        Assert.assertTrue(this.stateInternals.isEmptyForTesting(this.stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
        Assert.assertEquals(2, (int) this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());

        // Past the second window: its state is gone too.
        advanceInputWatermark(this.timerInternals, WINDOW_2.maxTimestamp().plus(pastCleanup), runner);
        Assert.assertTrue(this.stateInternals.isEmptyForTesting(this.stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
    }

    /**
     * Advances the input watermark and then delivers every event-time timer that became eligible
     * to the given runner, one at a time, until none is left.
     *
     * @param inMemoryTimerInternals timer store whose watermark is advanced and drained
     * @param instant new input watermark
     * @param pushbackSideInputDoFnRunner runner that receives the fired timers
     * @throws IllegalArgumentException if a timer's namespace is not a window namespace
     */
    private static void advanceInputWatermark(InMemoryTimerInternals inMemoryTimerInternals, Instant instant, PushbackSideInputDoFnRunner<?, ?> pushbackSideInputDoFnRunner) throws Exception {
        inMemoryTimerInternals.advanceInputWatermark(instant);

        TimerInternals.TimerData timer;
        while ((timer = inMemoryTimerInternals.removeNextEventTimer()) != null) {
            StateNamespace namespace = timer.getNamespace();
            Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
            BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
            pushbackSideInputDoFnRunner.onTimer(
                    timer.getTimerId(),
                    timer.getTimerFamilyId(),
                    null,
                    window,
                    timer.getTimestamp(),
                    timer.getOutputTimestamp(),
                    timer.getDomain());
        }
    }

    /**
     * Returns the state namespace of the given window, using the window coder of
     * {@code WINDOWING_STRATEGY}.
     *
     * @param intervalWindow window whose namespace is wanted
     * @return the window's state namespace
     */
    private static StateNamespace windowNamespace(IntervalWindow intervalWindow) {
        // WINDOWING_STRATEGY is declared with wildcards, so its coder is Coder<capture of ?> and
        // cannot be paired with an IntervalWindow without a cast. The strategy is built from
        // FixedWindows, which assigns IntervalWindows.
        @SuppressWarnings("unchecked")
        Coder<IntervalWindow> windowCoder = (Coder<IntervalWindow>) WINDOWING_STRATEGY.getWindowFn().windowCoder();
        return StateNamespaces.window(windowCoder, intervalWindow);
    }

    /**
     * Builds the plain {@link SimpleDoFnRunner} that the stateful runner wraps. Only the function,
     * an empty side-input reader, the mocked step context, the windowing strategy and default
     * schema information are supplied; the remaining arguments are {@code null} or empty.
     *
     * @param doFn user function to run
     * @return a runner for {@code doFn}
     */
    private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn) {
        DoFnRunner<KV<String, Integer>, Integer> runner = new SimpleDoFnRunner(
                null,
                doFn,
                NullSideInputReader.empty(),
                null,
                null,
                Collections.emptyList(),
                this.mockStepContext,
                null,
                Collections.emptyMap(),
                WINDOWING_STRATEGY,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());
        return runner;
    }
}
