package org.apache.beam.runners.fnexecution.control;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.AutoValue_DefaultJobBundleFactory_ServerInfo;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link DefaultJobBundleFactory}.
 *
 * <p>Each test builds a factory over mocked gRPC servers and mocked {@link EnvironmentFactory}
 * instances, requests stage bundle factories for one or more environments, and verifies how often
 * environments (and environment factories) are created and closed.
 *
 * <p>The factory under test is always managed with try-with-resources so that it is closed exactly
 * once, and so that a failure while closing is attached as a suppressed exception instead of
 * masking (or being masked by) the test failure.
 */
@RunWith(JUnit4.class)
public class DefaultJobBundleFactoryTest {

    @Mock
    private EnvironmentFactory envFactory;

    @Mock
    private RemoteEnvironment remoteEnvironment;

    @Mock
    private InstructionRequestHandler instructionHandler;

    @Mock
    GrpcFnServer<FnApiControlClientPoolService> controlServer;

    @Mock
    GrpcFnServer<GrpcLoggingService> loggingServer;

    @Mock
    GrpcFnServer<ArtifactRetrievalService> retrievalServer;

    @Mock
    GrpcFnServer<StaticGrpcProvisionService> provisioningServer;

    @Mock
    private GrpcFnServer<GrpcDataService> dataServer;

    @Mock
    private GrpcFnServer<GrpcStateService> stateServer;

