package org.apache.beam.runners.direct;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/SideInputContainerTest.class */
public class SideInputContainerTest {
    private static final BoundedWindow FIRST_WINDOW = new BoundedWindow() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.1
        public Instant maxTimestamp() {
            return new Instant(789541L);
        }

        public String toString() {
            return "firstWindow";
        }
    };
    private static final BoundedWindow SECOND_WINDOW = new BoundedWindow() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.2
        public Instant maxTimestamp() {
            return new Instant(14564786L);
        }

        public String toString() {
            return "secondWindow";
        }
    };

    @Rule
    public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Mock
    private EvaluationContext context;
    private SideInputContainer container;
    private PCollectionView<Map<String, Integer>> mapView;
    private PCollectionView<Double> singletonView;
    private PCollectionView<Iterable<Integer>> iterableView;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        PCollection apply = this.pipeline.apply("forBaseCollection", Create.of(1, new Integer[]{2, 3, 4}));
        this.mapView = apply.apply("forKeyTypes", WithKeys.of("foo")).apply("asMapView", View.asMap());
        this.singletonView = apply.apply("forCombinedTypes", Mean.globally().asSingletonView());
        this.iterableView = apply.apply("asIterableView", View.asIterable());
        this.container = SideInputContainer.create(this.context, ImmutableList.of(this.iterableView, this.mapView, this.singletonView));
    }

    @Test
    public void getAfterWriteReturnsPaneInWindow() throws Exception {
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.of(KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Map map = (Map) this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.mapView, FIRST_WINDOW);
        Assert.assertThat(map, Matchers.hasEntry("one", 1));
        Assert.assertThat(map, Matchers.hasEntry("two", 2));
        Assert.assertThat(Integer.valueOf(map.size()), Matchers.is(2));
    }

    @Test
    public void getReturnsLatestPaneInWindow() throws Exception {
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("one", 1), new Instant(1L), SECOND_WINDOW, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), WindowedValue.of(KV.of("two", 2), new Instant(20L), SECOND_WINDOW, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY))));
        Map map = (Map) this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.mapView, SECOND_WINDOW);
        Assert.assertThat(map, Matchers.hasEntry("one", 1));
        Assert.assertThat(map, Matchers.hasEntry("two", 2));
        Assert.assertThat(Integer.valueOf(map.size()), Matchers.is(2));
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("three", 3), new Instant(300L), SECOND_WINDOW, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1L, -1L))));
        Map map2 = (Map) this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.mapView, SECOND_WINDOW);
        Assert.assertThat(map2, Matchers.hasEntry("three", 3));
        Assert.assertThat(Integer.valueOf(map2.size()), Matchers.is(1));
    }

    @Test
    public void getNotReadyThrows() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("not ready");
        this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.mapView, GlobalWindow.INSTANCE);
    }

    @Test
    public void withPCollectionViewsErrorsForContainsNotInViews() {
        PCollectionView multimapView = PCollectionViews.multimapView(this.pipeline, WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("with unknown views " + ImmutableList.of(multimapView).toString());
        this.container.createReaderForViews(ImmutableList.of(multimapView));
    }

    @Test
    public void withViewsForViewNotInContainerFails() {
        PCollectionView multimapView = PCollectionViews.multimapView(this.pipeline, WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("unknown views");
        this.thrown.expectMessage(multimapView.toString());
        this.container.createReaderForViews(ImmutableList.of(multimapView));
    }

    @Test
    public void getOnReaderForViewNotInReaderFails() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("unknown view: " + this.iterableView.toString());
        this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.iterableView, GlobalWindow.INSTANCE);
    }

    @Test
    public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
        this.container.write(this.singletonView, ImmutableList.of(WindowedValue.of(Double.valueOf(2.875d), FIRST_WINDOW.maxTimestamp().minus(200L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.of(Double.valueOf(4.125d), SECOND_WINDOW.maxTimestamp().minus(2000000L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Assert.assertThat(this.container.createReaderForViews(ImmutableList.of(this.singletonView)).get(this.singletonView, FIRST_WINDOW), Matchers.equalTo(Double.valueOf(2.875d)));
        Assert.assertThat(this.container.createReaderForViews(ImmutableList.of(this.singletonView)).get(this.singletonView, SECOND_WINDOW), Matchers.equalTo(Double.valueOf(4.125d)));
    }

    @Test
    public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
        this.container.write(this.iterableView, ImmutableList.of(WindowedValue.of(44, FIRST_WINDOW.maxTimestamp().minus(200L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.of(44, FIRST_WINDOW.maxTimestamp().minus(200L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Assert.assertThat(this.container.createReaderForViews(ImmutableList.of(this.iterableView)).get(this.iterableView, FIRST_WINDOW), Matchers.contains(new Integer[]{44, 44}));
    }

    @Test
    public void writeForElementInMultipleWindowsSucceeds() throws Exception {
        this.container.write(this.singletonView, ImmutableList.of(WindowedValue.of(Double.valueOf(2.875d), FIRST_WINDOW.maxTimestamp().minus(200L), ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Assert.assertThat(this.container.createReaderForViews(ImmutableList.of(this.singletonView)).get(this.singletonView, FIRST_WINDOW), Matchers.equalTo(Double.valueOf(2.875d)));
        Assert.assertThat(this.container.createReaderForViews(ImmutableList.of(this.singletonView)).get(this.singletonView, SECOND_WINDOW), Matchers.equalTo(Double.valueOf(2.875d)));
    }

    @Test
    public void finishDoesNotOverwriteWrittenElements() throws Exception {
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("one", 1), new Instant(1L), SECOND_WINDOW, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), WindowedValue.of(KV.of("two", 2), new Instant(20L), SECOND_WINDOW, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY))));
        immediatelyInvokeCallback(this.mapView, SECOND_WINDOW);
        Map map = (Map) this.container.createReaderForViews(ImmutableList.of(this.mapView)).get(this.mapView, SECOND_WINDOW);
        Assert.assertThat(map, Matchers.hasEntry("one", 1));
        Assert.assertThat(map, Matchers.hasEntry("two", 2));
        Assert.assertThat(Integer.valueOf(map.size()), Matchers.is(2));
    }

    @Test
    public void finishOnPendingViewsSetsEmptyElements() throws Exception {
        immediatelyInvokeCallback(this.mapView, SECOND_WINDOW);
        Assert.assertThat(Boolean.valueOf(((Map) getFutureOfView(this.container.createReaderForViews(ImmutableList.of(this.mapView)), this.mapView, SECOND_WINDOW).get()).isEmpty()), Matchers.is(true));
    }

    @Test
    public void isReadyInEmptyReaderThrows() {
        ReadyCheckingSideInputReader createReaderForViews = this.container.createReaderForViews(ImmutableList.of());
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("does not contain");
        this.thrown.expectMessage(ImmutableList.of().toString());
        createReaderForViews.isReady(this.mapView, GlobalWindow.INSTANCE);
    }

    @Test
    public void isReadyForSomeNotReadyViewsFalseUntilElements() {
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("one", 1), SECOND_WINDOW.maxTimestamp().minus(100L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        ReadyCheckingSideInputReader createReaderForViews = this.container.createReaderForViews(ImmutableList.of(this.mapView, this.singletonView));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, FIRST_WINDOW)), Matchers.is(false));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, SECOND_WINDOW)), Matchers.is(true));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.singletonView, SECOND_WINDOW)), Matchers.is(false));
        this.container.write(this.mapView, ImmutableList.of(WindowedValue.of(KV.of("too", 2), FIRST_WINDOW.maxTimestamp().minus(100L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, FIRST_WINDOW)), Matchers.is(false));
        this.container.write(this.singletonView, ImmutableList.of(WindowedValue.of(Double.valueOf(1.25d), SECOND_WINDOW.maxTimestamp().minus(100L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, SECOND_WINDOW)), Matchers.is(true));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.singletonView, SECOND_WINDOW)), Matchers.is(false));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, GlobalWindow.INSTANCE)), Matchers.is(false));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.singletonView, GlobalWindow.INSTANCE)), Matchers.is(false));
        ReadyCheckingSideInputReader createReaderForViews2 = this.container.createReaderForViews(ImmutableList.of(this.mapView, this.singletonView));
        Assert.assertThat(Boolean.valueOf(createReaderForViews2.isReady(this.mapView, SECOND_WINDOW)), Matchers.is(true));
        Assert.assertThat(Boolean.valueOf(createReaderForViews2.isReady(this.singletonView, SECOND_WINDOW)), Matchers.is(true));
        Assert.assertThat(Boolean.valueOf(createReaderForViews2.isReady(this.mapView, FIRST_WINDOW)), Matchers.is(true));
    }

    @Test
    public void isReadyForEmptyWindowTrue() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        immediatelyInvokeCallback(this.mapView, GlobalWindow.INSTANCE);
        CountDownLatch invokeLatchedCallback = invokeLatchedCallback(this.singletonView, GlobalWindow.INSTANCE, countDownLatch);
        ReadyCheckingSideInputReader createReaderForViews = this.container.createReaderForViews(ImmutableList.of(this.mapView, this.singletonView));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.mapView, GlobalWindow.INSTANCE)), Matchers.is(true));
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.singletonView, GlobalWindow.INSTANCE)), Matchers.is(false));
        invokeLatchedCallback.countDown();
        if (!countDownLatch.await(1500L, TimeUnit.MILLISECONDS)) {
            Assert.fail("Callback to set empty values did not complete!");
        }
        Assert.assertThat(Boolean.valueOf(createReaderForViews.isReady(this.singletonView, GlobalWindow.INSTANCE)), Matchers.is(false));
        Assert.assertThat(Boolean.valueOf(this.container.createReaderForViews(ImmutableList.of(this.mapView, this.singletonView)).isReady(this.singletonView, GlobalWindow.INSTANCE)), Matchers.is(true));
    }

    private void immediatelyInvokeCallback(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow) {
        ((EvaluationContext) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m12answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((Runnable) invocationOnMock.getArguments()[3]).run();
                return null;
            }
        }).when(this.context)).scheduleAfterOutputWouldBeProduced((PValue) Mockito.eq(pCollectionView), (BoundedWindow) Mockito.eq(boundedWindow), (WindowingStrategy) Mockito.eq(pCollectionView.getWindowingStrategyInternal()), (Runnable) Mockito.any(Runnable.class));
    }

    private CountDownLatch invokeLatchedCallback(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow, final CountDownLatch countDownLatch) {
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        ((EvaluationContext) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m13answer(InvocationOnMock invocationOnMock) throws Throwable {
                final Runnable runnable = (Runnable) invocationOnMock.getArguments()[3];
                Executors.newSingleThreadExecutor().submit(new Runnable() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.4.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            if (!countDownLatch2.await(1500L, TimeUnit.MILLISECONDS)) {
                                Assert.fail("Run latch didn't count down within timeout");
                            }
                            runnable.run();
                            countDownLatch.countDown();
                        } catch (InterruptedException e) {
                            Assert.fail("Unexpectedly interrupted while waiting for latch to be counted down");
                        }
                    }
                });
                return null;
            }
        }).when(this.context)).scheduleAfterOutputWouldBeProduced((PValue) Mockito.eq(pCollectionView), (BoundedWindow) Mockito.eq(boundedWindow), (WindowingStrategy) Mockito.eq(pCollectionView.getWindowingStrategyInternal()), (Runnable) Mockito.any(Runnable.class));
        return countDownLatch2;
    }

    private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader sideInputReader, final PCollectionView<ValueT> pCollectionView, final BoundedWindow boundedWindow) {
        return Executors.newSingleThreadExecutor().submit(new Callable<ValueT>() { // from class: org.apache.beam.runners.direct.SideInputContainerTest.5
            @Override // java.util.concurrent.Callable
            public ValueT call() throws Exception {
                return (ValueT) sideInputReader.get(pCollectionView, boundedWindow);
            }
        });
    }
}
