package org.apache.beam.repackaged.beam_runners_direct_java.runners.core;

import java.util.Collections;
import java.util.Objects;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StatefulDoFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link StatefulDoFnRunner}.
 *
 * <p>The tests drive a runner built by {@link DoFnRunners#defaultStatefulDoFnRunner} on top of
 * in-memory state and timer internals, and check two things: that an element arriving after the
 * watermarks have reached their maximum is counted as dropped, and that per-window state is
 * cleared once the input watermark has moved far enough past the end of the window and the
 * resulting timers have been delivered.
 */
@RunWith(JUnit4.class)
public class StatefulDoFnRunnerTest {
    /** Allowed lateness, in milliseconds, configured on {@link #WINDOWING_STRATEGY}. */
    private static final long ALLOWED_LATENESS = 1;

    /** Size, in milliseconds, of the fixed windows used by every test. */
    private static final long WINDOW_SIZE = 10;

    private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY =
            WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_SIZE)))
                    .withAllowedLateness(Duration.millis(ALLOWED_LATENESS));

    /** First fixed window: [0, WINDOW_SIZE). */
    private static final IntervalWindow WINDOW_1 =
            new IntervalWindow(new Instant(0L), new Instant(WINDOW_SIZE));

    /** Second fixed window: [WINDOW_SIZE, 2 * WINDOW_SIZE). */
    private static final IntervalWindow WINDOW_2 =
            new IntervalWindow(new Instant(WINDOW_SIZE), new Instant(2 * WINDOW_SIZE));

    @Mock
    StepContext mockStepContext;

    private InMemoryStateInternals<String> stateInternals;
    private InMemoryTimerInternals timerInternals;

    /**
     * Stateful {@link DoFn} that keeps one integer per key and window and increments it for every
     * element it processes. A missing value is treated as zero.
     */
    private static class MyDoFn extends DoFn<KV<String, Integer>, Integer> {
        // A constant-initialised final field, so it can be used inside the annotations below.
        public final String stateId = "foo";

        @DoFn.StateId(stateId)
        public final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());

        @DoFn.ProcessElement
        public void processElement(
                ProcessContext processContext, @DoFn.StateId(stateId) ValueState<Integer> valueState) {
            int current = MoreObjects.firstNonNull(valueState.read(), 0);
            valueState.write(current + 1);
        }
    }

    /**
     * Builds the state namespace for {@code intervalWindow} using the window coder of
     * {@link #WINDOWING_STRATEGY}.
     *
     * <p>The strategy is declared with wildcard type arguments, so its coder has a captured window
     * type; the raw {@code Coder} cast lets the window type be taken from {@code intervalWindow}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static StateNamespace windowNamespace(IntervalWindow intervalWindow) {
        return StateNamespaces.window(
                (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder(), intervalWindow);
    }

    /**
     * Creates fresh in-memory state and timer internals and makes the mocked step context return
     * them. The internals are created before stubbing so the mock never hands out {@code null}.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.stateInternals = new InMemoryStateInternals<>("hello");
        this.timerInternals = new InMemoryTimerInternals();
        Mockito.when(this.mockStepContext.stateInternals()).thenReturn(this.stateInternals);
        Mockito.when(this.mockStepContext.timerInternals()).thenReturn(this.timerInternals);
    }

    /**
     * With both watermarks already at {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, an element for
     * {@link #WINDOW_1} must be recorded in the dropped-due-to-lateness counter.
     */
    @Test
    public void testLateDropping() throws Exception {
        MetricsContainerImpl container = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(container);

        this.timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.timerInternals.advanceOutputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

        DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulRunner(new MyDoFn());

        runner.startBundle();
        runner.processElement(
                WindowedValue.of(KV.of("hello", 1), new Instant(0L), WINDOW_1, PaneInfo.NO_FIRING));

        MetricName droppedCounter =
                MetricName.named(
                        StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER);
        Assert.assertEquals(1L, container.getCounter(droppedCounter).getCumulative().longValue());

        runner.finishBundle();
    }

    /**
     * State written for a window must remain readable until the input watermark passes
     * {@link #cleanupTime} for that window, and must be empty afterwards; state of a later window
     * must not be affected by the cleanup of an earlier one.
     */
    @Test
    public void testGarbageCollect() throws Exception {
        this.timerInternals.advanceInputWatermark(new Instant(1L));

        MyDoFn fn = new MyDoFn();
        StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
        DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulRunner(fn);

        Instant elementTime = new Instant(1L);

        // One element in the first window.
        runner.processElement(
                WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
        Assert.assertEquals(
                1L, this.stateInternals.state(windowNamespace(WINDOW_1), stateTag).read().intValue());

        // Two elements in the second window.
        runner.processElement(
                WindowedValue.of(
                        KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));
        runner.processElement(
                WindowedValue.of(
                        KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));
        Assert.assertEquals(
                2L, this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read().intValue());

        // Passing the first window's cleanup time clears only the first window's state.
        advanceInputWatermark(this.timerInternals, cleanupTime(WINDOW_1), runner);
        Assert.assertTrue(
                this.stateInternals.isEmptyForTesting(
                        this.stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
        Assert.assertEquals(
                2L, this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read().intValue());

        // Passing the second window's cleanup time clears the remaining state.
        advanceInputWatermark(this.timerInternals, cleanupTime(WINDOW_2), runner);
        Assert.assertTrue(
                this.stateInternals.isEmptyForTesting(
                        this.stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
    }

    /**
     * Returns the instant the tests advance the input watermark to in order to trigger cleanup of
     * {@code window}: its maximum timestamp plus the allowed lateness plus two further
     * milliseconds.
     */
    private static Instant cleanupTime(IntervalWindow window) {
        // NOTE(review): the first extra millisecond presumably mirrors the cleanup timer's GC
        // delay and the second moves strictly past the timer — confirm against StatefulDoFnRunner.
        return window.maxTimestamp().plus(ALLOWED_LATENESS).plus(1L).plus(1L);
    }

    /**
     * Wraps a {@link SimpleDoFnRunner} for {@code fn} in the default stateful runner, wired to this
     * test's timer and state internals.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private DoFnRunner<KV<String, Integer>, Integer> createStatefulRunner(MyDoFn fn) {
        return DoFnRunners.defaultStatefulDoFnRunner(
                fn,
                getDoFnRunner(fn),
                WINDOWING_STRATEGY,
                new StatefulDoFnRunner.TimeInternalsCleanupTimer(this.timerInternals, WINDOWING_STRATEGY),
                new StatefulDoFnRunner.StateInternalsStateCleaner(
                        fn, this.stateInternals, WINDOWING_STRATEGY.getWindowFn().windowCoder()));
    }

    /**
     * Creates the underlying {@link SimpleDoFnRunner} for {@code doFn}. Only the step context and
     * the windowing strategy are supplied; the remaining collaborators are {@code null} or empty.
     */
    private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn) {
        return new SimpleDoFnRunner<>(
                null,
                doFn,
                NullSideInputReader.empty(),
                null,
                null,
                Collections.emptyList(),
                this.mockStepContext,
                null,
                Collections.emptyMap(),
                WINDOWING_STRATEGY);
    }

    /**
     * Advances the input watermark of {@code inMemoryTimerInternals} to {@code instant} and then
     * delivers every event-time timer it hands back to {@code doFnRunner}, in the order returned,
     * until none remain.
     *
     * @throws IllegalArgumentException if a returned timer is not set in a window namespace
     */
    private static void advanceInputWatermark(InMemoryTimerInternals inMemoryTimerInternals, Instant instant, DoFnRunner<?, ?> doFnRunner) throws Exception {
        inMemoryTimerInternals.advanceInputWatermark(instant);
        TimerInternals.TimerData timer;
        while ((timer = inMemoryTimerInternals.removeNextEventTimer()) != null) {
            StateNamespace namespace = timer.getNamespace();
            Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
            BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
            doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
        }
    }
}