    /** Bundle of the mocked servers above; rebuilt in {@link #setUpMocks()} before every test. */
    private DefaultJobBundleFactory.ServerInfo serverInfo;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Default environment used by the tests that rely on the shared {@link #envFactory} mock. */
    private final RunnerApi.Environment environment =
            RunnerApi.Environment.newBuilder().setUrn("dummy:urn").build();

    private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();

    /** Canned response returned by every mocked instruction handler. */
    private final BeamFnApi.InstructionResponse instructionResponse =
            BeamFnApi.InstructionResponse.newBuilder().setInstructionId("instruction-id").build();

    /** Provider that ignores all of its arguments and always hands out {@link #envFactory}. */
    private final EnvironmentFactory.Provider envFactoryProvider =
            (controlSrv, loggingSrv, retrievalSrv, provisioningSrv, clientPool, idGenerator) -> this.envFactory;

    private final Map<String, EnvironmentFactory.Provider> envFactoryProviderMap =
            ImmutableMap.of(this.environment.getUrn(), this.envFactoryProvider);

    /**
     * Initializes the annotated mocks and stubs the minimum behavior needed to create stage bundle
     * factories: environment creation, instruction handling, and the data/state services.
     */
    @Before
    public void setUpMocks() throws Exception {
        MockitoAnnotations.initMocks(this);

        Mockito.when(this.envFactory.createEnvironment(this.environment)).thenReturn(this.remoteEnvironment);
        Mockito.when(this.remoteEnvironment.getInstructionRequestHandler()).thenReturn(this.instructionHandler);
        Mockito.when(this.instructionHandler.handle(Matchers.any()))
                .thenReturn(CompletableFuture.completedFuture(this.instructionResponse));

        Mockito.when(this.dataServer.getApiServiceDescriptor())
                .thenReturn(Endpoints.ApiServiceDescriptor.getDefaultInstance());
        GrpcDataService dataService = Mockito.mock(GrpcDataService.class);
        Mockito.when(dataService.send(Matchers.any(), Matchers.any()))
                .thenReturn(Mockito.mock(CloseableFnDataReceiver.class));
        Mockito.when(this.dataServer.getService()).thenReturn(dataService);

        Mockito.when(this.stateServer.getApiServiceDescriptor())
                .thenReturn(Endpoints.ApiServiceDescriptor.getDefaultInstance());
        GrpcStateService stateService = Mockito.mock(GrpcStateService.class);
        Mockito.when(stateService.registerForProcessBundleInstructionId(Matchers.any(), Matchers.any()))
                .thenReturn(Mockito.mock(StateDelegator.Registration.class));
        Mockito.when(this.stateServer.getService()).thenReturn(stateService);

        this.serverInfo =
                new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
                        .setControlServer(this.controlServer)
                        .setLoggingServer(this.loggingServer)
                        .setRetrievalServer(this.retrievalServer)
                        .setProvisioningServer(this.provisioningServer)
                        .setDataServer(this.dataServer)
                        .setStateServer(this.stateServer)
                        .build();
    }

    /** Requesting a stage must create the environment that the stage declares. */
    @Test
    public void createsCorrectEnvironment() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
        }
    }

    /**
     * Two environments sharing a URN but differing in payload must each get their own environment
     * factory and environment, while the provider registered for the unrelated URN stays unused.
     */
    @Test
    public void createsMultipleEnvironmentOfSingleType() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        RunnerApi.Environment environmentA1 =
                RunnerApi.Environment.newBuilder()
                        .setUrn("env:urn:a")
                        .setPayload(ByteString.copyFrom(new byte[1]))
                        .build();
        RunnerApi.Environment environmentA2 =
                RunnerApi.Environment.newBuilder()
                        .setUrn("env:urn:a")
                        .setPayload(ByteString.copyFrom(new byte[2]))
                        .build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA1)).thenReturn(this.remoteEnvironment);
        Mockito.when(envFactoryA.createEnvironment(environmentA2)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider providerA = mockProvider(envFactoryA, serverFactory);

        RunnerApi.Environment environmentB = RunnerApi.Environment.newBuilder().setUrn("env:urn:b").build();
        EnvironmentFactory envFactoryB = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryB.createEnvironment(environmentB)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider providerB = mockProvider(envFactoryB, serverFactory);

        Map<String, EnvironmentFactory.Provider> providers =
                ImmutableMap.of(environmentA1.getUrn(), providerA, environmentB.getUrn(), providerB);

        try (DefaultJobBundleFactory bundleFactory = createDefaultJobBundleFactory(providers)) {
            bundleFactory.forStage(getExecutableStage(environmentA1));
            verifyFactoryCreations(providerA, 1);
            verifyFactoryCreations(providerB, 0);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA1);
            Mockito.verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentA2);

            bundleFactory.forStage(getExecutableStage(environmentA2));
            verifyFactoryCreations(providerA, 2);
            verifyFactoryCreations(providerB, 0);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA1);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA2);
        }
    }

    /** Environments with different URNs must each be created by the factory of their own provider. */
    @Test
    public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        RunnerApi.Environment environmentA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider providerA = mockProvider(envFactoryA, serverFactory);

        RunnerApi.Environment environmentB = RunnerApi.Environment.newBuilder().setUrn("env:urn:b").build();
        EnvironmentFactory envFactoryB = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryB.createEnvironment(environmentB)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider providerB = mockProvider(envFactoryB, serverFactory);

        Map<String, EnvironmentFactory.Provider> providers =
                ImmutableMap.of(environmentA.getUrn(), providerA, environmentB.getUrn(), providerB);
        JobInfo jobInfo = JobInfo.create("testJob", "testJob", "token", Struct.getDefaultInstance());

        try (DefaultJobBundleFactory bundleFactory = DefaultJobBundleFactory.create(jobInfo, providers)) {
            bundleFactory.forStage(getExecutableStage(environmentB));
            bundleFactory.forStage(getExecutableStage(environmentA));
        }

        // Verified only after the bundle factory has been closed.
        Mockito.verify(envFactoryA).createEnvironment(environmentA);
        Mockito.verify(envFactoryB).createEnvironment(environmentB);
    }

    /**
     * With the environment expiration set to 1 ms, the environment is expected to be created (and
     * closed) three times over the course of one {@code forStage} call and two bundles.
     */
    @Test
    public void expiresEnvironment() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        RunnerApi.Environment environmentA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider providerA = mockProvider(envFactoryA, serverFactory);
        Map<String, EnvironmentFactory.Provider> providers = ImmutableMap.of(environmentA.getUrn(), providerA);

        PortablePipelineOptions portableOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class);
        portableOptions.setEnvironmentExpirationMillis(1);
        JobInfo jobInfo =
                JobInfo.create("testJob", "testJob", "token", PipelineOptionsTranslation.toProto(portableOptions));

        try (DefaultJobBundleFactory bundleFactory =
                new DefaultJobBundleFactory(jobInfo, providers, this.stageIdGenerator, this.serverInfo)) {
            OutputReceiverFactory outputReceiverFactory = Mockito.mock(OutputReceiverFactory.class);
            StateRequestHandler stateRequestHandler = Mockito.mock(StateRequestHandler.class);
            Mockito.when(stateRequestHandler.getCacheTokens()).thenReturn(Collections.emptyList());

            StageBundleFactory stageBundleFactory = bundleFactory.forStage(getExecutableStage(environmentA));
            // The pauses are longer than the 1 ms expiration configured above; presumably this lets
            // the cached environment expire before each bundle is requested.
            Thread.sleep(10L);
            stageBundleFactory
                    .getBundle(outputReceiverFactory, stateRequestHandler, BundleProgressHandler.ignored())
                    .close();
            Thread.sleep(10L);
            stageBundleFactory
                    .getBundle(outputReceiverFactory, stateRequestHandler, BundleProgressHandler.ignored())
                    .close();
        }

        // Verified only after the bundle factory has been closed.
        Mockito.verify(envFactoryA, Mockito.times(3)).createEnvironment(environmentA);
        Mockito.verify(this.remoteEnvironment, Mockito.times(3)).close();
    }

    /** Closing the bundle factory must close the environment it created. */
    @Test
    public void closesEnvironmentOnCleanup() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
        }
        Mockito.verify(this.remoteEnvironment).close();
    }

    /** Requesting two stages for the same environment must create that environment only once. */
    @Test
    public void cachesEnvironment() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            StageBundleFactory first = bundleFactory.forStage(getExecutableStage(this.environment));
            StageBundleFactory second = bundleFactory.forStage(getExecutableStage(this.environment));
            // Both stage bundle factories are printed, which keeps the references in use until after
            // both forStage calls have completed.
            System.out.println("bundle factory 1:" + first);
            System.out.println("bundle factory 2:" + second);
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
            Mockito.verifyNoMoreInteractions(this.envFactory);
        }
    }

    /** Stages for two distinct environments must create each environment exactly once. */
    @Test
    public void doesNotCacheDifferentEnvironments() throws Exception {
        RunnerApi.Environment otherEnvironment =
                RunnerApi.Environment.newBuilder().setUrn("dummy:urn:another").build();
        RemoteEnvironment otherRemoteEnvironment = Mockito.mock(RemoteEnvironment.class);
        InstructionRequestHandler otherInstructionHandler = Mockito.mock(InstructionRequestHandler.class);

        // Both URNs share the same provider, hence the same envFactory mock.
        Map<String, EnvironmentFactory.Provider> providers =
                ImmutableMap.of(
                        this.environment.getUrn(), this.envFactoryProvider,
                        otherEnvironment.getUrn(), this.envFactoryProvider);
        Mockito.when(this.envFactory.createEnvironment(otherEnvironment)).thenReturn(otherRemoteEnvironment);
        Mockito.when(otherRemoteEnvironment.getInstructionRequestHandler()).thenReturn(otherInstructionHandler);
        Mockito.when(otherInstructionHandler.handle(Matchers.any()))
                .thenReturn(CompletableFuture.completedFuture(this.instructionResponse));

        try (DefaultJobBundleFactory bundleFactory = createDefaultJobBundleFactory(providers)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
            bundleFactory.forStage(getExecutableStage(otherEnvironment));
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
            Mockito.verify(this.envFactory).createEnvironment(otherEnvironment);
            Mockito.verifyNoMoreInteractions(this.envFactory);
        }
    }

    /**
     * Builds the factory under test for the fixed test job, using the shared stage id generator and
     * the mocked {@link #serverInfo}.
     *
     * @param map environment URN to provider mapping handed to the factory
     * @return a new factory; the caller is responsible for closing it
     */
    private DefaultJobBundleFactory createDefaultJobBundleFactory(Map<String, EnvironmentFactory.Provider> map) {
        JobInfo jobInfo = JobInfo.create("testJob", "testJob", "token", Struct.getDefaultInstance());
        return new DefaultJobBundleFactory(jobInfo, map, this.stageIdGenerator, this.serverInfo);
    }

    /**
     * Creates a mocked provider that returns {@code factory} for any arguments and reports
     * {@code serverFactory} as its server factory.
     */
    private static EnvironmentFactory.Provider mockProvider(EnvironmentFactory factory, ServerFactory serverFactory) {
        EnvironmentFactory.Provider provider = Mockito.mock(EnvironmentFactory.Provider.class);
        Mockito.when(
                        provider.createEnvironmentFactory(
                                Matchers.any(),
                                Matchers.any(),
                                Matchers.any(),
                                Matchers.any(),
                                Matchers.any(),
                                Matchers.any()))
                .thenReturn(factory);
        Mockito.when(provider.getServerFactory()).thenReturn(serverFactory);
        return provider;
    }

    /**
     * Verifies that {@code provider} has been asked for an environment factory exactly
     * {@code expectedCalls} times so far, regardless of the arguments used.
     */
    private static void verifyFactoryCreations(EnvironmentFactory.Provider provider, int expectedCalls) {
        Mockito.verify(provider, Mockito.times(expectedCalls))
                .createEnvironmentFactory(
                        Matchers.any(),
                        Matchers.any(),
                        Matchers.any(),
                        Matchers.any(),
                        Matchers.any(),
                        Matchers.any());
    }

    /**
     * Builds a minimal executable stage bound to {@code environment}: a single input PCollection
     * whose coder and window coder both point at one interval-window coder entry.
     */
    private static ExecutableStage getExecutableStage(RunnerApi.Environment environment) {
        RunnerApi.Coder intervalWindowCoder =
                RunnerApi.Coder.newBuilder()
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN)
                                        .build())
                        .build();
        RunnerApi.PCollection inputPCollection =
                RunnerApi.PCollection.newBuilder()
                        .setWindowingStrategyId("windowing-strategy")
                        .setCoderId("coder-id")
                        .build();
        RunnerApi.WindowingStrategy windowingStrategy =
                RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build();
        RunnerApi.Components components =
                RunnerApi.Components.newBuilder()
                        .putPcollections("input-pc", inputPCollection)
                        .putWindowingStrategies("windowing-strategy", windowingStrategy)
                        .putCoders("coder-id", intervalWindowCoder)
                        .build();
        return ExecutableStage.fromPayload(
                RunnerApi.ExecutableStagePayload.newBuilder()
                        .setInput("input-pc")
                        .setEnvironment(environment)
                        .setComponents(components)
                        .build());
    }
}
