/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.AutoValue_DefaultJobBundleFactory_ServerInfo;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link DefaultJobBundleFactory}.
 *
 * <p>Every collaborator (environment factories, remote environments and the gRPC servers bundled
 * into {@link DefaultJobBundleFactory.ServerInfo}) is a Mockito mock, so these tests only observe
 * how often environments and environment factories are created and closed.
 */
@RunWith(JUnit4.class)
public class DefaultJobBundleFactoryTest {
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Mock
    private EnvironmentFactory envFactory;
    @Mock
    private RemoteEnvironment remoteEnvironment;
    @Mock
    private InstructionRequestHandler instructionHandler;
    @Mock
    GrpcFnServer<FnApiControlClientPoolService> controlServer;
    @Mock
    GrpcFnServer<GrpcLoggingService> loggingServer;
    @Mock
    GrpcFnServer<ArtifactRetrievalService> retrievalServer;
    @Mock
    GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
    @Mock
    private GrpcFnServer<GrpcDataService> dataServer;
    @Mock
    private GrpcFnServer<GrpcStateService> stateServer;
    /** Default environment used by the tests that rely on the shared {@link #envFactory} mock. */
    private final RunnerApi.Environment environment = RunnerApi.Environment.newBuilder().setUrn("dummy:urn").build();
    private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
    /** Canned response returned by every mocked instruction handler. */
    private final BeamFnApi.InstructionResponse instructionResponse = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
    /** Provider that ignores all of its arguments and always hands out the shared {@link #envFactory} mock. */
    private final EnvironmentFactory.Provider envFactoryProvider = (controlServiceServer, loggingServiceServer, retrievalServiceServer, provisioningServiceServer, clientPool, idGenerator) -> this.envFactory;
    private final Map<String, EnvironmentFactory.Provider> envFactoryProviderMap = ImmutableMap.of(this.environment.getUrn(), this.envFactoryProvider);
    private DefaultJobBundleFactory.ServerInfo serverInfo;

    /**
     * Initializes the {@code @Mock} fields, stubs the calls the factory makes on them and assembles
     * the {@link DefaultJobBundleFactory.ServerInfo} passed to the factories under test.
     */
    @Before
    public void setUpMocks() throws Exception {
        MockitoAnnotations.initMocks(this);

        // Environment creation -> remote environment -> instruction handler -> canned response.
        Mockito.when(this.envFactory.createEnvironment(this.environment)).thenReturn(this.remoteEnvironment);
        Mockito.when(this.remoteEnvironment.getInstructionRequestHandler()).thenReturn(this.instructionHandler);
        Mockito.when(this.instructionHandler.handle(Matchers.any())).thenReturn(CompletableFuture.completedFuture(this.instructionResponse));

        // Data server backed by a mocked data service.
        Mockito.when(this.dataServer.getApiServiceDescriptor()).thenReturn(Endpoints.ApiServiceDescriptor.getDefaultInstance());
        GrpcDataService dataService = Mockito.mock(GrpcDataService.class);
        Mockito.when(dataService.send(Matchers.any(), Matchers.any())).thenReturn(Mockito.mock(CloseableFnDataReceiver.class));
        Mockito.when(this.dataServer.getService()).thenReturn(dataService);

        // State server backed by a mocked state service.
        Mockito.when(this.stateServer.getApiServiceDescriptor()).thenReturn(Endpoints.ApiServiceDescriptor.getDefaultInstance());
        GrpcStateService stateService = Mockito.mock(GrpcStateService.class);
        Mockito.when(stateService.registerForProcessBundleInstructionId(Matchers.any(), Matchers.any())).thenReturn(Mockito.mock(StateDelegator.Registration.class));
        Mockito.when(this.stateServer.getService()).thenReturn(stateService);

        this.serverInfo = new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
                .setControlServer(this.controlServer)
                .setLoggingServer(this.loggingServer)
                .setRetrievalServer(this.retrievalServer)
                .setProvisioningServer(this.provisioningServer)
                .setDataServer(this.dataServer)
                .setStateServer(this.stateServer)
                .build();
    }

