package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.beam.repackaged.direct_java.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link ParDoEvaluator}.
 *
 * <p>The single test drives an evaluator whose side input is only ready in the global window and
 * checks that elements in any other window are handed back as unprocessed.
 */
@RunWith(JUnit4.class)
public class ParDoEvaluatorTest {

    @Mock
    private EvaluationContext evaluationContext;
    private PCollection<Integer> inputPc;
    private TupleTag<Integer> mainOutputTag;
    private List<TupleTag<?>> additionalOutputTags;
    private BundleFactory bundleFactory;

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /**
     * A side input reader that reports every view as ready only in the {@link GlobalWindow} and
     * always yields the value {@code 5} there.
     */
    public static class ReadyInGlobalWindowReader implements ReadyCheckingSideInputReader {
        private ReadyInGlobalWindowReader() {
        }

        /**
         * Returns {@code 5} for the global window; fails the running test for any other window.
         */
        @Override
        @SuppressWarnings("unchecked") // Every view used with this reader in the test is a PCollectionView<Integer>.
        public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
            if (boundedWindow.equals(GlobalWindow.INSTANCE)) {
                // An int literal cannot be cast straight to a type variable; box it first.
                return (T) Integer.valueOf(5);
            }
            Assert.fail("Should only call get in the Global Window, others are not ready");
            throw new AssertionError("Unreachable");
        }

        @Override
        public <T> boolean contains(PCollectionView<T> pCollectionView) {
            return true;
        }

        @Override
        public boolean isEmpty() {
            return false;
        }

        /** Ready if and only if the requested window is the global window. */
        @Override
        public boolean isReady(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow) {
            return boundedWindow.equals(GlobalWindow.INSTANCE);
        }
    }

    /**
     * Records every element it is invoked with and outputs the element plus the side input value.
     */
    private static class RecorderFn extends DoFn<Integer, Integer> {
        private final Collection<Integer> processed = new ArrayList<>();
        private final PCollectionView<Integer> view;

        public RecorderFn(PCollectionView<Integer> pCollectionView) {
            this.view = pCollectionView;
        }

        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            processed.add(context.element());
            context.output(context.element() + context.sideInput(view));
        }
    }

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        inputPc = p.apply(Create.of(1, 2, 3));
        // Anonymous subclass, as in the original test fixture.
        mainOutputTag = new TupleTag<Integer>() {
        };
        additionalOutputTags = TupleTagList.empty().getAll();
        bundleFactory = ImmutableListBundleFactory.create();
    }

    /**
     * Elements whose window has no ready side input must not reach the {@link DoFn}; they are
     * reported through {@link TransformResult#getUnprocessedElements()} instead.
     */
    @Test
    public void sideInputsNotReadyResultHasUnprocessedElements() {
        PCollectionView<Integer> singletonView =
                inputPc
                        .apply(Window.into(new IdentitySideInputWindowFn()))
                        .apply(View.<Integer>asSingleton().withDefaultValue(0));
        RecorderFn fn = new RecorderFn(singletonView);
        PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView));

        UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
        Mockito.when(evaluationContext.createBundle(output)).thenReturn(outputBundle);

        ParDoEvaluator<Integer> evaluator = createEvaluator(singletonView, fn, inputPc, output);

        IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0L), new Instant(10000L));
        // Only in the global window: processed.
        WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
        // Only in the interval window: not processed.
        WindowedValue<Integer> second =
                WindowedValue.of(2, new Instant(1234L), nonGlobalWindow, PaneInfo.NO_FIRING);
        // In both windows: the global-window part is processed, the interval-window part is not.
        WindowedValue<Integer> third =
                WindowedValue.of(
                        1,
                        new Instant(2468L),
                        ImmutableList.of(nonGlobalWindow, GlobalWindow.INSTANCE),
                        PaneInfo.NO_FIRING);

        evaluator.processElement(first);
        evaluator.processElement(second);
        evaluator.processElement(third);
        TransformResult<Integer> result = evaluator.finishBundle();

        MatcherAssert.assertThat(
                result.getUnprocessedElements(),
                Matchers.<WindowedValue<?>>containsInAnyOrder(
                        second,
                        WindowedValue.of(1, new Instant(2468L), nonGlobalWindow, PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(
                result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
        MatcherAssert.assertThat(fn.processed, Matchers.containsInAnyOrder(1, 3));
        // Outputs are input + 5, the value served by ReadyInGlobalWindowReader: 3 -> 8 and 1 -> 6.
        MatcherAssert.assertThat(
                Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now()).getElements(),
                Matchers.<WindowedValue<?>>containsInAnyOrder(
                        first.withValue(8),
                        WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
    }

    /**
     * Builds a {@link ParDoEvaluator} for {@code fn} on top of the mocked evaluation context.
     *
     * @param singletonView the side input view; the context is stubbed to serve it through a
     *     {@link ReadyInGlobalWindowReader}
     * @param fn the function under evaluation
     * @param input the main input, which supplies the coder and windowing strategy
     * @param output the main output, whose producing transform is the one being evaluated
     */
    private ParDoEvaluator<Integer> createEvaluator(
            PCollectionView<Integer> singletonView,
            RecorderFn fn,
            PCollection<Integer> input,
            PCollection<Integer> output) {
        Mockito.when(evaluationContext.createSideInputReader(ImmutableList.of(singletonView)))
                .thenReturn(new ReadyInGlobalWindowReader());

        DirectExecutionContext executionContext = Mockito.mock(DirectExecutionContext.class);
        DirectExecutionContext.DirectStepContext stepContext =
                Mockito.mock(DirectExecutionContext.DirectStepContext.class);
        Mockito.when(executionContext.getStepContext(Mockito.any(String.class)))
                .thenReturn(stepContext);
        Mockito.when(stepContext.getTimerUpdate()).thenReturn(WatermarkManager.TimerUpdate.empty());
        Mockito.when(
                        evaluationContext.getExecutionContext(
                                Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
                .thenReturn(executionContext);

        // Overrides are applied before the producer lookup, as in the original statement order.
        DirectGraphs.performDirectOverrides(p);
        AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(output);

        return ParDoEvaluator.create(
                evaluationContext,
                PipelineOptionsFactory.create(),
                stepContext,
                transform,
                input.getCoder(),
                input.getWindowingStrategy(),
                fn,
                (StructuralKey<?>) null /* key */,
                ImmutableList.of(singletonView),
                mainOutputTag,
                additionalOutputTags,
                ImmutableMap.of(mainOutputTag, output),
                DoFnSchemaInformation.create(),
                Collections.emptyMap(),
                ParDoEvaluator.defaultRunnerFactory());
    }
}
