package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TransformInputs;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.class */
public class StatefulParDoEvaluatorFactoryTest implements Serializable {

    @Mock
    private transient EvaluationContext mockEvaluationContext;

    @Mock
    private transient DirectExecutionContext mockExecutionContext;

    @Mock
    private transient DirectExecutionContext.DirectStepContext mockStepContext;

    @Mock
    private transient ReadyCheckingSideInputReader mockSideInputReader;

    @Mock
    private transient UncommittedBundle<Integer> mockUncommittedBundle;
    private static final String KEY = "any-key";
    private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
    private final transient PipelineOptions options = PipelineOptionsFactory.create();
    private final transient StateInternals stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(KEY, (CopyOnAccessInMemoryStateInternals) null);

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.mockStepContext.stateInternals()).thenReturn(this.stateInternals);
        Mockito.when(this.mockEvaluationContext.createSideInputReader(Matchers.anyList())).thenReturn(SideInputContainer.create(this.mockEvaluationContext, Collections.emptyList()).createReaderForViews(Collections.emptyList()));
    }

    @Test
    public void windowCleanupScheduled() throws Exception {
        PCollection apply = this.pipeline.apply(Create.of(KV.of("hello", 1), new KV[]{KV.of("hello", 2)})).apply(Window.into(FixedWindows.of(Duration.millis(10L))));
        TupleTag tupleTag = new TupleTag();
        PCollection coder = apply.apply(new ParDoMultiOverrideFactory.GbkThenStatefulParDo(new DoFn<KV<String, Integer>, Integer>() { // from class: org.apache.beam.runners.direct.StatefulParDoEvaluatorFactoryTest.1

            @DoFn.StateId("my-state-id")
            private final StateSpec<ValueState<String>> spec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void process(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext) {
            }
        }, tupleTag, TupleTagList.empty(), Collections.emptyList())).get(tupleTag).setCoder(VarIntCoder.of());
        StatefulParDoEvaluatorFactory statefulParDoEvaluatorFactory = new StatefulParDoEvaluatorFactory(this.mockEvaluationContext, this.options);
        AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(coder);
        Mockito.when(this.mockEvaluationContext.getExecutionContext((AppliedPTransform) Matchers.eq(producer), (StructuralKey) Mockito.any())).thenReturn(this.mockExecutionContext);
        Mockito.when(this.mockExecutionContext.getStepContext(Matchers.anyString())).thenReturn(this.mockStepContext);
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(9L));
        IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(10L), new Instant(19L));
        StateNamespace window = StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow);
        StateNamespace window2 = StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow2);
        StateTag tagForSpec = StateTags.tagForSpec("my-state-id", StateSpecs.value(StringUtf8Coder.of()));
        this.stateInternals.state(window, tagForSpec).write("first");
        this.stateInternals.state(window2, tagForSpec).write("second");
        statefulParDoEvaluatorFactory.forApplication(producer, BUNDLE_FACTORY.createBundle(apply).add(WindowedValue.of(KV.of("hello", 1), new Instant(3L), intervalWindow, PaneInfo.NO_FIRING)).add(WindowedValue.of(KV.of("hello", 2), new Instant(11L), intervalWindow2, PaneInfo.NO_FIRING)).commit(Instant.now()));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
        ((EvaluationContext) Mockito.verify(this.mockEvaluationContext)).scheduleAfterWindowExpiration((AppliedPTransform) Matchers.eq(producer), (BoundedWindow) Matchers.eq(intervalWindow), (WindowingStrategy) Mockito.any(), (Runnable) forClass.capture());
        ((Runnable) forClass.getValue()).run();
        Assert.assertThat((String) this.stateInternals.state(window, tagForSpec).read(), org.hamcrest.Matchers.nullValue());
        Assert.assertThat((String) this.stateInternals.state(window2, tagForSpec).read(), org.hamcrest.Matchers.equalTo("second"));
        ((EvaluationContext) Mockito.verify(this.mockEvaluationContext)).scheduleAfterWindowExpiration((AppliedPTransform) Matchers.eq(producer), (BoundedWindow) Matchers.eq(intervalWindow2), (WindowingStrategy) Mockito.any(), (Runnable) forClass.capture());
        ((Runnable) forClass.getValue()).run();
        Assert.assertThat((String) this.stateInternals.state(window2, tagForSpec).read(), org.hamcrest.Matchers.nullValue());
    }

    @Test
    public void testUnprocessedElements() throws Exception {
        PCollection apply = this.pipeline.apply(Create.of(KV.of("hello", 1), new KV[]{KV.of("hello", 2)})).apply(Window.into(FixedWindows.of(Duration.millis(10L))));
        PCollectionView apply2 = this.pipeline.apply("Create side input", Create.of(42, new Integer[0])).apply("Window side input", Window.into(FixedWindows.of(Duration.millis(10L)))).apply("View side input", View.asList());
        TupleTag tupleTag = new TupleTag();
        PCollection coder = apply.apply(new ParDoMultiOverrideFactory.GbkThenStatefulParDo(new DoFn<KV<String, Integer>, Integer>() { // from class: org.apache.beam.runners.direct.StatefulParDoEvaluatorFactoryTest.2

            @DoFn.StateId("my-state-id")
            private final StateSpec<ValueState<String>> spec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void process(DoFn<KV<String, Integer>, Integer>.ProcessContext processContext) {
            }
        }, tupleTag, TupleTagList.empty(), Collections.singletonList(apply2))).get(tupleTag).setCoder(VarIntCoder.of());
        StatefulParDoEvaluatorFactory statefulParDoEvaluatorFactory = new StatefulParDoEvaluatorFactory(this.mockEvaluationContext, this.options);
        AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(coder);
        Mockito.when(this.mockEvaluationContext.getExecutionContext((AppliedPTransform) Matchers.eq(producer), (StructuralKey) Mockito.any())).thenReturn(this.mockExecutionContext);
        Mockito.when(this.mockExecutionContext.getStepContext(Matchers.anyString())).thenReturn(this.mockStepContext);
        Mockito.when(this.mockEvaluationContext.createBundle((PCollection) Matchers.any())).thenReturn(this.mockUncommittedBundle);
        Mockito.when(this.mockStepContext.getTimerUpdate()).thenReturn(WatermarkManager.TimerUpdate.empty());
        Mockito.when(this.mockEvaluationContext.createSideInputReader(Matchers.anyList())).thenReturn(this.mockSideInputReader);
        Mockito.when(Boolean.valueOf(this.mockSideInputReader.isReady((PCollectionView) Matchers.any(), (BoundedWindow) Matchers.any()))).thenReturn(false);
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(9L));
        WindowedValue of = WindowedValue.of(KV.of("hello", 1), new Instant(3L), intervalWindow, PaneInfo.NO_FIRING);
        WindowedValue withValue = of.withValue(KeyedWorkItems.elementsWorkItem("hello", ImmutableList.of(of, of.withValue(KV.of("hello", 13)), of.withValue(KV.of("hello", 15)))));
        TransformEvaluator forApplication = statefulParDoEvaluatorFactory.forApplication(producer, BUNDLE_FACTORY.createBundle((PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(producer))).add(withValue).commit(Instant.now()));
        forApplication.processElement(withValue);
        TransformResult finishBundle = forApplication.finishBundle();
        ArrayList arrayList = new ArrayList();
        for (WindowedValue windowedValue : finishBundle.getUnprocessedElements()) {
            Assert.assertThat((BoundedWindow) Iterables.getOnlyElement(windowedValue.getWindows()), org.hamcrest.Matchers.equalTo(intervalWindow));
            Assert.assertThat((String) ((KeyedWorkItem) windowedValue.getValue()).key(), org.hamcrest.Matchers.equalTo("hello"));
            Iterator it = ((KeyedWorkItem) windowedValue.getValue()).elementsIterable().iterator();
            while (it.hasNext()) {
                arrayList.add((Integer) ((KV) ((WindowedValue) it.next()).getValue()).getValue());
            }
        }
        Assert.assertThat(arrayList, org.hamcrest.Matchers.containsInAnyOrder(new Integer[]{1, 13, 15}));
    }
}
