/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.translation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class BatchSideInputHandlerFactoryTest {
    private static final String TRANSFORM_ID = "transform-id";
    private static final String SIDE_INPUT_NAME = "side-input";
    private static final String COLLECTION_ID = "collection";
    private static final RunnerApi.FunctionSpec MULTIMAP_ACCESS = RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.MULTIMAP_SIDE_INPUT).build();
    private static final RunnerApi.FunctionSpec ITERABLE_ACCESS = RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.ITERABLE_SIDE_INPUT).build();
    private static final ExecutableStage EXECUTABLE_STAGE = BatchSideInputHandlerFactoryTest.createExecutableStage(Arrays.asList(SideInputReference.of((PipelineNode.PTransformNode)PipelineNode.pTransform((String)"transform-id", (RunnerApi.PTransform)RunnerApi.PTransform.getDefaultInstance()), (String)"side-input", (PipelineNode.PCollectionNode)PipelineNode.pCollection((String)"collection", (RunnerApi.PCollection)RunnerApi.PCollection.getDefaultInstance()))));
    private static final byte[] ENCODED_NULL = BatchSideInputHandlerFactoryTest.encode(null, VoidCoder.of());
    private static final byte[] ENCODED_FOO = BatchSideInputHandlerFactoryTest.encode("foo", StringUtf8Coder.of());
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Mock
    private BatchSideInputHandlerFactory.SideInputGetter context;

    @Before
    public void setUpMocks() {
        MockitoAnnotations.initMocks((Object)this);
    }

    @Test
    public void invalidSideInputThrowsException() {
        ExecutableStage stage = BatchSideInputHandlerFactoryTest.createExecutableStage(Collections.emptyList());
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)stage, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        this.thrown.expect(Matchers.instanceOf(IllegalArgumentException.class));
        factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, (Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)VoidCoder.of()), (Coder)GlobalWindow.Coder.INSTANCE);
    }

    @Test
    public void emptyResultForEmptyCollection() {
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)EXECUTABLE_STAGE, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        StateRequestHandlers.SideInputHandler handler = factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, (Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)VarIntCoder.of()), (Coder)GlobalWindow.Coder.INSTANCE);
        Iterable result = handler.get(ENCODED_NULL, (BoundedWindow)GlobalWindow.INSTANCE);
        MatcherAssert.assertThat((Object)result, (Matcher)Matchers.emptyIterable());
    }

    @Test
    public void singleElementForCollection() {
        Mockito.when((Object)this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.valueInGlobalWindow((Object)KV.of(null, (Object)3))));
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)EXECUTABLE_STAGE, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        StateRequestHandlers.SideInputHandler handler = factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, (Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)VarIntCoder.of()), (Coder)GlobalWindow.Coder.INSTANCE);
        Iterable result = handler.get(ENCODED_NULL, (BoundedWindow)GlobalWindow.INSTANCE);
        MatcherAssert.assertThat((Object)result, (Matcher)Matchers.contains((Object[])new Integer[]{3}));
    }

    @Test
    public void groupsValuesByKey() {
        Mockito.when((Object)this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"foo", (Object)2)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"bar", (Object)3)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"foo", (Object)5))));
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)EXECUTABLE_STAGE, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        StateRequestHandlers.SideInputHandler handler = factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, (Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of()), (Coder)GlobalWindow.Coder.INSTANCE);
        Iterable result = handler.get(ENCODED_FOO, (BoundedWindow)GlobalWindow.INSTANCE);
        MatcherAssert.assertThat((Object)result, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{2, 5}));
    }

    @Test
    public void groupsValuesByWindowAndKey() {
        Instant instantA = new DateTime(2018, 1, 1, 1, 1, DateTimeZone.UTC).toInstant();
        Instant instantB = new DateTime(2018, 1, 1, 1, 2, DateTimeZone.UTC).toInstant();
        Instant instantC = new DateTime(2018, 1, 1, 1, 3, DateTimeZone.UTC).toInstant();
        IntervalWindow windowA = new IntervalWindow(instantA, instantB);
        IntervalWindow windowB = new IntervalWindow(instantB, instantC);
        Mockito.when((Object)this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.of((Object)KV.of((Object)"foo", (Object)1), (Instant)instantA, (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)KV.of((Object)"bar", (Object)2), (Instant)instantA, (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)KV.of((Object)"foo", (Object)3), (Instant)instantA, (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)KV.of((Object)"foo", (Object)4), (Instant)instantB, (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)KV.of((Object)"bar", (Object)5), (Instant)instantB, (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)KV.of((Object)"foo", (Object)6), (Instant)instantB, (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)));
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)EXECUTABLE_STAGE, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        StateRequestHandlers.SideInputHandler handler = factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, MULTIMAP_ACCESS, (Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of()), (Coder)IntervalWindow.IntervalWindowCoder.of());
        Iterable resultA = handler.get(ENCODED_FOO, (BoundedWindow)windowA);
        Iterable resultB = handler.get(ENCODED_FOO, (BoundedWindow)windowB);
        MatcherAssert.assertThat((Object)resultA, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1, 3}));
        MatcherAssert.assertThat((Object)resultB, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{4, 6}));
    }

    @Test
    public void iterableAccessPattern() {
        Instant instantA = new DateTime(2018, 1, 1, 1, 1, DateTimeZone.UTC).toInstant();
        Instant instantB = new DateTime(2018, 1, 1, 1, 2, DateTimeZone.UTC).toInstant();
        Instant instantC = new DateTime(2018, 1, 1, 1, 3, DateTimeZone.UTC).toInstant();
        IntervalWindow windowA = new IntervalWindow(instantA, instantB);
        IntervalWindow windowB = new IntervalWindow(instantB, instantC);
        Mockito.when((Object)this.context.getSideInput(COLLECTION_ID)).thenReturn(Arrays.asList(WindowedValue.of((Object)1, (Instant)instantA, (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)2, (Instant)instantA, (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)3, (Instant)instantB, (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)4, (Instant)instantB, (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)));
        BatchSideInputHandlerFactory factory = BatchSideInputHandlerFactory.forStage((ExecutableStage)EXECUTABLE_STAGE, (BatchSideInputHandlerFactory.SideInputGetter)this.context);
        StateRequestHandlers.SideInputHandler handler = factory.forSideInput(TRANSFORM_ID, SIDE_INPUT_NAME, ITERABLE_ACCESS, (Coder)VarIntCoder.of(), (Coder)IntervalWindow.IntervalWindowCoder.of());
        Iterable resultA = handler.get(null, (BoundedWindow)windowA);
        Iterable resultB = handler.get(null, (BoundedWindow)windowB);
        MatcherAssert.assertThat((Object)resultA, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1, 2}));
        MatcherAssert.assertThat((Object)resultB, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{3, 4}));
    }

    private static ExecutableStage createExecutableStage(Collection<SideInputReference> sideInputs) {
        RunnerApi.Components components = RunnerApi.Components.getDefaultInstance();
        RunnerApi.Environment environment = RunnerApi.Environment.getDefaultInstance();
        PipelineNode.PCollectionNode inputCollection = PipelineNode.pCollection((String)"collection-id", (RunnerApi.PCollection)RunnerApi.PCollection.getDefaultInstance());
        return ImmutableExecutableStage.of((RunnerApi.Components)components, (RunnerApi.Environment)environment, (PipelineNode.PCollectionNode)inputCollection, sideInputs, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
    }

    private static <T> byte[] encode(T value, Coder<T> coder) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            coder.encode(value, (OutputStream)out);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return out.toByteArray();
    }
}

