/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.reflect.Whitebox;

@RunWith(value=Parameterized.class)
public class FlinkExecutableStageFunctionTest {
    @Parameterized.Parameter
    public @UnknownKeyFor @NonNull @Initialized boolean isStateful;
    @Rule
    public @UnknownKeyFor @NonNull @Initialized ExpectedException thrown = ExpectedException.none();
    @Mock
    private @UnknownKeyFor @NonNull @Initialized RuntimeContext runtimeContext;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized DistributedCache distributedCache;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized RawUnionValue> collector;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized ExecutableStageContext stageContext;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized StageBundleFactory stageBundleFactory;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized StateRequestHandler stateRequestHandler;
    @Mock
    private // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload stagePayload = RunnerApi.ExecutableStagePayload.newBuilder().setInput("input").setComponents(RunnerApi.Components.newBuilder().putTransforms("transform", RunnerApi.PTransform.newBuilder().putInputs("bla", "input").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:pardo:v1")).build()).putPcollections("input", RunnerApi.PCollection.getDefaultInstance()).build()).addUserStates(RunnerApi.ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build()).build();
    private final @UnknownKeyFor @NonNull @Initialized JobInfo jobInfo = JobInfo.create((String)"job-id", (String)"job-name", (String)"retrieval-token", (Struct)Struct.getDefaultInstance());

    @Parameterized.Parameters
    public static @UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized [] data() {
        return new Object[]{true, false};
    }

