package org.apache.beam.runners.fnexecution.control;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.EmbeddedSdkHarness;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* Tests for SdkHarnessClient: processor caching, bundle lifecycle and cleanup on failure. Decompiled from org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.class. */
public class SdkHarnessClientTest {

    // Mocked control-plane client; individual tests stub handle(...) to hand back futures they control.
    @Mock
    public FnApiControlClient fnApiControlClient;

    // Mocked data-plane service; tests stub send(...) / receive(...) on it.
    @Mock
    public FnDataService dataService;

    // JUnit rule for an embedded SDK harness; setup() reads its data endpoint and
    // testNewBundleAndProcessElements uses its client instead of the mock-backed one.
    @Rule
    public EmbeddedSdkHarness harness = EmbeddedSdkHarness.create();

    // JUnit rule used by tests that expect a specific exception type and message.
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    // Client under test; built in setup() from the two mocks above.
    private SdkHarnessClient sdkHarnessClient;
    // Process bundle descriptor assembled in setup() around the "proc" transform.
    private BeamFnApi.ProcessBundleDescriptor descriptor;
    // Id of the only output PCollection of the "read" transform in the descriptor; assigned in setup().
    private String inputPCollection;
    // Transform ids under which setup() registers the gRPC read and write ports.
    private static final String SDK_GRPC_READ_TRANSFORM = "read";
    private static final String SDK_GRPC_WRITE_TRANSFORM = "write";

