/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.HeaderAccessor;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;

public class EmbeddedSdkHarness
extends ExternalResource
implements TestRule {
    private ExecutorService executor;
    private GrpcFnServer<GrpcLoggingService> loggingServer;
    private GrpcFnServer<GrpcDataService> dataServer;
    private GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private SdkHarnessClient client;

    public static EmbeddedSdkHarness create() {
        return new EmbeddedSdkHarness();
    }

    private EmbeddedSdkHarness() {
    }

    public SdkHarnessClient client() {
        return this.client;
    }

    public Endpoints.ApiServiceDescriptor dataEndpoint() {
        return this.dataServer.getApiServiceDescriptor();
    }

    protected void before() throws Exception {
        InProcessServerFactory serverFactory = InProcessServerFactory.create();
        this.executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
        MapControlClientPool clientPool = MapControlClientPool.create();
        FnApiControlClientPoolService clientPoolService = FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)clientPool.getSink(), (HeaderAccessor)GrpcContextHeaderAccessorProvider.getHeaderAccessor());
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcLoggingService.forWriter((LogWriter)Slf4jLogWriter.getDefault()), (ServerFactory)serverFactory);
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcDataService.create((ExecutorService)this.executor, (OutboundObserverFactory)OutboundObserverFactory.serverDirect()), (ServerFactory)serverFactory);
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor((FnService)clientPoolService, (ServerFactory)serverFactory);
        InstructionRequestHandler requestHandler = EmbeddedEnvironmentFactory.create((PipelineOptions)PipelineOptionsFactory.create(), this.loggingServer, this.controlServer, (ControlClientPool.Source)clientPool.getSource()).createEnvironment(RunnerApi.Environment.getDefaultInstance()).getInstructionRequestHandler();
        this.client = SdkHarnessClient.usingFnApiClient((InstructionRequestHandler)requestHandler, (FnDataService)((FnDataService)this.dataServer.getService()));
    }

    protected void after() {
        try (GrpcFnServer<GrpcLoggingService> logs = this.loggingServer;
             GrpcFnServer<GrpcDataService> data = this.dataServer;
             GrpcFnServer<FnApiControlClientPoolService> ctl = this.controlServer;
             SdkHarnessClient c = this.client;){
            this.executor.shutdownNow();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

