package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.core.WindowMatchers;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link WindowEvaluatorFactory}.
 *
 * <p>Every test feeds the same three elements through an evaluator created for a
 * {@link Window.Bound} transform and then checks which windows the produced elements carry:
 * <ul>
 *   <li>{@code 3L} at timestamp 2, in the {@link GlobalWindow};</li>
 *   <li>{@code 2L} at timestamp -10, in the interval window {@code [-100, 0)};</li>
 *   <li>{@code 1L} at day 3, in the {@link GlobalWindow} and two interval windows.</li>
 * </ul>
 */
@RunWith(JUnit4.class)
public class WindowEvaluatorFactoryTest {
    private static final Instant EPOCH = new Instant(0);

    private PCollection<Long> input;
    private WindowEvaluatorFactory factory;

    @Mock
    private EvaluationContext evaluationContext;

    private BundleFactory bundleFactory;

    private final WindowedValue<Long> valueInGlobalWindow =
            WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2));

    private final WindowedValue<Long> valueInIntervalWindow =
            WindowedValue.of(
                    2L,
                    new Instant(-10),
                    new IntervalWindow(new Instant(-100), EPOCH),
                    PaneInfo.NO_FIRING);

    private final IntervalWindow intervalWindow1 =
            new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE);

    private final IntervalWindow intervalWindow2 =
            new IntervalWindow(
                    EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6)));

    // Declared after the two interval windows because its initializer reads them.
    private final WindowedValue<Long> valueInGlobalAndTwoIntervalWindows =
            WindowedValue.of(
                    1L,
                    EPOCH.plus(Duration.standardDays(3)),
                    ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
                    PaneInfo.NO_FIRING);

    /**
     * A {@link WindowFn} whose result depends on the window the element is already in: an element
     * in the {@link GlobalWindow} is moved to the one-millisecond window starting at its
     * timestamp, while an element in any other window keeps that window.
     */
    private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, BoundedWindow> {
        @Override
        public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception {
            if (c.window().equals(GlobalWindow.INSTANCE)) {
                return Collections.<BoundedWindow>singleton(
                        new IntervalWindow(c.timestamp(), c.timestamp().plus(1L)));
            }
            return Collections.singleton(c.window());
        }

        @Override
        public boolean isCompatible(WindowFn<?, ?> other) {
            return false;
        }

        @Override
        public Coder<BoundedWindow> windowCoder() {
            // Generic types are invariant, so the GlobalWindow coder has to be forced into the
            // declared return type through a raw cast. The suppression is limited to this local.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Coder<BoundedWindow> coder = (Coder) GlobalWindow.Coder.INSTANCE;
            return coder;
        }

        @Override
        public BoundedWindow getSideInputWindow(BoundedWindow window) {
            return null;
        }
    }

    /** Initializes the mocks and builds a fresh input collection, bundle factory and factory. */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.input = TestPipeline.create().apply(Create.of(new Long[]{1L, 2L, 3L}));
        this.bundleFactory = ImmutableListBundleFactory.create();
        this.factory = new WindowEvaluatorFactory(this.evaluationContext);
    }

    /**
     * A transform that only configures triggering (no {@link WindowFn}) is expected to emit the
     * three input elements unchanged.
     */
    @Test
    public void nullWindowFunSucceeds() throws Exception {
        // The explicit <Long> is required: a chained call gets no target type to infer T from.
        Window.Bound<Long> transform =
                Window.<Long>triggering(
                                AfterWatermark.pastEndOfWindow()
                                        .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
                        .accumulatingFiredPanes();
        PCollection<Long> triggering = this.input.apply(transform);

        DirectRunner.CommittedBundle<Long> inputBundle = createInputBundle();
        DirectRunner.UncommittedBundle<Long> outputBundle =
                createOutputBundle(triggering, inputBundle);

        TransformResult<Long> result = runEvaluator(triggering, inputBundle, transform);

        Assert.assertThat(
                Iterables.getOnlyElement(result.getOutputBundles()),
                Matchers.<DirectRunner.UncommittedBundle<?>>equalTo(outputBundle));
        DirectRunner.CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
        Assert.assertThat(
                committed.getElements(),
                Matchers.containsInAnyOrder(
                        this.valueInIntervalWindow,
                        this.valueInGlobalWindow,
                        this.valueInGlobalAndTwoIntervalWindows));
    }

    /**
     * With seven-day {@link FixedWindows} every produced element is expected in exactly one
     * window; the input element that was in three windows is expected three times.
     */
    @Test
    public void singleWindowFnSucceeds() throws Exception {
        Duration windowSize = Duration.standardDays(7L);
        Window.Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowSize));
        PCollection<Long> windowed = this.input.apply(transform);

        DirectRunner.CommittedBundle<Long> inputBundle = createInputBundle();
        DirectRunner.UncommittedBundle<Long> outputBundle =
                createOutputBundle(windowed, inputBundle);

        IntervalWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowSize));
        IntervalWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowSize), EPOCH);
        Instant dayThree = EPOCH.plus(Duration.standardDays(3L));

        TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);

        Assert.assertThat(
                Iterables.getOnlyElement(result.getOutputBundles()),
                Matchers.<DirectRunner.UncommittedBundle<?>>equalTo(outputBundle));
        DirectRunner.CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
        Assert.assertThat(
                committed.getElements(),
                Matchers.containsInAnyOrder(
                        // value that was in the global window
                        WindowMatchers.isSingleWindowedValue(
                                3L, new Instant(2L), w1, PaneInfo.NO_FIRING),
                        // value that was in just the interval window
                        WindowMatchers.isSingleWindowedValue(
                                2L, new Instant(-10L), wMinus1, PaneInfo.NO_FIRING),
                        // value that was in three windows: one output per input window
                        WindowMatchers.isSingleWindowedValue(
                                1L, dayThree, w1, PaneInfo.NO_FIRING),
                        WindowMatchers.isSingleWindowedValue(
                                1L, dayThree, w1, PaneInfo.NO_FIRING),
                        WindowMatchers.isSingleWindowedValue(
                                1L, dayThree, w1, PaneInfo.NO_FIRING)));
    }

    /**
     * With six-day {@link SlidingWindows} sliding every three days each produced element is
     * expected in two windows; the input element that was in three windows is expected three
     * times.
     */
    @Test
    public void multipleWindowsWindowFnSucceeds() throws Exception {
        Duration windowSize = Duration.standardDays(6L);
        Duration slide = Duration.standardDays(3L);
        Window.Bound<Long> transform =
                Window.<Long>into(SlidingWindows.of(windowSize).every(slide));
        PCollection<Long> windowed = this.input.apply(transform);

        DirectRunner.CommittedBundle<Long> inputBundle = createInputBundle();
        DirectRunner.UncommittedBundle<Long> outputBundle =
                createOutputBundle(windowed, inputBundle);

        TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);

        Assert.assertThat(
                Iterables.getOnlyElement(result.getOutputBundles()),
                Matchers.<DirectRunner.UncommittedBundle<?>>equalTo(outputBundle));
        DirectRunner.CommittedBundle<Long> committed = outputBundle.commit(Instant.now());

        // [0, 6d), [3d, 9d), [-6d, 0) and [-3d, 3d)
        IntervalWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowSize));
        IntervalWindow w2 =
                new IntervalWindow(EPOCH.plus(slide), EPOCH.plus(slide).plus(windowSize));
        IntervalWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowSize), EPOCH);
        IntervalWindow wMinusSlide =
                new IntervalWindow(EPOCH.minus(windowSize).plus(slide), EPOCH.plus(slide));

        Assert.assertThat(
                committed.getElements(),
                Matchers.containsInAnyOrder(
                        // value that was in the global window
                        WindowMatchers.isWindowedValue(
                                this.valueInGlobalWindow.getValue(),
                                this.valueInGlobalWindow.getTimestamp(),
                                ImmutableSet.of(w1, wMinusSlide),
                                PaneInfo.NO_FIRING),
                        // value that was in just the interval window
                        WindowMatchers.isWindowedValue(
                                this.valueInIntervalWindow.getValue(),
                                this.valueInIntervalWindow.getTimestamp(),
                                ImmutableSet.of(wMinus1, wMinusSlide),
                                PaneInfo.NO_FIRING),
                        // value that was in three windows: one output per input window
                        WindowMatchers.isWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                this.valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                                ImmutableSet.of(w1, w2),
                                PaneInfo.NO_FIRING),
                        WindowMatchers.isWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                this.valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                                ImmutableSet.of(w1, w2),
                                PaneInfo.NO_FIRING),
                        WindowMatchers.isWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                this.valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                                ImmutableSet.of(w1, w2),
                                PaneInfo.NO_FIRING)));
    }

    /**
     * With {@link EvaluatorTestWindowFn}, which reads the element's current window, elements in
     * the global window are expected in a new one-millisecond window and all other windows are
     * expected to be kept.
     */
    @Test
    public void referencesEarlierWindowsSucceeds() throws Exception {
        Window.Bound<Long> transform = Window.<Long>into(new EvaluatorTestWindowFn());
        PCollection<Long> windowed = this.input.apply(transform);

        DirectRunner.CommittedBundle<Long> inputBundle = createInputBundle();
        DirectRunner.UncommittedBundle<Long> outputBundle =
                createOutputBundle(windowed, inputBundle);

        TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);

        Assert.assertThat(
                Iterables.getOnlyElement(result.getOutputBundles()),
                Matchers.<DirectRunner.UncommittedBundle<?>>equalTo(outputBundle));
        DirectRunner.CommittedBundle<Long> committed = outputBundle.commit(Instant.now());

        Instant globalTimestamp = this.valueInGlobalWindow.getTimestamp();
        Instant multiTimestamp = this.valueInGlobalAndTwoIntervalWindows.getTimestamp();

        Assert.assertThat(
                committed.getElements(),
                Matchers.containsInAnyOrder(
                        // value that was in the global window
                        WindowMatchers.isSingleWindowedValue(
                                this.valueInGlobalWindow.getValue(),
                                globalTimestamp,
                                new IntervalWindow(globalTimestamp, globalTimestamp.plus(1L)),
                                PaneInfo.NO_FIRING),
                        // value that was in just the interval window
                        WindowMatchers.isWindowedValue(
                                this.valueInIntervalWindow.getValue(),
                                this.valueInIntervalWindow.getTimestamp(),
                                this.valueInIntervalWindow.getWindows(),
                                PaneInfo.NO_FIRING),
                        // value that was in three windows: one output per input window
                        WindowMatchers.isSingleWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                multiTimestamp,
                                new IntervalWindow(multiTimestamp, multiTimestamp.plus(1L)),
                                PaneInfo.NO_FIRING),
                        WindowMatchers.isSingleWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                multiTimestamp,
                                this.intervalWindow1,
                                PaneInfo.NO_FIRING),
                        WindowMatchers.isSingleWindowedValue(
                                this.valueInGlobalAndTwoIntervalWindows.getValue(),
                                multiTimestamp,
                                this.intervalWindow2,
                                PaneInfo.NO_FIRING)));
    }

    /** Builds and commits a bundle on {@link #input} holding the three fixture elements. */
    private DirectRunner.CommittedBundle<Long> createInputBundle() {
        return this.bundleFactory
                .createBundle(this.input)
                .add(this.valueInGlobalWindow)
                .add(this.valueInGlobalAndTwoIntervalWindows)
                .add(this.valueInIntervalWindow)
                .commit(Instant.now());
    }

    /**
     * Creates an uncommitted bundle for {@code pCollection} and stubs the mocked
     * {@link EvaluationContext} to hand out that same bundle, so a test can compare it with the
     * bundle reported by the evaluator.
     *
     * @param pCollection the collection the output bundle belongs to
     * @param committedBundle the input bundle; currently not used by this helper
     * @return the bundle the mocked context will return for {@code pCollection}
     */
    private DirectRunner.UncommittedBundle<Long> createOutputBundle(
            PCollection<Long> pCollection, DirectRunner.CommittedBundle<Long> committedBundle) {
        DirectRunner.UncommittedBundle<Long> outputBundle =
                this.bundleFactory.createBundle(pCollection);
        Mockito.when(this.evaluationContext.createBundle(pCollection)).thenReturn(outputBundle);
        return outputBundle;
    }

    /**
     * Creates an evaluator for {@code bound} applied between {@link #input} and
     * {@code pCollection}, feeds it the three fixture elements and finishes the bundle.
     *
     * @param pCollection the output collection of the window transform
     * @param committedBundle the input bundle handed to the factory
     * @param bound the window transform under test
     * @return the result reported by the evaluator when the bundle is finished
     * @throws Exception if the factory or the evaluator fails
     */
    private TransformResult<Long> runEvaluator(
            PCollection<Long> pCollection,
            DirectRunner.CommittedBundle<Long> committedBundle,
            Window.Bound<Long> bound)
            throws Exception {
        TransformEvaluator<Long> evaluator =
                this.factory.forApplication(
                        AppliedPTransform.of("Window", this.input, pCollection, bound),
                        committedBundle);

        evaluator.processElement(this.valueInGlobalWindow);
        evaluator.processElement(this.valueInGlobalAndTwoIntervalWindows);
        evaluator.processElement(this.valueInIntervalWindow);
        return evaluator.finishBundle();
    }
}
