package org.apache.beam.runners.fnexecution.control;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.transforms.Create;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.class */
public class SingleEnvironmentInstanceJobBundleFactoryTest {

    @Mock
    private EnvironmentFactory environmentFactory;

    @Mock
    private InstructionRequestHandler instructionRequestHandler;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private GrpcFnServer<GrpcDataService> dataServer;
    private GrpcFnServer<GrpcStateService> stateServer;
    private JobBundleFactory factory;

    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.instructionRequestHandler.handle((BeamFnApi.InstructionRequest) Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(CompletableFuture.completedFuture(BeamFnApi.InstructionResponse.getDefaultInstance()));
        InProcessServerFactory create = InProcessServerFactory.create();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(this.executor, OutboundObserverFactory.serverDirect()), create);
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), create);
        this.factory = SingleEnvironmentInstanceJobBundleFactory.create(this.environmentFactory, this.dataServer, this.stateServer);
    }

    @After
    public void teardown() throws Exception {
        GrpcFnServer<GrpcDataService> grpcFnServer = this.dataServer;
        try {
            GrpcFnServer<GrpcStateService> grpcFnServer2 = this.stateServer;
            Throwable th = null;
            try {
                try {
                    this.executor.shutdownNow();
                    if (grpcFnServer2 != null) {
                        $closeResource(null, grpcFnServer2);
                    }
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (grpcFnServer2 != null) {
                    $closeResource(th, grpcFnServer2);
                }
                throw th3;
            }
        } finally {
            if (grpcFnServer != null) {
                $closeResource(null, grpcFnServer);
            }
        }
    }

    @Test
    public void closeShutsDownEnvironments() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("Create", Create.of(1, new Integer[]{2, 3}));
        create.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
        ExecutableStage executableStage = (ExecutableStage) GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages().stream().findFirst().get();
        RemoteEnvironment remoteEnvironment = (RemoteEnvironment) Mockito.mock(RemoteEnvironment.class);
        Mockito.when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(this.instructionRequestHandler);
        Mockito.when(this.environmentFactory.createEnvironment(executableStage.getEnvironment())).thenReturn(remoteEnvironment);
        this.factory.forStage(executableStage);
        this.factory.close();
        ((RemoteEnvironment) Mockito.verify(remoteEnvironment)).close();
    }

    @Test
    public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("Create", Create.of(1, new Integer[]{2, 3}));
        create.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
        ExecutableStage executableStage = (ExecutableStage) GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages().stream().findFirst().get();
        RunnerApi.ExecutableStagePayload parseFrom = RunnerApi.ExecutableStagePayload.parseFrom(executableStage.toPTransform("foo").getSpec().getPayload());
        ExecutableStage fromPayload = ExecutableStage.fromPayload(parseFrom.toBuilder().setEnvironment(Environments.createDockerEnvironment("second_env")).build());
        ExecutableStage fromPayload2 = ExecutableStage.fromPayload(parseFrom.toBuilder().setEnvironment(Environments.createDockerEnvironment("third_env")).build());
        RemoteEnvironment remoteEnvironment = (RemoteEnvironment) Mockito.mock(RemoteEnvironment.class, "First Remote Env");
        RemoteEnvironment remoteEnvironment2 = (RemoteEnvironment) Mockito.mock(RemoteEnvironment.class, "Second Remote Env");
        RemoteEnvironment remoteEnvironment3 = (RemoteEnvironment) Mockito.mock(RemoteEnvironment.class, "Third Remote Env");
        Mockito.when(this.environmentFactory.createEnvironment(executableStage.getEnvironment())).thenReturn(remoteEnvironment);
        Mockito.when(this.environmentFactory.createEnvironment(fromPayload.getEnvironment())).thenReturn(remoteEnvironment2);
        Mockito.when(this.environmentFactory.createEnvironment(fromPayload2.getEnvironment())).thenReturn(remoteEnvironment3);
        Mockito.when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(this.instructionRequestHandler);
        Mockito.when(remoteEnvironment2.getInstructionRequestHandler()).thenReturn(this.instructionRequestHandler);
        Mockito.when(remoteEnvironment3.getInstructionRequestHandler()).thenReturn(this.instructionRequestHandler);
        this.factory.forStage(executableStage);
        this.factory.forStage(fromPayload);
        this.factory.forStage(fromPayload2);
        IllegalStateException illegalStateException = new IllegalStateException("first stage");
        ((RemoteEnvironment) Mockito.doThrow(illegalStateException).when(remoteEnvironment)).close();
        IllegalStateException illegalStateException2 = new IllegalStateException("third stage");
        ((RemoteEnvironment) Mockito.doThrow(illegalStateException2).when(remoteEnvironment3)).close();
        try {
            this.factory.close();
            Assert.fail("Factory close should have thrown");
        } catch (IllegalStateException e) {
            if (e.equals(illegalStateException)) {
                Assert.assertThat(ImmutableList.copyOf(e.getSuppressed()), org.hamcrest.Matchers.contains(new Throwable[]{illegalStateException2}));
            } else {
                if (!e.equals(illegalStateException2)) {
                    throw e;
                }
                Assert.assertThat(ImmutableList.copyOf(e.getSuppressed()), org.hamcrest.Matchers.contains(new Throwable[]{illegalStateException}));
            }
            ((RemoteEnvironment) Mockito.verify(remoteEnvironment)).close();
            ((RemoteEnvironment) Mockito.verify(remoteEnvironment2)).close();
            ((RemoteEnvironment) Mockito.verify(remoteEnvironment3)).close();
        }
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
