package org.apache.beam.runners.fnexecution.translation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactoryTest.class */
public class BatchSideInputHandlerFactoryTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Mock
    private BatchSideInputHandlerFactory.SideInputGetter context;
    private static final RunnerApi.FunctionSpec MULTIMAP_ACCESS = RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.MULTIMAP_SIDE_INPUT).build();
    private static final RunnerApi.FunctionSpec ITERABLE_ACCESS = RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.ITERABLE_SIDE_INPUT).build();
    private static final String TRANSFORM_ID = "transform-id";
    private static final String SIDE_INPUT_NAME = "side-input";
    private static final String COLLECTION_ID = "collection";
    private static final ExecutableStage EXECUTABLE_STAGE = createExecutableStage(Arrays.asList(SideInputReference.of(PipelineNode.pTransform(TRANSFORM_ID, RunnerApi.PTransform.getDefaultInstance()), SIDE_INPUT_NAME, PipelineNode.pCollection(COLLECTION_ID, RunnerApi.PCollection.getDefaultInstance()))));
    private static final byte[] ENCODED_NULL = encode(null, VoidCoder.of());
    private static final byte[] ENCODED_FOO = encode("foo", StringUtf8Coder.of());

    @Before
    public void setUpMocks() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void invalidSideInputThrowsException() {
        BatchSideInputHandlerFactory forStage = BatchSideInputHandlerFactory.forStage(createExecutableStage(Collections.emptyList()), this.context);
        this.thrown.expect(Matchers.instanceOf(IllegalArgumentException.class));
        forStage.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, KvCoder.of(VoidCoder.of(), VoidCoder.of()), GlobalWindow.Coder.INSTANCE);
    }

    @Test
    public void emptyResultForEmptyCollection() {
        MatcherAssert.assertThat(BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, this.context).forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, KvCoder.of(VoidCoder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE).get(ENCODED_NULL, GlobalWindow.INSTANCE), Matchers.emptyIterable());
    }

    @Test
    public void singleElementForCollection() {
        Mockito.when(this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.valueInGlobalWindow(KV.of((Object) null, 3))));
        MatcherAssert.assertThat(BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, this.context).forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, KvCoder.of(VoidCoder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE).get(ENCODED_NULL, GlobalWindow.INSTANCE), Matchers.contains(new Integer[]{3}));
    }

    @Test
    public void groupsValuesByKey() {
        Mockito.when(this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.valueInGlobalWindow(KV.of("foo", 2)), WindowedValue.valueInGlobalWindow(KV.of("bar", 3)), WindowedValue.valueInGlobalWindow(KV.of("foo", 5))));
        MatcherAssert.assertThat(BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, this.context).forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE).get(ENCODED_FOO, GlobalWindow.INSTANCE), Matchers.containsInAnyOrder(new Integer[]{2, 5}));
    }

    @Test
    public void groupsValuesByWindowAndKey() {
        Instant instant = new DateTime(2018, 1, 1, 1, 1, DateTimeZone.UTC).toInstant();
        Instant instant2 = new DateTime(2018, 1, 1, 1, 2, DateTimeZone.UTC).toInstant();
        Instant instant3 = new DateTime(2018, 1, 1, 1, 3, DateTimeZone.UTC).toInstant();
        IntervalWindow intervalWindow = new IntervalWindow(instant, instant2);
        IntervalWindow intervalWindow2 = new IntervalWindow(instant2, instant3);
        Mockito.when(this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.of(KV.of("foo", 1), instant, intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("bar", 2), instant, intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("foo", 3), instant, intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("foo", 4), instant2, intervalWindow2, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("bar", 5), instant2, intervalWindow2, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("foo", 6), instant2, intervalWindow2, PaneInfo.NO_FIRING)));
        StateRequestHandlers.SideInputHandler forSideInput = BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, this.context).forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), IntervalWindow.IntervalWindowCoder.of());
        Iterable iterable = forSideInput.get(ENCODED_FOO, intervalWindow);
        Iterable iterable2 = forSideInput.get(ENCODED_FOO, intervalWindow2);
        MatcherAssert.assertThat(iterable, Matchers.containsInAnyOrder(new Integer[]{1, 3}));
        MatcherAssert.assertThat(iterable2, Matchers.containsInAnyOrder(new Integer[]{4, 6}));
    }

    @Test
    public void iterableAccessPattern() {
        Instant instant = new DateTime(2018, 1, 1, 1, 1, DateTimeZone.UTC).toInstant();
        Instant instant2 = new DateTime(2018, 1, 1, 1, 2, DateTimeZone.UTC).toInstant();
        Instant instant3 = new DateTime(2018, 1, 1, 1, 3, DateTimeZone.UTC).toInstant();
        IntervalWindow intervalWindow = new IntervalWindow(instant, instant2);
        IntervalWindow intervalWindow2 = new IntervalWindow(instant2, instant3);
        Mockito.when(this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.of(1, instant, intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(2, instant, intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(3, instant2, intervalWindow2, PaneInfo.NO_FIRING), WindowedValue.of(4, instant2, intervalWindow2, PaneInfo.NO_FIRING)));
        StateRequestHandlers.SideInputHandler forSideInput = BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, this.context).forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, ITERABLE_ACCESS, VarIntCoder.of(), IntervalWindow.IntervalWindowCoder.of());
        Iterable iterable = forSideInput.get((byte[]) null, intervalWindow);
        Iterable iterable2 = forSideInput.get((byte[]) null, intervalWindow2);
        MatcherAssert.assertThat(iterable, Matchers.containsInAnyOrder(new Integer[]{1, 2}));
        MatcherAssert.assertThat(iterable2, Matchers.containsInAnyOrder(new Integer[]{3, 4}));
    }

    private static ExecutableStage createExecutableStage(Collection<SideInputReference> collection) {
        return ImmutableExecutableStage.of(RunnerApi.Components.getDefaultInstance(), RunnerApi.Environment.getDefaultInstance(), PipelineNode.pCollection("collection-id", RunnerApi.PCollection.getDefaultInstance()), collection, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
    }

    private static <T> byte[] encode(T t, Coder<T> coder) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            coder.encode(t, byteArrayOutputStream);
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
