package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.beam.repackaged.direct_java.runners.core.DoFnRunners;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunnerTest.class */
public class StatefulDoFnRunnerTest {
    /** Allowed lateness, in milliseconds, applied to {@link #WINDOWING_STRATEGY}. */
    private static final long ALLOWED_LATENESS = 1;
    /** Tag handed to the wrapped runner as its main output tag. */
    private final TupleTag<Integer> outputTag = new TupleTag<>();

    /** Mock step context; {@link #setup()} stubs its state and timer internals. */
    @Mock
    StepContext mockStepContext;
    /** In-memory state, recreated for every test by {@link #setup()}. */
    private InMemoryStateInternals<String> stateInternals;
    /** In-memory timers and watermarks, recreated for every test by {@link #setup()}. */
    private InMemoryTimerInternals timerInternals;
    /** Size of the fixed windows, in milliseconds. */
    private static final long WINDOW_SIZE = 10;
    // Built from the named constants above so window size and lateness are defined in one place.
    private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY = WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_SIZE))).withAllowedLateness(Duration.millis(ALLOWED_LATENESS));
    /** First fixed window: [0, WINDOW_SIZE). */
    private static final IntervalWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(WINDOW_SIZE));
    /** Second fixed window: [WINDOW_SIZE, 2 * WINDOW_SIZE). */
    private static final IntervalWindow WINDOW_2 = new IntervalWindow(new Instant(WINDOW_SIZE), new Instant(2 * WINDOW_SIZE));

    /**
     * Base class of the two test {@link DoFn}s. It keeps a running sum of the integer values it
     * receives in a {@link ValueState} and emits the updated sum for every element.
     *
     * <p>Subclasses declare the state field and the annotated process method; both delegate the
     * actual work to {@link #processElement}.
     */
    public static abstract class MyDoFn extends DoFn<KV<String, Integer>, Integer> {
        static final String STATE_ID = "foo";

        private MyDoFn() {
        }

        /**
         * Returns a {@link MyDoFnSorted} when {@code z} is true, otherwise a {@link MyDoFnUnsorted}.
         */
        static MyDoFn create(boolean z) {
            if (z) {
                return new MyDoFnSorted();
            }
            return new MyDoFnUnsorted();
        }

        /** Returns the state spec declared by the concrete subclass. */
        abstract StateSpec<ValueState<Integer>> intState();

        /**
         * Adds the element's value to the stored sum (an unset state counts as 0), writes the new
         * sum back to the state and outputs it.
         */
        public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext, ValueState<Integer> valueState) {
            Integer previousSum = MoreObjects.firstNonNull(valueState.read(), 0);
            Integer addend = processContext.element().getValue();
            int updatedSum = previousSum + addend;
            valueState.write(updatedSum);
            processContext.output(updatedSum);
        }
    }

    /**
     * {@link MyDoFn} variant whose process method is annotated with
     * {@code @DoFn.RequiresTimeSortedInput}.
     */
    public static class MyDoFnSorted extends MyDoFn {

        /** Integer value state registered under the id {@code "foo"}. */
        @DoFn.StateId("foo")
        public final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());

        private MyDoFnSorted() {
            super();
        }

        /** Delegates to the shared summing logic in {@link MyDoFn}. */
        @DoFn.ProcessElement
        @DoFn.RequiresTimeSortedInput
        @Override
        public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext, @DoFn.StateId("foo") ValueState<Integer> valueState) {
            super.processElement(processContext, valueState);
        }

        /** Returns the state spec held in {@link #intState}. */
        @Override
        StateSpec<ValueState<Integer>> intState() {
            return intState;
        }
    }

    /**
     * {@link MyDoFn} variant whose process method carries no time-sorting annotation, only
     * {@code @DoFn.ProcessElement}.
     */
    public static class MyDoFnUnsorted extends MyDoFn {

        /** Integer value state registered under the id {@code "foo"}. */
        @DoFn.StateId("foo")
        public final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());

        private MyDoFnUnsorted() {
            super();
        }

        /** Delegates to the shared summing logic in {@link MyDoFn}. */
        @DoFn.ProcessElement
        @Override
        public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext, @DoFn.StateId("foo") ValueState<Integer> valueState) {
            super.processElement(processContext, valueState);
        }

        /** Returns the state spec held in {@link #intState}. */
        @Override
        StateSpec<ValueState<Integer>> intState() {
            return intState;
        }
    }

    /**
     * Builds the state namespace of {@code intervalWindow} using the window coder of
     * {@link #WINDOWING_STRATEGY}.
     *
     * @param intervalWindow the window whose namespace is wanted
     * @return the window-scoped {@link StateNamespace}
     */
    private static StateNamespace windowNamespace(IntervalWindow intervalWindow) {
        // WINDOWING_STRATEGY is declared as WindowingStrategy<?, ?>, so its coder has a captured
        // window type that cannot be paired with an IntervalWindow without a cast. The strategy is
        // built from FixedWindows over IntervalWindow instances, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        org.apache.beam.sdk.coders.Coder<IntervalWindow> windowCoder =
                (org.apache.beam.sdk.coders.Coder<IntervalWindow>) WINDOWING_STRATEGY.getWindowFn().windowCoder();
        return StateNamespaces.window(windowCoder, intervalWindow);
    }

    /**
     * Initialises the Mockito mocks and gives every test fresh in-memory state (keyed by
     * {@code "hello"}) and timer internals, both served through the mocked step context.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);

        InMemoryStateInternals<String> freshState = new InMemoryStateInternals<>("hello");
        InMemoryTimerInternals freshTimers = new InMemoryTimerInternals();
        this.stateInternals = freshState;
        this.timerInternals = freshTimers;

        Mockito.when(this.mockStepContext.stateInternals()).thenReturn(freshState);
        Mockito.when(this.mockStepContext.timerInternals()).thenReturn(freshTimers);
    }

    /** Runs the late-dropping scenario with the unsorted DoFn variant. */
    @Test
    public void testLateDroppingUnordered() throws Exception {
        final boolean ordered = false;
        testLateDropping(ordered);
    }

    /** Runs the late-dropping scenario with the time-sorted DoFn variant. */
    @Test
    public void testLateDroppingOrdered() throws Exception {
        final boolean ordered = true;
        testLateDropping(ordered);
    }

    /** Runs the state garbage-collection scenario with the unsorted DoFn variant. */
    @Test
    public void testGargageCollectUnordered() throws Exception {
        final boolean ordered = false;
        testGarbageCollect(ordered);
    }

    /** Runs the state garbage-collection scenario with the time-sorted DoFn variant. */
    @Test
    public void testGargageCollectOrdered() throws Exception {
        final boolean ordered = true;
        testGarbageCollect(ordered);
    }

    /** Runs the output scenario with the unsorted DoFn variant. */
    @Test
    public void testOutputUnordered() throws Exception {
        final boolean ordered = false;
        testOutput(ordered);
    }

    /** Runs the output scenario with the time-sorted DoFn variant. */
    @Test
    public void testOutputOrdered() throws Exception {
        final boolean ordered = true;
        testOutput(ordered);
    }

    /**
     * Runs the output scenario with the time-sorted DoFn, but builds the runner with the trailing
     * boolean flag set to {@code false}; an {@link UnsupportedOperationException} is expected.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testOutputOrderedUnsupported() throws Exception {
        // NOTE(review): the flag presumably signals whether time-sorted input is supported by the
        // runner -- confirm against DoFnRunners.defaultStatefulDoFnRunner.
        final boolean runnerFlag = false;
        testOutput(true, (fn, sink) -> createStatefulDoFnRunner(fn, sink, runnerFlag));
    }

    /**
     * Checks the dropped-due-to-lateness counter of the time-sorted runner around an input
     * watermark advance: an element at timestamp 0 is not counted as dropped at first, but the
     * same element is counted once the input watermark has moved 2 ms past its timestamp.
     */
    @Test
    public void testDataDroppedBasedOnInputWatermarkWhenOrdered() throws Exception {
        MetricsContainerImpl container = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(container);
        MetricName droppedCounter = MetricName.named(StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER);

        Instant timestamp = new Instant(0L);
        DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulDoFnRunner(MyDoFn.create(true));
        runner.startBundle();
        IntervalWindow window = new IntervalWindow(timestamp, timestamp.plus(Duration.millis(WINDOW_SIZE)));

        // First delivery: nothing has been dropped yet.
        runner.processElement(WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
        Assert.assertEquals(0L, container.getCounter(droppedCounter).getCumulative().longValue());

        // Same element again after the input watermark moved on: it is counted as dropped.
        this.timerInternals.advanceInputWatermark(timestamp.plus(Duration.millis(2L)));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
        Assert.assertEquals(1L, container.getCounter(droppedCounter).getCumulative().longValue());

        runner.finishBundle();
    }

    /**
     * Pushes both watermarks to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, feeds one element into
     * the window [0, WINDOW_SIZE) and asserts that the dropped-due-to-lateness counter reads 1.
     *
     * @param z {@code true} to use the time-sorted DoFn variant, {@code false} for the unsorted one
     */
    private void testLateDropping(boolean z) throws Exception {
        MetricsContainerImpl container = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(container);
        MetricName droppedCounter = MetricName.named(StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER);

        this.timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.timerInternals.advanceOutputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

        DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulDoFnRunner(MyDoFn.create(z));
        runner.startBundle();
        IntervalWindow window = new IntervalWindow(new Instant(0L), new Instant(WINDOW_SIZE));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), new Instant(0L), window, PaneInfo.NO_FIRING));

        Assert.assertEquals(1L, container.getCounter(droppedCounter).getCumulative().longValue());
        runner.finishBundle();
    }

    /**
     * Feeds elements into {@code WINDOW_1} and {@code WINDOW_2}, then advances the input watermark
     * in steps and asserts that each window's state is emptied while the other window's state is
     * still readable.
     *
     * @param z {@code true} to use the time-sorted DoFn variant, {@code false} for the unsorted one
     */
    private void testGarbageCollect(boolean z) throws Exception {
        this.timerInternals.advanceInputWatermark(new Instant(1L));
        MyDoFn fn = MyDoFn.create(z);
        // The tag must be typed: with a raw StateTag the state lookup below would only yield a
        // plain State, which offers no read().
        StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(MyDoFn.STATE_ID, fn.intState());
        DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulDoFnRunner(fn);
        Instant elementTime = new Instant(1L);

        // One element in WINDOW_1.
        runner.processElement(WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
        if (z) {
            // The sorted variant needs a watermark advance (with timers fired) before the state is checked.
            advanceInputWatermark(this.timerInternals, elementTime.plus(Duration.millis(2L)), runner);
        }
        Assert.assertEquals(1L, this.stateInternals.state(windowNamespace(WINDOW_1), stateTag).read().intValue());

        // Two elements in WINDOW_2.
        Instant secondWindowTime = elementTime.plus(Duration.millis(WINDOW_SIZE));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), secondWindowTime, WINDOW_2, PaneInfo.NO_FIRING));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), secondWindowTime, WINDOW_2, PaneInfo.NO_FIRING));
        if (z) {
            advanceInputWatermark(this.timerInternals, elementTime.plus(Duration.millis(12L)), runner);
        }
        Assert.assertEquals(2L, this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read().intValue());

        // At watermark elementTime + 12 ms the state of WINDOW_1 is gone, WINDOW_2 is untouched.
        advanceInputWatermark(this.timerInternals, elementTime.plus(Duration.millis(12L)), runner);
        Assert.assertTrue(this.stateInternals.isEmptyForTesting(this.stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
        Assert.assertEquals(2L, this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read().intValue());

        // Three further milliseconds past the end of WINDOW_2 its state is gone as well.
        // NOTE(review): the three 1 ms steps presumably stand for allowed lateness, the GC delay
        // and one extra tick -- confirm against StatefulDoFnRunner.
        advanceInputWatermark(this.timerInternals, WINDOW_2.maxTimestamp().plus(Duration.millis(1L)).plus(Duration.millis(1L)).plus(Duration.millis(1L)), runner);
        Assert.assertTrue(this.stateInternals.isEmptyForTesting(this.stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
    }

    /**
     * Runs the output scenario with a runner built by the two-argument
     * {@code createStatefulDoFnRunner} overload.
     *
     * @param z {@code true} to use the time-sorted DoFn variant, {@code false} for the unsorted one
     */
    private void testOutput(boolean z) throws Exception {
        testOutput(z, (fn, sink) -> createStatefulDoFnRunner(fn, sink));
    }

    /**
     * Sends out-of-order elements into {@code WINDOW_1} and {@code WINDOW_2} and checks both the
     * final state value and the exact sequence of emitted outputs.
     *
     * <p>For the sorted variant the expected outputs are the running sums in timestamp order; for
     * the unsorted variant they are the running sums in arrival order.
     *
     * @param z {@code true} to use the time-sorted DoFn variant, {@code false} for the unsorted one
     * @param biFunction builds the runner under test from the DoFn and the collecting output manager
     */
    private void testOutput(boolean z, BiFunction<MyDoFn, DoFnRunners.OutputManager, DoFnRunner<KV<String, Integer>, Integer>> biFunction) throws Exception {
        this.timerInternals.advanceInputWatermark(new Instant(1L));
        MyDoFn fn = MyDoFn.create(z);
        // The tag must be typed: with a raw StateTag the state lookup below would only yield a
        // plain State, which offers no read().
        StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(MyDoFn.STATE_ID, fn.intState());
        List<KV<TupleTag<?>, WindowedValue<?>>> outputs = new ArrayList<>();
        DoFnRunner<KV<String, Integer>, Integer> runner = biFunction.apply(fn, asOutputManager(outputs));

        // WINDOW_1: value 1 at t=5 arrives before value 2 at t=4.
        Instant firstTime = new Instant(5L);
        Instant firstTimeMinus1 = firstTime.minus(Duration.millis(1L));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), firstTime, WINDOW_1, PaneInfo.NO_FIRING));
        runner.processElement(WindowedValue.of(KV.of("hello", 2), firstTimeMinus1, WINDOW_1, PaneInfo.NO_FIRING));
        if (z) {
            // The sorted variant needs a watermark advance (with timers fired) before results are checked.
            advanceInputWatermark(this.timerInternals, firstTime.plus(Duration.millis(2L)), runner);
        }
        Assert.assertEquals(3L, this.stateInternals.state(windowNamespace(WINDOW_1), stateTag).read().intValue());
        Assert.assertEquals(2L, outputs.size());
        if (z) {
            Assert.assertEquals(
                    Arrays.asList(
                            KV.of(this.outputTag, WindowedValue.of(2, firstTimeMinus1, WINDOW_1, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(3, firstTime, WINDOW_1, PaneInfo.NO_FIRING))),
                    outputs);
        } else {
            Assert.assertEquals(
                    Arrays.asList(
                            KV.of(this.outputTag, WindowedValue.of(1, firstTime, WINDOW_1, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(3, firstTimeMinus1, WINDOW_1, PaneInfo.NO_FIRING))),
                    outputs);
        }
        outputs.clear();

        // WINDOW_2: values 1, 2, 3 arrive with strictly decreasing timestamps.
        Instant secondTime = firstTime.plus(Duration.millis(WINDOW_SIZE));
        Instant secondTimeMinus1 = secondTime.minus(Duration.millis(1L));
        Instant secondTimeMinus2 = secondTime.minus(Duration.millis(2L));
        runner.processElement(WindowedValue.of(KV.of("hello", 1), secondTime, WINDOW_2, PaneInfo.NO_FIRING));
        runner.processElement(WindowedValue.of(KV.of("hello", 2), secondTimeMinus1, WINDOW_2, PaneInfo.NO_FIRING));
        runner.processElement(WindowedValue.of(KV.of("hello", 3), secondTimeMinus2, WINDOW_2, PaneInfo.NO_FIRING));
        if (z) {
            advanceInputWatermark(this.timerInternals, secondTime.plus(Duration.millis(2L)), runner);
        }
        Assert.assertEquals(6L, this.stateInternals.state(windowNamespace(WINDOW_2), stateTag).read().intValue());
        Assert.assertEquals(3L, outputs.size());
        if (z) {
            Assert.assertEquals(
                    Arrays.asList(
                            KV.of(this.outputTag, WindowedValue.of(3, secondTimeMinus2, WINDOW_2, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(5, secondTimeMinus1, WINDOW_2, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(6, secondTime, WINDOW_2, PaneInfo.NO_FIRING))),
                    outputs);
        } else {
            Assert.assertEquals(
                    Arrays.asList(
                            KV.of(this.outputTag, WindowedValue.of(1, secondTime, WINDOW_2, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(3, secondTimeMinus1, WINDOW_2, PaneInfo.NO_FIRING)),
                            KV.of(this.outputTag, WindowedValue.of(6, secondTimeMinus2, WINDOW_2, PaneInfo.NO_FIRING))),
                    outputs);
        }
    }

    /**
     * Builds a stateful runner for {@code doFn} without an output manager; the wrapped runner then
     * falls back to a discarding one.
     */
    private DoFnRunner createStatefulDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn) {
        final DoFnRunners.OutputManager noOutputManager = null;
        return createStatefulDoFnRunner(doFn, noOutputManager);
    }

    /**
     * Builds a stateful runner for {@code doFn} that writes to {@code outputManager}, passing
     * {@code true} as the trailing flag of the three-argument overload.
     */
    private DoFnRunner createStatefulDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn, DoFnRunners.OutputManager outputManager) {
        final boolean runnerFlag = true;
        return createStatefulDoFnRunner(doFn, outputManager, runnerFlag);
    }

    /**
     * Wraps a simple runner for {@code doFn} in the default stateful runner, wired to this test's
     * step context, windowing strategy, timer internals and state internals.
     *
     * @param doFn the function under test
     * @param outputManager receiver of outputs, or {@code null} to discard them
     * @param z forwarded unchanged as the last argument of
     *     {@code DoFnRunners.defaultStatefulDoFnRunner}
     */
    private DoFnRunner createStatefulDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn, DoFnRunners.OutputManager outputManager, boolean z) {
        KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
        DoFnRunner<KV<String, Integer>, Integer> wrappedRunner = getDoFnRunner(doFn, outputManager);
        StatefulDoFnRunner.TimeInternalsCleanupTimer cleanupTimer =
                new StatefulDoFnRunner.TimeInternalsCleanupTimer(this.timerInternals, WINDOWING_STRATEGY);
        StatefulDoFnRunner.StateInternalsStateCleaner stateCleaner =
                new StatefulDoFnRunner.StateInternalsStateCleaner(doFn, this.stateInternals, WINDOWING_STRATEGY.getWindowFn().windowCoder());
        return DoFnRunners.defaultStatefulDoFnRunner(
                doFn,
                inputCoder,
                wrappedRunner,
                this.mockStepContext,
                WINDOWING_STRATEGY,
                cleanupTimer,
                stateCleaner,
                z);
    }

    /**
     * Creates the plain {@code SimpleDoFnRunner} that the stateful runner wraps. A {@code null}
     * {@code outputManager} is replaced by one that discards every output.
     */
    private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner(DoFn<KV<String, Integer>, Integer> doFn, DoFnRunners.OutputManager outputManager) {
        DoFnRunners.OutputManager effectiveManager = MoreObjects.firstNonNull(outputManager, discardingOutputManager());
        return new SimpleDoFnRunner(
                null,
                doFn,
                NullSideInputReader.empty(),
                effectiveManager,
                this.outputTag,
                Collections.emptyList(),
                this.mockStepContext,
                null,
                Collections.emptyMap(),
                WINDOWING_STRATEGY,
                DoFnSchemaInformation.create(),
                Collections.emptyMap());
    }

    /** Returns an output manager that ignores everything it is given. */
    private DoFnRunners.OutputManager discardingOutputManager() {
        DoFnRunners.OutputManager blackHole = new DoFnRunners.OutputManager() {
            @Override
            public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
                // Intentionally empty: outputs are dropped.
            }
        };
        return blackHole;
    }

    /**
     * Advances the input watermark to {@code instant} and then delivers every event-time timer
     * returned by {@code removeNextEventTimer()} to {@code doFnRunner}, until none is left.
     *
     * <p>Each timer must live in a window namespace; the timer is fired with a {@code null} key
     * in that namespace's window.
     */
    private static void advanceInputWatermark(InMemoryTimerInternals inMemoryTimerInternals, Instant instant, DoFnRunner<?, ?> doFnRunner) throws Exception {
        inMemoryTimerInternals.advanceInputWatermark(instant);
        for (TimerInternals.TimerData timer = inMemoryTimerInternals.removeNextEventTimer();
                timer != null;
                timer = inMemoryTimerInternals.removeNextEventTimer()) {
            StateNamespace timerNamespace = timer.getNamespace();
            Preconditions.checkArgument(timerNamespace instanceof StateNamespaces.WindowNamespace);
            StateNamespaces.WindowNamespace windowNamespace = (StateNamespaces.WindowNamespace) timerNamespace;
            doFnRunner.onTimer(
                    timer.getTimerId(),
                    timer.getTimerFamilyId(),
                    null,
                    windowNamespace.getWindow(),
                    timer.getTimestamp(),
                    timer.getOutputTimestamp(),
                    timer.getDomain());
        }
    }

    /**
     * Returns an output manager that appends every output, paired with its tag, to {@code list}
     * in the order received.
     */
    private static DoFnRunners.OutputManager asOutputManager(final List<KV<TupleTag<?>, WindowedValue<?>>> list) {
        return new DoFnRunners.OutputManager() {
            @Override
            public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
                KV<TupleTag<?>, WindowedValue<?>> taggedOutput = KV.of(tag, value);
                list.add(taggedOutput);
            }
        };
    }
}