    /*
     * DoFn used by the test pipeline: emits "spam" for "foo", "ham" for "bar" and "eggs" for
     * every other element. Decompiled from
     * org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest$TestFn.class.
     */
    private static class TestFn extends DoFn<String, String> {
        private TestFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) {
            String element = processContext.element();
            String mapped;
            if ("foo".equals(element)) {
                mapped = "spam";
            } else if ("bar".equals(element)) {
                mapped = "ham";
            } else {
                // Anything else, including a null element, falls through to the default.
                mapped = "eggs";
            }
            processContext.output(mapped);
        }
    }

    /**
     * Initializes the Mockito mocks, creates the client under test and assembles a
     * {@code ProcessBundleDescriptor} around the "proc" transform of a small
     * "create" -> "proc" pipeline, with gRPC read/write port transforms attached.
     */
    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.sdkHarnessClient = SdkHarnessClient.usingFnApiClient(this.fnApiControlClient, this.dataService);

        // Build the pipeline and translate it to its proto representation.
        Pipeline pipeline = Pipeline.create();
        pipeline.apply("create", Create.of("foo", new String[0]))
                .apply("proc", ParDo.of(new TestFn()).withOutputTags(new TupleTag(), TupleTagList.empty()));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
        RunnerApi.PTransform procTransform = pipelineProto.getComponents().getTransformsOrThrow("proc");

        // Seed the descriptor with the pipeline's environments, windowing strategies and coders,
        // plus the coder used on the wire between runner and harness.
        BeamFnApi.ProcessBundleDescriptor.Builder descriptorBuilder = BeamFnApi.ProcessBundleDescriptor.newBuilder()
                .setId("my_id")
                .putAllEnvironments(pipelineProto.getComponents().getEnvironmentsMap())
                .putAllWindowingStrategies(pipelineProto.getComponents().getWindowingStrategiesMap())
                .putAllCoders(pipelineProto.getComponents().getCodersMap());
        descriptorBuilder.putCoders(
                "wire_coder",
                CoderTranslation.toProto(WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)).getCoder());

        // Both ports share the harness data endpoint and the wire coder.
        BeamFnApi.RemoteGrpcPort grpcPort = BeamFnApi.RemoteGrpcPort.newBuilder()
                .setApiServiceDescriptor(this.harness.dataEndpoint())
                .setCoderId("wire_coder")
                .build();
        RemoteGrpcPortRead portRead =
                RemoteGrpcPortRead.readFromPort(grpcPort, Iterables.getOnlyElement(procTransform.getInputsMap().values()));
        RemoteGrpcPortWrite portWrite =
                RemoteGrpcPortWrite.writeToPort(Iterables.getOnlyElement(procTransform.getOutputsMap().values()), grpcPort);

        // Copy over every PCollection that "proc" consumes or produces.
        for (String inputId : procTransform.getInputsMap().values()) {
            descriptorBuilder.putPcollections(inputId, pipelineProto.getComponents().getPcollectionsOrThrow(inputId));
        }
        for (String outputId : procTransform.getOutputsMap().values()) {
            descriptorBuilder.putPcollections(outputId, pipelineProto.getComponents().getPcollectionsOrThrow(outputId));
        }

        descriptorBuilder.putTransforms("proc", procTransform);
        descriptorBuilder.putTransforms(SDK_GRPC_READ_TRANSFORM, portRead.toPTransform());
        descriptorBuilder.putTransforms(SDK_GRPC_WRITE_TRANSFORM, portWrite.toPTransform());
        this.descriptor = descriptorBuilder.build();

        this.inputPCollection = Iterables.getOnlyElement(
                this.descriptor.getTransformsOrThrow(SDK_GRPC_READ_TRANSFORM).getOutputsMap().values());
    }

    /**
     * Asking for a processor twice with the same descriptor must return the same instance, while
     * a different descriptor must yield a different one.
     */
    @Test
    public void testRegisterCachesBundleProcessors() throws Exception {
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>());

        BeamFnApi.ProcessBundleDescriptor firstDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        BeamFnApi.ProcessBundleDescriptor secondDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();
        // Kept as a raw Map, as in the decompiled source, so it is accepted by getProcessor unchecked.
        Map remoteInputs = Collections.singletonMap(
                "inputPC",
                RemoteInputDestination.of(
                        WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE),
                        SDK_GRPC_READ_TRANSFORM));

        SdkHarnessClient.BundleProcessor firstProcessor = this.sdkHarnessClient.getProcessor(firstDescriptor, remoteInputs);
        SdkHarnessClient.BundleProcessor secondProcessor = this.sdkHarnessClient.getProcessor(secondDescriptor, remoteInputs);
        Assert.assertNotSame(firstProcessor, secondProcessor);

        SdkHarnessClient.BundleProcessor firstProcessorAgain = this.sdkHarnessClient.getProcessor(firstDescriptor, remoteInputs);
        Assert.assertSame(firstProcessor, firstProcessorAgain);
    }

    /**
     * A descriptor that carries a state API service descriptor must be rejected with an
     * {@link IllegalStateException} when getProcessor is called without a state delegator.
     */
    @Test
    public void testRegisterWithStateRequiresStateDelegator() throws Exception {
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>());

        Endpoints.ApiServiceDescriptor.Builder stateEndpoint = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("foo");
        BeamFnApi.ProcessBundleDescriptor statefulDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder()
                .setId("test")
                .setStateApiServiceDescriptor(stateEndpoint)
                .build();
        // Kept as a raw Map, as in the decompiled source.
        Map remoteInputs = Collections.singletonMap(
                "inputPC",
                RemoteInputDestination.of(
                        WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE),
                        SDK_GRPC_READ_TRANSFORM));

        // Expectations must be registered before the call that is supposed to throw.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("containing a state");
        this.sdkHarnessClient.getProcessor(statefulDescriptor, remoteInputs);
    }

    /**
     * Opens a bundle, sends it no input, completes the second stubbed response with a default
     * {@code ProcessBundleResponse} and closes the bundle; none of that may throw.
     *
     * <p>The bundle is managed by try-with-resources so it is closed exactly once. The previous
     * hand-expanded form closed it a second time whenever the first {@code close()} threw.
     */
    @Test
    public void testNewBundleNoDataDoesNotCrash() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponse = new CompletableFuture<>();
        // The first handle(...) call receives a future that never completes; the second receives
        // the future completed inside the bundle below.
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(processBundleResponse);

        // Raw coder type, as in the sibling tests, so the destination map is accepted unchecked.
        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap("inputPC", RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)));
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(Mockito.mock(CloseableFnDataReceiver.class));

        try (SdkHarnessClient.ActiveBundle ignored =
                processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored())) {
            processBundleResponse.complete(
                    BeamFnApi.InstructionResponse.newBuilder()
                            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
                            .build());
        }
    }

    /**
     * Runs a bundle against the embedded harness: feeds "foo", "bar" and "baz" into the only
     * input receiver and, once the bundle is closed, expects "spam", "ham" and "eggs" (in any
     * order) to have arrived at the output receiver.
     *
     * <p>Fixes over the decompiled form: the output callback referenced an undefined register
     * name ({@code r3}) and could not compile, and the hand-expanded resource handling closed
     * the bundle a second time on failure. Both are replaced by a method reference and a
     * try-with-resources statement.
     */
    @Test
    public void testNewBundleAndProcessElements() throws Exception {
        // Raw coder type, as in the sibling tests, so the destination map is accepted unchecked.
        WindowedValue.FullWindowedValueCoder inputCoder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.harness.client().getProcessor(
                this.descriptor,
                Collections.singletonMap("inputPC", RemoteInputDestination.of(inputCoder, SDK_GRPC_READ_TRANSFORM)));

        List<WindowedValue<String>> outputs = new ArrayList<>();
        Coder<WindowedValue<String>> outputCoder =
                WindowedValue.FullWindowedValueCoder.of(LengthPrefixCoder.of(StringUtf8Coder.of()), GlobalWindow.Coder.INSTANCE);
        RemoteOutputReceiver<WindowedValue<String>> outputReceiver = RemoteOutputReceiver.of(outputCoder, outputs::add);

        try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(
                Collections.singletonMap(SDK_GRPC_WRITE_TRANSFORM, outputReceiver),
                BundleProgressHandler.ignored())) {
            // Left raw, as in the decompiled source, because the element type of the receivers
            // map is not visible from this file.
            FnDataReceiver inputReceiver = (FnDataReceiver) Iterables.getOnlyElement(bundle.getInputReceivers().values());
            inputReceiver.accept(WindowedValue.valueInGlobalWindow("foo"));
            inputReceiver.accept(WindowedValue.valueInGlobalWindow("bar"));
            inputReceiver.accept(WindowedValue.valueInGlobalWindow("baz"));
        }

        // Assert only after the bundle has been closed.
        MatcherAssert.assertThat(
                outputs,
                org.hamcrest.Matchers.containsInAnyOrder(
                        WindowedValue.valueInGlobalWindow("spam"),
                        WindowedValue.valueInGlobalWindow("ham"),
                        WindowedValue.valueInGlobalWindow("eggs")));
    }

    /**
     * With the input sender's {@code close()} stubbed to throw, opening and closing a bundle must
     * surface that exact exception, and the inbound data client must have been cancelled and
     * otherwise left untouched.
     */
    @Test
    public void handleCleanupWhenInputSenderFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(new CompletableFuture<>());

        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap("inputPC", RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)));

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);
        Mockito.doThrow(testException).when(mockInputSender).close();

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    Mockito.mock(BundleProgressHandler.class))) {
                // No input is sent; the bundle is only opened and closed.
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(testException, e);
            Mockito.verify(mockOutputReceiver).cancel();
            Mockito.verifyNoMoreInteractions(mockOutputReceiver);
        }
    }

    /**
     * Same scenario as the stateless input-sender failure, but with a state delegator in play:
     * besides the inbound data client being cancelled, the state registration must be aborted,
     * and neither mock may see any other interaction.
     */
    @Test
    public void handleCleanupWithStateWhenInputSenderFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        // State plumbing: the delegator hands out a registration we can verify against.
        StateDelegator mockStateDelegator = Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = Mockito.mock(StateDelegator.Registration.class);
        Mockito.when(mockStateDelegator.registerForProcessBundleInstructionId((String) Matchers.any(), (StateRequestHandler) Matchers.any()))
                .thenReturn(mockStateRegistration);
        StateRequestHandler mockStateHandler = Mockito.mock(StateRequestHandler.class);
        Mockito.when(mockStateHandler.getCacheTokens()).thenReturn(Collections.emptyList());
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);

        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(new CompletableFuture<>());

        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)),
                mockStateDelegator);

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);
        Mockito.doThrow(testException).when(mockInputSender).close();

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    mockStateHandler,
                    mockProgressHandler)) {
                // No input is sent; the bundle is only opened and closed.
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(testException, e);
            Mockito.verify(mockStateRegistration).abort();
            Mockito.verify(mockOutputReceiver).cancel();
            Mockito.verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver);
        }
    }

    /**
     * Completes the second stubbed response exceptionally while the bundle is open and expects
     * an {@link ExecutionException} wrapping that failure to escape, with the inbound data
     * client cancelled exactly once and otherwise untouched.
     *
     * <p>The bundle is managed by try-with-resources so it is closed exactly once. The previous
     * hand-expanded form closed it again from its catch block when the first {@code close()}
     * threw, which is the very path this test exercises.
     */
    @Test
    public void handleCleanupWhenProcessingBundleFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponse = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(processBundleResponse);

        // Raw coder type so the destination map is accepted unchecked by getProcessor.
        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap("inputPC", RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)));

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    Mockito.mock(BundleProgressHandler.class))) {
                processBundleResponse.completeExceptionally(testException);
            }
            Assert.fail("Exception expected");
        } catch (ExecutionException e) {
            Assert.assertEquals(testException, e.getCause());
            Mockito.verify(mockOutputReceiver).cancel();
            Mockito.verifyNoMoreInteractions(mockOutputReceiver);
        }
    }

    /**
     * Stateful variant of the processing-failure test: the second stubbed response is completed
     * exceptionally while the bundle is open, and the resulting {@link ExecutionException} must
     * come with the state registration aborted and the inbound data client cancelled, each
     * exactly once and with no other interaction.
     *
     * <p>The bundle is managed by try-with-resources so it is closed exactly once. The previous
     * hand-expanded form closed it again from its catch block when the first {@code close()}
     * threw.
     */
    @Test
    public void handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        // State plumbing: the delegator hands out a registration we can verify against.
        StateDelegator mockStateDelegator = Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = Mockito.mock(StateDelegator.Registration.class);
        Mockito.when(mockStateDelegator.registerForProcessBundleInstructionId((String) Matchers.any(), (StateRequestHandler) Matchers.any()))
                .thenReturn(mockStateRegistration);
        StateRequestHandler mockStateHandler = Mockito.mock(StateRequestHandler.class);
        Mockito.when(mockStateHandler.getCacheTokens()).thenReturn(Collections.emptyList());
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);

        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponse = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(processBundleResponse);

        // Raw coder type so the destination map is accepted unchecked by getProcessor.
        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)),
                mockStateDelegator);

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    mockStateHandler,
                    mockProgressHandler)) {
                processBundleResponse.completeExceptionally(testException);
            }
            Assert.fail("Exception expected");
        } catch (ExecutionException e) {
            Assert.assertEquals(testException, e.getCause());
            Mockito.verify(mockStateRegistration).abort();
            Mockito.verify(mockOutputReceiver).cancel();
            Mockito.verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver);
        }
    }

    /**
     * With the inbound data client's {@code awaitCompletion()} stubbed to throw, a bundle whose
     * second stubbed response completes normally must still surface that exact exception.
     *
     * <p>The bundle is managed by try-with-resources so it is closed exactly once. The previous
     * hand-expanded form closed it a second time from its catch block after any failure,
     * including the expected one.
     */
    @Test
    public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponse = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(processBundleResponse);

        // Raw coder type so the destination map is accepted unchecked by getProcessor.
        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap("inputPC", RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)));

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);
        Mockito.doThrow(testException).when(mockOutputReceiver).awaitCompletion();

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    Mockito.mock(BundleProgressHandler.class))) {
                processBundleResponse.complete(
                        BeamFnApi.InstructionResponse.newBuilder()
                                .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
                                .build());
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(testException, e);
        }
    }

    /**
     * Stateful variant of the awaiting-on-close failure test: the inbound data client's
     * {@code awaitCompletion()} is stubbed to throw, the second stubbed response completes
     * normally, and that exact exception must surface.
     *
     * <p>The bundle is managed by try-with-resources so it is closed exactly once. The previous
     * hand-expanded form closed it again from its catch block when the first {@code close()}
     * threw.
     */
    @Test
    public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws Exception {
        Exception testException = new Exception();
        InboundDataClient mockOutputReceiver = Mockito.mock(InboundDataClient.class);
        CloseableFnDataReceiver mockInputSender = Mockito.mock(CloseableFnDataReceiver.class);

        // State plumbing; the registration itself is not verified in this test.
        StateDelegator mockStateDelegator = Mockito.mock(StateDelegator.class);
        Mockito.when(mockStateDelegator.registerForProcessBundleInstructionId((String) Matchers.any(), (StateRequestHandler) Matchers.any()))
                .thenReturn(Mockito.mock(StateDelegator.Registration.class));
        StateRequestHandler mockStateHandler = Mockito.mock(StateRequestHandler.class);
        Mockito.when(mockStateHandler.getCacheTokens()).thenReturn(Collections.emptyList());
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);

        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponse = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>())
                .thenReturn(processBundleResponse);

        // Raw coder type so the destination map is accepted unchecked by getProcessor.
        WindowedValue.FullWindowedValueCoder coder =
                WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(
                this.descriptor,
                Collections.singletonMap(this.inputPCollection, RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)),
                mockStateDelegator);

        Mockito.when(this.dataService.receive((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any(), (FnDataReceiver) Matchers.any()))
                .thenReturn(mockOutputReceiver);
        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.eq(coder)))
                .thenReturn(mockInputSender);
        Mockito.doThrow(testException).when(mockOutputReceiver).awaitCompletion();

        try {
            try (SdkHarnessClient.ActiveBundle ignored = processor.newBundle(
                    ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                    mockStateHandler,
                    mockProgressHandler)) {
                processBundleResponse.complete(
                        BeamFnApi.InstructionResponse.newBuilder()
                                .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
                                .build());
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(testException, e);
        }
    }

    /**
     * The cache tokens reported by the state request handler must be copied into the
     * {@code PROCESS_BUNDLE} request, which is expected to be the second request sent to the
     * control client, right after a {@code REGISTER} request.
     */
    @Test
    public void verifyCacheTokensAreUsedInNewBundleRequest() {
        Mockito.when(this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
                .thenReturn(new CompletableFuture<>());

        BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        // Raw Map, matching how the sibling tests pass their remote inputs to getProcessor.
        Map remoteInputs = Collections.singletonMap(
                "inputPC",
                RemoteInputDestination.of(
                        WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE),
                        SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(bundleDescriptor, remoteInputs);

        Mockito.when(this.dataService.send((LogicalEndpoint) Matchers.any(), (Coder) Matchers.any()))
                .thenReturn(Mockito.mock(CloseableFnDataReceiver.class));

        StateRequestHandler stateHandler = Mockito.mock(StateRequestHandler.class);
        List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens =
                Collections.singletonList(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().getDefaultInstanceForType());
        Mockito.when(stateHandler.getCacheTokens()).thenReturn(cacheTokens);

        // The returned bundle is deliberately not closed, exactly as in the original test.
        // NOTE(review): the stubbed response future never completes, so closing here could
        // presumably block; confirm before adding a close.
        processor.newBundle(
                ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, Mockito.mock(RemoteOutputReceiver.class)),
                stateHandler,
                BundleProgressHandler.ignored());

        ArgumentCaptor<BeamFnApi.InstructionRequest> requestCaptor =
                ArgumentCaptor.forClass(BeamFnApi.InstructionRequest.class);
        Mockito.verify(this.fnApiControlClient, Mockito.times(2)).handle(requestCaptor.capture());
        List<BeamFnApi.InstructionRequest> requests = requestCaptor.getAllValues();

        BeamFnApi.InstructionRequest registerRequest = requests.get(0);
        BeamFnApi.InstructionRequest processBundleRequest = requests.get(1);
        MatcherAssert.assertThat(
                registerRequest.getRequestCase(),
                org.hamcrest.Matchers.is(BeamFnApi.InstructionRequest.RequestCase.REGISTER));
        MatcherAssert.assertThat(
                processBundleRequest.getRequestCase(),
                org.hamcrest.Matchers.is(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE));
        MatcherAssert.assertThat(
                processBundleRequest.getProcessBundle().getCacheTokensList(),
                org.hamcrest.Matchers.is(cacheTokens));
    }

    /**
     * Closes a resource the way a try-with-resources statement does on exit (this is the
     * compiler-generated helper recovered by the decompiler).
     *
     * <p>The method now declares {@code throws Exception}: {@link AutoCloseable#close()} throws
     * a checked exception, so without the clause the method does not compile.
     *
     * @param th the exception already in flight, or {@code null} when the guarded body completed
     *     normally
     * @param autoCloseable the resource to close; callers perform the null check
     * @throws Exception whatever {@code close()} throws, but only when {@code th} is
     *     {@code null}; otherwise the close failure is attached to {@code th} as suppressed
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th != null) {
            // A primary failure exists: never let the close failure replace it.
            try {
                autoCloseable.close();
            } catch (Throwable closeFailure) {
                th.addSuppressed(closeFailure);
            }
            return;
        }
        autoCloseable.close();
    }
}
