package org.apache.beam.fn.harness.control;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingConsumer;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.class */
public class ProcessBundleHandlerTest {
    private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
    private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Mock
    private BeamFnDataClient beamFnDataClient;

    @Captor
    private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void testOrderOfStartAndFinishCalls() throws Exception {
        BeamFnApi.ProcessBundleDescriptor build = BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:sink:runner:0.1").build()).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance()).build();
        ImmutableMap of = ImmutableMap.of("1L", build);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        PTransformRunnerFactory pTransformRunnerFactory = (pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, supplier, map, map2, map3, listMultimap, consumer, consumer2, bundleSplitListener) -> {
            Assert.assertThat((String) supplier.get(), Matchers.equalTo("999L"));
            arrayList.add(pTransform);
            consumer.accept(() -> {
                arrayList2.add("Start" + str);
            });
            consumer2.accept(() -> {
                arrayList2.add("Finish" + str);
            });
            return null;
        };
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", pTransformRunnerFactory, "urn:org.apache.beam:sink:runner:0.1", pTransformRunnerFactory)).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
        Assert.assertThat(arrayList, Matchers.contains(build.getTransformsMap().get("3L"), build.getTransformsMap().get("2L")));
        Assert.assertThat(arrayList2, Matchers.contains("Start3L", "Start2L", "Finish2L", "Finish3L"));
    }

    @Test
    public void testCreatingPTransformExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", (pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, supplier, map, map2, map3, listMultimap, consumer, consumer2, bundleSplitListener) -> {
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("TestException");
            throw new IllegalStateException("TestException");
        })).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
    }

    @Test
    public void testPTransformStartExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", (pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, supplier, map, map2, map3, listMultimap, consumer, consumer2, bundleSplitListener) -> {
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("TestException");
            consumer.accept(ProcessBundleHandlerTest::throwException);
            return null;
        })).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
    }

    @Test
    public void testPTransformFinishExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", (pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, supplier, map, map2, map3, listMultimap, consumer, consumer2, bundleSplitListener) -> {
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("TestException");
            consumer2.accept(ProcessBundleHandlerTest::throwException);
            return null;
        })).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
    }

    @Test
    public void testPendingStateCallsBlockTillCompletion() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).build()).setStateApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build());
        final CompletableFuture completableFuture = new CompletableFuture();
        final CompletableFuture completableFuture2 = new CompletableFuture();
        BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = (BeamFnStateGrpcClientCache) Mockito.mock(BeamFnStateGrpcClientCache.class);
        BeamFnStateClient beamFnStateClient = (BeamFnStateClient) Mockito.mock(BeamFnStateClient.class);
        Mockito.when(beamFnStateGrpcClientCache.forApiServiceDescriptor((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.any())).thenReturn(beamFnStateClient);
        ((BeamFnStateClient) Mockito.doAnswer(invocationOnMock -> {
            BeamFnApi.StateRequest.Builder builder = (BeamFnApi.StateRequest.Builder) invocationOnMock.getArguments()[0];
            CompletableFuture completableFuture3 = (CompletableFuture) invocationOnMock.getArguments()[1];
            new Thread(() -> {
                Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
                String instructionReference = builder.getInstructionReference();
                boolean z = -1;
                switch (instructionReference.hashCode()) {
                    case -1149187101:
                        if (instructionReference.equals("SUCCESS")) {
                            z = false;
                            break;
                        }
                        break;
                    case 2150174:
                        if (instructionReference.equals("FAIL")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        completableFuture3.complete(BeamFnApi.StateResponse.getDefaultInstance());
                        return;
                    case true:
                        completableFuture3.completeExceptionally(new RuntimeException("TEST ERROR"));
                        return;
                    default:
                        return;
                }
            }).start();
            return null;
        }).when(beamFnStateClient)).handle((BeamFnApi.StateRequest.Builder) org.mockito.Matchers.any(), (CompletableFuture) org.mockito.Matchers.any());
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, beamFnStateGrpcClientCache, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.1
            public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient2, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, ListMultimap<String, FnDataReceiver<WindowedValue<?>>> listMultimap, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, BundleSplitListener bundleSplitListener) throws IOException {
                consumer.accept(() -> {
                    doStateCalls(beamFnStateClient2);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient2) {
                beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionReference("SUCCESS"), completableFuture);
                beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionReference("FAIL"), completableFuture2);
            }
        })).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
        Assert.assertTrue(completableFuture.isDone());
        Assert.assertTrue(completableFuture2.isDone());
    }

    @Test
    public void testStateCallsFailIfNoStateApiServiceDescriptorSpecified() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:org.apache.beam:source:runner:0.1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, (v1) -> {
            return r3.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, ImmutableMap.of("urn:org.apache.beam:source:runner:0.1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.2
            public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, ListMultimap<String, FnDataReceiver<WindowedValue<?>>> listMultimap, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, BundleSplitListener bundleSplitListener) throws IOException {
                consumer.accept(() -> {
                    doStateCalls(beamFnStateClient);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient) {
                ProcessBundleHandlerTest.this.thrown.expect(IllegalStateException.class);
                ProcessBundleHandlerTest.this.thrown.expectMessage("State API calls are unsupported");
                beamFnStateClient.handle(BeamFnApi.StateRequest.newBuilder().setInstructionReference("SUCCESS"), new CompletableFuture());
            }
        })).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L")).build());
    }

    private static void throwException() {
        throw new IllegalStateException("TestException");
    }
}
