/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.EmbeddedSdkHarness;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class SdkHarnessClientTest {
    @Mock
    public FnApiControlClient fnApiControlClient;
    @Mock
    public FnDataService dataService;
    @Rule
    public EmbeddedSdkHarness harness = EmbeddedSdkHarness.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private SdkHarnessClient sdkHarnessClient;
    private BeamFnApi.ProcessBundleDescriptor descriptor;
    private String inputPCollection;
    private static final String SDK_GRPC_READ_TRANSFORM = "read";
    private static final String SDK_GRPC_WRITE_TRANSFORM = "write";

    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        this.sdkHarnessClient = SdkHarnessClient.usingFnApiClient((InstructionRequestHandler)this.fnApiControlClient, (FnDataService)this.dataService);
        Pipeline userPipeline = Pipeline.create();
        TupleTag outputTag = new TupleTag();
        ((PCollection)userPipeline.apply("create", (PTransform)Create.of((Object)"foo", (Object[])new String[0]))).apply("proc", (PTransform)ParDo.of((DoFn)new TestFn()).withOutputTags(outputTag, TupleTagList.empty()));
        RunnerApi.Pipeline userProto = PipelineTranslation.toProto((Pipeline)userPipeline);
        BeamFnApi.ProcessBundleDescriptor.Builder pbdBuilder = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("my_id").putAllEnvironments(userProto.getComponents().getEnvironmentsMap()).putAllWindowingStrategies(userProto.getComponents().getWindowingStrategiesMap()).putAllCoders(userProto.getComponents().getCodersMap());
        RunnerApi.Coder fullValueCoder = CoderTranslation.toProto((Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE)).getCoder();
        pbdBuilder.putCoders("wire_coder", fullValueCoder);
        RunnerApi.PTransform targetProcessor = userProto.getComponents().getTransformsOrThrow("proc");
        BeamFnApi.RemoteGrpcPort port = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(this.harness.dataEndpoint()).setCoderId("wire_coder").build();
        RemoteGrpcPortRead readNode = RemoteGrpcPortRead.readFromPort((BeamFnApi.RemoteGrpcPort)port, (String)((String)Iterables.getOnlyElement(targetProcessor.getInputsMap().values())));
        RemoteGrpcPortWrite writeNode = RemoteGrpcPortWrite.writeToPort((String)((String)Iterables.getOnlyElement(targetProcessor.getOutputsMap().values())), (BeamFnApi.RemoteGrpcPort)port);
        for (String pc : targetProcessor.getInputsMap().values()) {
            pbdBuilder.putPcollections(pc, userProto.getComponents().getPcollectionsOrThrow(pc));
        }
        for (String pc : targetProcessor.getOutputsMap().values()) {
            pbdBuilder.putPcollections(pc, userProto.getComponents().getPcollectionsOrThrow(pc));
        }
        pbdBuilder.putTransforms("proc", targetProcessor).putTransforms(SDK_GRPC_READ_TRANSFORM, readNode.toPTransform()).putTransforms(SDK_GRPC_WRITE_TRANSFORM, writeNode.toPTransform());
        this.descriptor = pbdBuilder.build();
        this.inputPCollection = (String)Iterables.getOnlyElement(this.descriptor.getTransformsOrThrow(SDK_GRPC_READ_TRANSFORM).getOutputsMap().values());
    }

    @Test
    public void testRegisterCachesBundleProcessors() throws Exception {
        CompletableFuture registerResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(registerResponseFuture);
        BeamFnApi.ProcessBundleDescriptor descriptor1 = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        BeamFnApi.ProcessBundleDescriptor descriptor2 = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();
        Map<String, RemoteInputDestination> remoteInputs = Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor1 = this.sdkHarnessClient.getProcessor(descriptor1, remoteInputs);
        SdkHarnessClient.BundleProcessor processor2 = this.sdkHarnessClient.getProcessor(descriptor2, remoteInputs);
        Assert.assertNotSame((Object)processor1, (Object)processor2);
        Assert.assertSame((Object)processor1, (Object)this.sdkHarnessClient.getProcessor(descriptor1, remoteInputs));
    }

    @Test
    public void testRegisterWithStateRequiresStateDelegator() throws Exception {
        CompletableFuture registerResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(registerResponseFuture);
        BeamFnApi.ProcessBundleDescriptor descriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("test").setStateApiServiceDescriptor(Endpoints.ApiServiceDescriptor.newBuilder().setUrl("foo")).build();
        Map<String, RemoteInputDestination> remoteInputs = Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (String)SDK_GRPC_READ_TRANSFORM));
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("containing a state");
        this.sdkHarnessClient.getProcessor(descriptor, remoteInputs);
    }

    @Test
    public void testNewBundleNoDataDoesNotCrash() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)));
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)((CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class)));
        try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored());){
            BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance();
            processBundleResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
        }
    }

    @Test
    public void testNewBundleAndProcessElements() throws Exception {
        SdkHarnessClient client = this.harness.client();
        SdkHarnessClient.BundleProcessor processor = client.getProcessor(this.descriptor, Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (String)SDK_GRPC_READ_TRANSFORM)));
        ArrayList outputs = new ArrayList();
        try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle(Collections.singletonMap(SDK_GRPC_WRITE_TRANSFORM, RemoteOutputReceiver.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)LengthPrefixCoder.of((Coder)StringUtf8Coder.of()), (Coder)GlobalWindow.Coder.INSTANCE), outputs::add)), BundleProgressHandler.ignored());){
            FnDataReceiver bundleInputReceiver = (FnDataReceiver)Iterables.getOnlyElement(activeBundle.getInputReceivers().values());
            bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"foo"));
            bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"bar"));
            bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"baz"));
        }
        Assert.assertThat(outputs, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"spam"), WindowedValue.valueInGlobalWindow((Object)"ham"), WindowedValue.valueInGlobalWindow((Object)"eggs")}));
    }

    @Test
    public void handleCleanupWhenInputSenderFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        CompletableFuture processBundleResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)));
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        ((CloseableFnDataReceiver)Mockito.doThrow((Throwable[])new Throwable[]{testException}).when((Object)mockInputSender)).close();
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        try {
            SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockProgressHandler);
            Throwable throwable = null;
            if (activeBundle != null) {
                SdkHarnessClientTest.$closeResource(throwable, (AutoCloseable)activeBundle);
            }
            Assert.fail((String)"Exception expected");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e);
            ((InboundDataClient)Mockito.verify((Object)mockOutputReceiver)).cancel();
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockOutputReceiver});
        }
    }

    @Test
    public void handleCleanupWithStateWhenInputSenderFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        StateDelegator mockStateDelegator = (StateDelegator)Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = (StateDelegator.Registration)Mockito.mock(StateDelegator.Registration.class);
        Mockito.when((Object)mockStateDelegator.registerForProcessBundleInstructionId((String)Matchers.any(), (StateRequestHandler)Matchers.any())).thenReturn((Object)mockStateRegistration);
        StateRequestHandler mockStateHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        CompletableFuture processBundleResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)), mockStateDelegator);
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        ((CloseableFnDataReceiver)Mockito.doThrow((Throwable[])new Throwable[]{testException}).when((Object)mockInputSender)).close();
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        try {
            SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockStateHandler, mockProgressHandler);
            Throwable throwable = null;
            if (activeBundle != null) {
                SdkHarnessClientTest.$closeResource(throwable, (AutoCloseable)activeBundle);
            }
            Assert.fail((String)"Exception expected");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e);
            ((StateDelegator.Registration)Mockito.verify((Object)mockStateRegistration)).abort();
            ((InboundDataClient)Mockito.verify((Object)mockOutputReceiver)).cancel();
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockStateRegistration, mockOutputReceiver});
        }
    }

    @Test
    public void handleCleanupWhenProcessingBundleFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        CompletableFuture processBundleResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)));
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        try {
            try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockProgressHandler);){
                processBundleResponseFuture.completeExceptionally(testException);
            }
            Assert.fail((String)"Exception expected");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
            ((InboundDataClient)Mockito.verify((Object)mockOutputReceiver)).cancel();
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockOutputReceiver});
        }
    }

    @Test
    public void handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        StateDelegator mockStateDelegator = (StateDelegator)Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = (StateDelegator.Registration)Mockito.mock(StateDelegator.Registration.class);
        Mockito.when((Object)mockStateDelegator.registerForProcessBundleInstructionId((String)Matchers.any(), (StateRequestHandler)Matchers.any())).thenReturn((Object)mockStateRegistration);
        StateRequestHandler mockStateHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        CompletableFuture processBundleResponseFuture = new CompletableFuture();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)), mockStateDelegator);
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        try {
            try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockStateHandler, mockProgressHandler);){
                processBundleResponseFuture.completeExceptionally(testException);
            }
            Assert.fail((String)"Exception expected");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
            ((StateDelegator.Registration)Mockito.verify((Object)mockStateRegistration)).abort();
            ((InboundDataClient)Mockito.verify((Object)mockOutputReceiver)).cancel();
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockStateRegistration, mockOutputReceiver});
        }
    }

    @Test
    public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap("inputPC", RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)));
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        ((InboundDataClient)Mockito.doThrow((Throwable[])new Throwable[]{testException}).when((Object)mockOutputReceiver)).awaitCompletion();
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        try {
            try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockProgressHandler);){
                BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance();
                processBundleResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
            }
            Assert.fail((String)"Exception expected");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e);
        }
    }

    @Test
    public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = (InboundDataClient)Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = (CloseableFnDataReceiver)Mockito.mock(CloseableFnDataReceiver.class);
        StateDelegator mockStateDelegator = (StateDelegator)Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = (StateDelegator.Registration)Mockito.mock(StateDelegator.Registration.class);
        Mockito.when((Object)mockStateDelegator.registerForProcessBundleInstructionId((String)Matchers.any(), (StateRequestHandler)Matchers.any())).thenReturn((Object)mockStateRegistration);
        StateRequestHandler mockStateHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        BundleProgressHandler mockProgressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(new CompletableFuture()).thenReturn(processBundleResponseFuture);
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of((Coder)coder, (String)SDK_GRPC_READ_TRANSFORM)), mockStateDelegator);
        Mockito.when((Object)this.dataService.receive((LogicalEndpoint)Matchers.any(), (Coder)Matchers.any(), (FnDataReceiver)Matchers.any())).thenReturn((Object)mockOutputReceiver);
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq((Object)coder))).thenReturn((Object)mockInputSender);
        ((InboundDataClient)Mockito.doThrow((Throwable[])new Throwable[]{testException}).when((Object)mockOutputReceiver)).awaitCompletion();
        RemoteOutputReceiver mockRemoteOutputReceiver = (RemoteOutputReceiver)Mockito.mock(RemoteOutputReceiver.class);
        try {
            try (SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)mockRemoteOutputReceiver), mockStateHandler, mockProgressHandler);){
                BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance();
                processBundleResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
            }
            Assert.fail((String)"Exception expected");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e);
        }
    }

    private static class TestFn
    extends DoFn<String, String> {
        private TestFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            if ("foo".equals(context.element())) {
                context.output((Object)"spam");
            } else if ("bar".equals(context.element())) {
                context.output((Object)"ham");
            } else {
                context.output((Object)"eggs");
            }
        }
    }
}

