package org.apache.beam.runners.flink.translation.functions;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.reflect.Whitebox;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.class */
public class FlinkExecutableStageFunctionTest {

    @Parameterized.Parameter
    public boolean isStateful;

    @Mock
    private RuntimeContext runtimeContext;

    @Mock
    private DistributedCache distributedCache;

    @Mock
    private Collector<RawUnionValue> collector;

    @Mock
    private ExecutableStageContext stageContext;

    @Mock
    private StageBundleFactory stageBundleFactory;

    @Mock
    private StateRequestHandler stateRequestHandler;

    @Mock
    private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private final RunnerApi.ExecutableStagePayload stagePayload = RunnerApi.ExecutableStagePayload.newBuilder().setInput("input").setComponents(RunnerApi.Components.newBuilder().putTransforms("transform", RunnerApi.PTransform.newBuilder().putInputs("bla", "input").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:pardo:v1")).build()).putPcollections("input", RunnerApi.PCollection.getDefaultInstance()).build()).addUserStates(RunnerApi.ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build()).build();
    private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", "retrieval-token", Struct.getDefaultInstance());

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{true, false};
    }

    @Before
    public void setUpMocks() throws Exception {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.runtimeContext.getDistributedCache()).thenReturn(this.distributedCache);
        Mockito.when(this.stageContext.getStageBundleFactory((ExecutableStage) Matchers.any())).thenReturn(this.stageBundleFactory);
        RemoteBundle remoteBundle = (RemoteBundle) Mockito.mock(RemoteBundle.class);
        Mockito.when(this.stageBundleFactory.getBundle((OutputReceiverFactory) Matchers.any(), (StateRequestHandler) Matchers.any(StateRequestHandler.class), (BundleProgressHandler) Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler) Matchers.any(BundleFinalizationHandler.class))).thenReturn(remoteBundle);
        Mockito.when(this.stageBundleFactory.getBundle((OutputReceiverFactory) Matchers.any(), (TimerReceiverFactory) Matchers.any(TimerReceiverFactory.class), (StateRequestHandler) Matchers.any(StateRequestHandler.class), (BundleProgressHandler) Matchers.any(BundleProgressHandler.class))).thenReturn(remoteBundle);
        Mockito.when(remoteBundle.getInputReceivers()).thenReturn(ImmutableMap.builder().put("input", Mockito.mock(FnDataReceiver.class)).build());
        Mockito.when(this.processBundleDescriptor.getTimerSpecs()).thenReturn(Collections.emptyMap());
    }

    @Test
    public void sdkErrorsSurfaceOnClose() throws Exception {
        FlinkExecutableStageFunction<Integer> function = getFunction(Collections.emptyMap());
        function.open(new Configuration());
        RemoteBundle remoteBundle = (RemoteBundle) Mockito.mock(RemoteBundle.class);
        Mockito.when(this.stageBundleFactory.getBundle((OutputReceiverFactory) Matchers.any(), (StateRequestHandler) Matchers.any(StateRequestHandler.class), (BundleProgressHandler) Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler) Matchers.any(BundleFinalizationHandler.class))).thenReturn(remoteBundle);
        Mockito.when(remoteBundle.getInputReceivers()).thenReturn(ImmutableMap.of("input", (FnDataReceiver) Mockito.mock(FnDataReceiver.class)));
        Exception exc = new Exception();
        ((RemoteBundle) Mockito.doThrow(new Throwable[]{exc}).when(remoteBundle)).close();
        this.thrown.expect(org.hamcrest.Matchers.is(exc));
        function.mapPartition(Collections.emptyList(), this.collector);
    }

    @Test
    public void expectedInputsAreSent() throws Exception {
        FlinkExecutableStageFunction<Integer> function = getFunction(Collections.emptyMap());
        function.open(new Configuration());
        RemoteBundle remoteBundle = (RemoteBundle) Mockito.mock(RemoteBundle.class);
        Mockito.when(this.stageBundleFactory.getBundle((OutputReceiverFactory) Matchers.any(), (StateRequestHandler) Matchers.any(StateRequestHandler.class), (BundleProgressHandler) Matchers.any(BundleProgressHandler.class), (BundleFinalizationHandler) Matchers.any(BundleFinalizationHandler.class))).thenReturn(remoteBundle);
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        Mockito.when(remoteBundle.getInputReceivers()).thenReturn(ImmutableMap.of("input", fnDataReceiver));
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(1);
        WindowedValue valueInGlobalWindow2 = WindowedValue.valueInGlobalWindow(2);
        WindowedValue valueInGlobalWindow3 = WindowedValue.valueInGlobalWindow(3);
        function.mapPartition(Arrays.asList(valueInGlobalWindow, valueInGlobalWindow2, valueInGlobalWindow3), this.collector);
        ((FnDataReceiver) Mockito.verify(fnDataReceiver)).accept(valueInGlobalWindow);
        ((FnDataReceiver) Mockito.verify(fnDataReceiver)).accept(valueInGlobalWindow2);
        ((FnDataReceiver) Mockito.verify(fnDataReceiver)).accept(valueInGlobalWindow3);
        Mockito.verifyNoMoreInteractions(new Object[]{fnDataReceiver});
    }

    @Test
    public void outputsAreTaggedCorrectly() throws Exception {
        final WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(3);
        final WindowedValue valueInGlobalWindow2 = WindowedValue.valueInGlobalWindow(4);
        final WindowedValue valueInGlobalWindow3 = WindowedValue.valueInGlobalWindow(5);
        ImmutableMap of = ImmutableMap.of("one", 1, "two", 2, "three", 3);
        Mockito.when(this.stageContext.getStageBundleFactory((ExecutableStage) Matchers.any())).thenReturn(new StageBundleFactory() { // from class: org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunctionTest.1
            private boolean once;

            public RemoteBundle getBundle(final OutputReceiverFactory outputReceiverFactory, TimerReceiverFactory timerReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler, BundleFinalizationHandler bundleFinalizationHandler) {
                return new RemoteBundle() { // from class: org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunctionTest.1.1
                    public String getId() {
                        return "bundle-id";
                    }

                    public Map<String, FnDataReceiver> getInputReceivers() {
                        return ImmutableMap.of("input", obj -> {
                        });
                    }

                    public Map<KV<String, String>, FnDataReceiver<Timer>> getTimerReceivers() {
                        return Collections.emptyMap();
                    }

                    public void requestProgress() {
                        throw new UnsupportedOperationException();
                    }

                    public void split(double d) {
                        throw new UnsupportedOperationException();
                    }

                    public void close() throws Exception {
                        if (AnonymousClass1.this.once) {
                            return;
                        }
                        outputReceiverFactory.create("one").accept(valueInGlobalWindow);
                        outputReceiverFactory.create("two").accept(valueInGlobalWindow2);
                        outputReceiverFactory.create("three").accept(valueInGlobalWindow3);
                        AnonymousClass1.this.once = true;
                    }
                };
            }

            public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
                return FlinkExecutableStageFunctionTest.this.processBundleDescriptor;
            }

            public InstructionRequestHandler getInstructionRequestHandler() {
                return null;
            }

            public void close() throws Exception {
            }
        });
        FlinkExecutableStageFunction<Integer> function = getFunction(of);
        function.open(new Configuration());
        if (this.isStateful) {
            function.reduce(Collections.emptyList(), this.collector);
        } else {
            function.mapPartition(Collections.emptyList(), this.collector);
        }
        ((Collector) Mockito.verify(this.collector)).collect(new RawUnionValue(1, valueInGlobalWindow));
        ((Collector) Mockito.verify(this.collector)).collect(new RawUnionValue(2, valueInGlobalWindow2));
        ((Collector) Mockito.verify(this.collector)).collect(new RawUnionValue(3, valueInGlobalWindow3));
        Mockito.verifyNoMoreInteractions(new Object[]{this.collector});
    }

    @Test
    public void testStageBundleClosed() throws Exception {
        FlinkExecutableStageFunction<Integer> function = getFunction(Collections.emptyMap());
        function.open(new Configuration());
        function.close();
        ((StageBundleFactory) Mockito.verify(this.stageBundleFactory)).getProcessBundleDescriptor();
        ((StageBundleFactory) Mockito.verify(this.stageBundleFactory)).close();
        Mockito.verifyNoMoreInteractions(new Object[]{this.stageBundleFactory});
    }

    @Test
    public void testAccumulatorRegistrationOnOperatorClose() throws Exception {
        FlinkExecutableStageFunction<Integer> function = getFunction(Collections.emptyMap());
        function.open(new Configuration());
        FlinkMetricContainer flinkMetricContainer = (FlinkMetricContainer) Mockito.spy((FlinkMetricContainer) Whitebox.getInternalState(function, "metricContainer"));
        Whitebox.setInternalState(function, "metricContainer", flinkMetricContainer);
        function.close();
        ((FlinkMetricContainer) Mockito.verify(flinkMetricContainer)).registerMetricsForPipelineResult();
    }

    private FlinkExecutableStageFunction<Integer> getFunction(Map<String, Integer> map) {
        FlinkExecutableStageContextFactory flinkExecutableStageContextFactory = (FlinkExecutableStageContextFactory) Mockito.mock(FlinkExecutableStageContextFactory.class);
        Mockito.when(flinkExecutableStageContextFactory.get((JobInfo) Matchers.any())).thenReturn(this.stageContext);
        FlinkExecutableStageFunction<Integer> flinkExecutableStageFunction = new FlinkExecutableStageFunction<>("step", PipelineOptionsFactory.create(), this.stagePayload, this.jobInfo, map, flinkExecutableStageContextFactory, (Coder) null);
        flinkExecutableStageFunction.setRuntimeContext(this.runtimeContext);
        Whitebox.setInternalState(flinkExecutableStageFunction, "stateRequestHandler", this.stateRequestHandler);
        return flinkExecutableStageFunction;
    }
}