    @Before
    public void setUpMocks() throws @UnknownKeyFor @NonNull @Initialized Exception {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.runtimeContext.getDistributedCache()).thenReturn((Object)this.distributedCache);
        Mockito.when((Object)this.stageContext.getStageBundleFactory((ExecutableStage)Matchers.any())).thenReturn((Object)this.stageBundleFactory);
        RemoteBundle remoteBundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(StateRequestHandler.class), (BundleProgressHandler)Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler)Matchers.any(BundleFinalizationHandler.class))).thenReturn((Object)remoteBundle);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(TimerReceiverFactory.class), (StateRequestHandler)Matchers.any(StateRequestHandler.class), (BundleProgressHandler)Matchers.any(BundleProgressHandler.class))).thenReturn((Object)remoteBundle);
        ImmutableMap input = ImmutableMap.builder().put((Object)"input", Mockito.mock(FnDataReceiver.class)).build();
        Mockito.when((Object)remoteBundle.getInputReceivers()).thenReturn((Object)input);
        Mockito.when((Object)this.processBundleDescriptor.getTimerSpecs()).thenReturn(Collections.emptyMap());
    }

    @Test
    public void sdkErrorsSurfaceOnClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkExecutableStageFunction<Integer> function = this.getFunction(Collections.emptyMap());
        function.open(new Configuration());
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(StateRequestHandler.class), (BundleProgressHandler)Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler)Matchers.any(BundleFinalizationHandler.class))).thenReturn((Object)bundle);
        FnDataReceiver receiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.of((Object)"input", (Object)receiver));
        Exception expected = new Exception();
        ((RemoteBundle)Mockito.doThrow((Throwable[])new Throwable[]{expected}).when((Object)bundle)).close();
        this.thrown.expect(org.hamcrest.Matchers.is((Object)expected));
        function.mapPartition(Collections.emptyList(), this.collector);
    }

    @Test
    public void expectedInputsAreSent() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkExecutableStageFunction<Integer> function = this.getFunction(Collections.emptyMap());
        function.open(new Configuration());
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(StateRequestHandler.class), (BundleProgressHandler)Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler)Matchers.any(BundleFinalizationHandler.class))).thenReturn((Object)bundle);
        FnDataReceiver receiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.of((Object)"input", (Object)receiver));
        WindowedValue one = WindowedValue.valueInGlobalWindow((Object)1);
        WindowedValue two = WindowedValue.valueInGlobalWindow((Object)2);
        WindowedValue three = WindowedValue.valueInGlobalWindow((Object)3);
        function.mapPartition(Arrays.asList(one, two, three), this.collector);
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)one);
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)two);
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)three);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{receiver});
    }

    @Test
    public void outputsAreTaggedCorrectly() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final WindowedValue three = WindowedValue.valueInGlobalWindow((Object)3);
        final WindowedValue four = WindowedValue.valueInGlobalWindow((Object)4);
        final WindowedValue five = WindowedValue.valueInGlobalWindow((Object)5);
        ImmutableMap outputTagMap = ImmutableMap.of((Object)"one", (Object)1, (Object)"two", (Object)2, (Object)"three", (Object)3);
        StageBundleFactory stageBundleFactory = new StageBundleFactory(){
            private @UnknownKeyFor @NonNull @Initialized boolean once;

            public @UnknownKeyFor @NonNull @Initialized RemoteBundle getBundle(final @UnknownKeyFor @NonNull @Initialized OutputReceiverFactory receiverFactory, @UnknownKeyFor @NonNull @Initialized TimerReceiverFactory timerReceiverFactory, @UnknownKeyFor @NonNull @Initialized StateRequestHandler stateRequestHandler, @UnknownKeyFor @NonNull @Initialized BundleProgressHandler progressHandler, @UnknownKeyFor @NonNull @Initialized BundleFinalizationHandler finalizationHandler) {
                return new RemoteBundle(){

                    public @UnknownKeyFor @NonNull @Initialized String getId() {
                        return "bundle-id";
                    }

                    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized FnDataReceiver> getInputReceivers() {
                        return ImmutableMap.of((Object)"input", input -> {});
                    }

                    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized Timer>> getTimerReceivers() {
                        return Collections.emptyMap();
                    }

                    public void requestProgress() {
                        throw new UnsupportedOperationException();
                    }

                    public void split(@UnknownKeyFor @NonNull @Initialized double fractionOfRemainder) {
                        throw new UnsupportedOperationException();
                    }

                    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
                        if (once) {
                            return;
                        }
                        receiverFactory.create("one").accept((Object)three);
                        receiverFactory.create("two").accept((Object)four);
                        receiverFactory.create("three").accept((Object)five);
                        once = true;
                    }
                };
            }

            public // Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
                return FlinkExecutableStageFunctionTest.this.processBundleDescriptor;
            }

            public @UnknownKeyFor @NonNull @Initialized InstructionRequestHandler getInstructionRequestHandler() {
                return null;
            }

            public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
            }
        };
        Mockito.when((Object)this.stageContext.getStageBundleFactory((ExecutableStage)Matchers.any())).thenReturn((Object)stageBundleFactory);
        FlinkExecutableStageFunction<Integer> function = this.getFunction((Map<String, Integer>)outputTagMap);
        function.open(new Configuration());
        if (this.isStateful) {
            function.reduce(Collections.emptyList(), this.collector);
        } else {
            function.mapPartition(Collections.emptyList(), this.collector);
        }
        ((Collector)Mockito.verify(this.collector)).collect((Object)new RawUnionValue(1, (Object)three));
        ((Collector)Mockito.verify(this.collector)).collect((Object)new RawUnionValue(2, (Object)four));
        ((Collector)Mockito.verify(this.collector)).collect((Object)new RawUnionValue(3, (Object)five));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.collector});
    }

    @Test
    public void testStageBundleClosed() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkExecutableStageFunction<Integer> function = this.getFunction(Collections.emptyMap());
        function.open(new Configuration());
        function.close();
        ((StageBundleFactory)Mockito.verify((Object)this.stageBundleFactory)).getProcessBundleDescriptor();
        ((StageBundleFactory)Mockito.verify((Object)this.stageBundleFactory)).close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.stageBundleFactory});
    }

    @Test
    public void testAccumulatorRegistrationOnOperatorClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkExecutableStageFunction<Integer> function = this.getFunction(Collections.emptyMap());
        function.open(new Configuration());
        String metricContainerFieldName = "metricContainer";
        FlinkMetricContainer monitoredContainer = (FlinkMetricContainer)Mockito.spy((Object)((FlinkMetricContainer)Whitebox.getInternalState(function, (String)metricContainerFieldName)));
        Whitebox.setInternalState(function, (String)metricContainerFieldName, (Object)monitoredContainer);
        function.close();
        ((FlinkMetricContainer)Mockito.verify((Object)monitoredContainer)).registerMetricsForPipelineResult();
    }

    private @UnknownKeyFor @NonNull @Initialized FlinkExecutableStageFunction<@UnknownKeyFor @NonNull @Initialized Integer> getFunction(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> outputMap) {
        FlinkExecutableStageContextFactory contextFactory = (FlinkExecutableStageContextFactory)Mockito.mock(FlinkExecutableStageContextFactory.class);
        Mockito.when((Object)contextFactory.get((JobInfo)Matchers.any())).thenReturn((Object)this.stageContext);
        FlinkExecutableStageFunction function = new FlinkExecutableStageFunction("step", PipelineOptionsFactory.create(), this.stagePayload, this.jobInfo, outputMap, contextFactory, null);
        function.setRuntimeContext(this.runtimeContext);
        Whitebox.setInternalState((Object)function, (String)"stateRequestHandler", (Object)this.stateRequestHandler);
        return function;
    }
}