    /** Requesting a stage asks the environment factory for exactly that stage's environment. */
    @Test
    public void createsCorrectEnvironment() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = this.createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
        }
    }

    /**
     * Two environments sharing a URN but differing in payload each trigger their own environment
     * factory and environment creation, while the provider for an unrelated URN is never used.
     */
    @Test
    public void createsMultipleEnvironmentOfSingleType() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        // Same URN, different payloads.
        RunnerApi.Environment environmentA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").setPayload(ByteString.copyFrom(new byte[1])).build();
        RunnerApi.Environment environmentAA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").setPayload(ByteString.copyFrom(new byte[2])).build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA)).thenReturn(this.remoteEnvironment);
        Mockito.when(envFactoryA.createEnvironment(environmentAA)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider environmentProviderFactoryA = mockProvider(envFactoryA, serverFactory);

        RunnerApi.Environment environmentB = RunnerApi.Environment.newBuilder().setUrn("env:urn:b").build();
        EnvironmentFactory envFactoryB = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryB.createEnvironment(environmentB)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider environmentProviderFactoryB = mockProvider(envFactoryB, serverFactory);

        Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap = ImmutableMap.of(
                environmentA.getUrn(), environmentProviderFactoryA,
                environmentB.getUrn(), environmentProviderFactoryB);

        try (DefaultJobBundleFactory bundleFactory = this.createDefaultJobBundleFactory(environmentFactoryProviderMap)) {
            // First stage: only environmentA has been materialized.
            bundleFactory.forStage(getExecutableStage(environmentA));
            verifyFactoryCreations(environmentProviderFactoryA, 1);
            verifyFactoryCreations(environmentProviderFactoryB, 0);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA);
            Mockito.verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA);

            // Second stage: environmentAA gets its own factory and environment.
            bundleFactory.forStage(getExecutableStage(environmentAA));
            verifyFactoryCreations(environmentProviderFactoryA, 2);
            verifyFactoryCreations(environmentProviderFactoryB, 0);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentA);
            Mockito.verify(envFactoryA, Mockito.times(1)).createEnvironment(environmentAA);
        }
    }

    /**
     * Stages with different environment URNs are each served by the factory registered for their
     * URN. The bundle factory here is built through {@link DefaultJobBundleFactory#create} rather
     * than the constructor that accepts the mocked {@link #serverInfo}.
     */
    @Test
    public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        RunnerApi.Environment environmentA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider environmentProviderFactoryA = mockProvider(envFactoryA, serverFactory);

        RunnerApi.Environment environmentB = RunnerApi.Environment.newBuilder().setUrn("env:urn:b").build();
        EnvironmentFactory envFactoryB = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryB.createEnvironment(environmentB)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider environmentProviderFactoryB = mockProvider(envFactoryB, serverFactory);

        Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap = ImmutableMap.of(
                environmentA.getUrn(), environmentProviderFactoryA,
                environmentB.getUrn(), environmentProviderFactoryB);

        try (DefaultJobBundleFactory bundleFactory = DefaultJobBundleFactory.create(testJobInfo(Struct.getDefaultInstance()), environmentFactoryProviderMap)) {
            bundleFactory.forStage(getExecutableStage(environmentB));
            bundleFactory.forStage(getExecutableStage(environmentA));
        }
        Mockito.verify(envFactoryA).createEnvironment(environmentA);
        Mockito.verify(envFactoryB).createEnvironment(environmentB);
    }

    /**
     * With a 1 ms environment expiration, an environment is created again after each pause, and
     * every created environment is closed by the time the bundle factory has been closed.
     */
    @Test
    public void expiresEnvironment() throws Exception {
        ServerFactory serverFactory = ServerFactory.createDefault();

        RunnerApi.Environment environmentA = RunnerApi.Environment.newBuilder().setUrn("env:urn:a").build();
        EnvironmentFactory envFactoryA = Mockito.mock(EnvironmentFactory.class);
        Mockito.when(envFactoryA.createEnvironment(environmentA)).thenReturn(this.remoteEnvironment);
        EnvironmentFactory.Provider environmentProviderFactoryA = mockProvider(envFactoryA, serverFactory);
        Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap = ImmutableMap.of(environmentA.getUrn(), environmentProviderFactoryA);

        PortablePipelineOptions portableOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class);
        portableOptions.setEnvironmentExpirationMillis(1);
        Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);

        try (DefaultJobBundleFactory bundleFactory = new DefaultJobBundleFactory(testJobInfo(pipelineOptions), environmentFactoryProviderMap, this.stageIdGenerator, this.serverInfo)) {
            OutputReceiverFactory orf = Mockito.mock(OutputReceiverFactory.class);
            StateRequestHandler srh = Mockito.mock(StateRequestHandler.class);
            StageBundleFactory sbf = bundleFactory.forStage(getExecutableStage(environmentA));
            // Sleep well past the 1 ms expiration configured above before each bundle.
            Thread.sleep(10L);
            sbf.getBundle(orf, srh, BundleProgressHandler.ignored()).close();
            Thread.sleep(10L);
            sbf.getBundle(orf, srh, BundleProgressHandler.ignored()).close();
        }
        // One creation for forStage plus one per bundle requested after a pause.
        Mockito.verify(envFactoryA, Mockito.times(3)).createEnvironment(environmentA);
        Mockito.verify(this.remoteEnvironment, Mockito.times(3)).close();
    }

    /** Closing the bundle factory closes the remote environment it created. */
    @Test
    public void closesEnvironmentOnCleanup() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = this.createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
        }
        Mockito.verify(this.remoteEnvironment).close();
    }

    /** Requesting two stages with the same environment creates that environment only once. */
    @Test
    public void cachesEnvironment() throws Exception {
        try (DefaultJobBundleFactory bundleFactory = this.createDefaultJobBundleFactory(this.envFactoryProviderMap)) {
            StageBundleFactory bf1 = bundleFactory.forStage(getExecutableStage(this.environment));
            StageBundleFactory bf2 = bundleFactory.forStage(getExecutableStage(this.environment));
            // NOTE(review): the prints presumably exist to keep both stage bundle factories (and
            // thus their environments) strongly referenced until verification - confirm.
            System.out.println("bundle factory 1:" + bf1);
            System.out.println("bundle factory 2:" + bf2);
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
            Mockito.verifyNoMoreInteractions(this.envFactory);
        }
    }

    /** Stages with distinct environments each get their own environment created exactly once. */
    @Test
    public void doesNotCacheDifferentEnvironments() throws Exception {
        RunnerApi.Environment envFoo = RunnerApi.Environment.newBuilder().setUrn("dummy:urn:another").build();
        RemoteEnvironment remoteEnvFoo = Mockito.mock(RemoteEnvironment.class);
        InstructionRequestHandler fooInstructionHandler = Mockito.mock(InstructionRequestHandler.class);
        // Both URNs resolve to the same provider, and therefore to the shared envFactory mock.
        Map<String, EnvironmentFactory.Provider> envFactoryProviderMapFoo = ImmutableMap.of(
                this.environment.getUrn(), this.envFactoryProvider,
                envFoo.getUrn(), this.envFactoryProvider);
        Mockito.when(this.envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
        Mockito.when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
        Mockito.when(fooInstructionHandler.handle(Matchers.any())).thenReturn(CompletableFuture.completedFuture(this.instructionResponse));

        try (DefaultJobBundleFactory bundleFactory = this.createDefaultJobBundleFactory(envFactoryProviderMapFoo)) {
            bundleFactory.forStage(getExecutableStage(this.environment));
            bundleFactory.forStage(getExecutableStage(envFoo));
            Mockito.verify(this.envFactory).createEnvironment(this.environment);
            Mockito.verify(this.envFactory).createEnvironment(envFoo);
            Mockito.verifyNoMoreInteractions(this.envFactory);
        }
    }

    /**
     * Builds a factory under test wired to the mocked {@link #serverInfo} and default (empty)
     * pipeline options.
     *
     * @param envFactoryProviderMap environment URN to provider mapping the factory should use
     */
    private DefaultJobBundleFactory createDefaultJobBundleFactory(Map<String, EnvironmentFactory.Provider> envFactoryProviderMap) {
        return new DefaultJobBundleFactory(testJobInfo(Struct.getDefaultInstance()), envFactoryProviderMap, this.stageIdGenerator, this.serverInfo);
    }

    /** Returns the fixed "testJob" {@link JobInfo} used by every test, carrying the given options. */
    private static JobInfo testJobInfo(Struct pipelineOptions) {
        return JobInfo.create("testJob", "testJob", "token", pipelineOptions);
    }

    /**
     * Creates a mocked provider that returns {@code factory} for any
     * {@code createEnvironmentFactory} arguments and {@code serverFactory} from
     * {@code getServerFactory()}.
     */
    private static EnvironmentFactory.Provider mockProvider(EnvironmentFactory factory, ServerFactory serverFactory) throws Exception {
        EnvironmentFactory.Provider provider = Mockito.mock(EnvironmentFactory.Provider.class);
        Mockito.when(provider.createEnvironmentFactory(Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any())).thenReturn(factory);
        Mockito.when(provider.getServerFactory()).thenReturn(serverFactory);
        return provider;
    }

    /**
     * Verifies that {@code provider} was asked to create an environment factory exactly
     * {@code expectedCount} times, regardless of the arguments used.
     */
    private static void verifyFactoryCreations(EnvironmentFactory.Provider provider, int expectedCount) throws Exception {
        Mockito.verify(provider, Mockito.times(expectedCount)).createEnvironmentFactory(Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any());
    }

    /**
     * Builds a minimal executable stage for {@code environment}: a single input PCollection whose
     * windowing strategy and coder both reference one interval-window coder.
     */
    private static ExecutableStage getExecutableStage(RunnerApi.Environment environment) {
        RunnerApi.Coder intervalWindowCoder = RunnerApi.Coder.newBuilder()
                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN).build())
                .build();
        RunnerApi.Components components = RunnerApi.Components.newBuilder()
                .putPcollections("input-pc", RunnerApi.PCollection.newBuilder().setWindowingStrategyId("windowing-strategy").setCoderId("coder-id").build())
                .putWindowingStrategies("windowing-strategy", RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build())
                .putCoders("coder-id", intervalWindowCoder)
                .build();
        return ExecutableStage.fromPayload(RunnerApi.ExecutableStagePayload.newBuilder()
                .setInput("input-pc")
                .setEnvironment(environment)
                .setComponents(components)
                .build());
    }
}

