package org.apache.beam.repackaged.direct_java.runners.fnexecution.environment;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.direct_java.sdk.fn.server.GrpcFnServer;
import org.apache.beam.repackaged.direct_java.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.repackaged.direct_java.sdk.fn.server.ServerFactory;
import org.apache.beam.repackaged.direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.repackaged.direct_java.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/environment/EmbeddedEnvironmentFactory.class */
public class EmbeddedEnvironmentFactory implements EnvironmentFactory {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedEnvironmentFactory.class);
    private final PipelineOptions options;
    private final GrpcFnServer<GrpcLoggingService> loggingServer;
    private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private final ControlClientPool.Source clientSource;

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/environment/EmbeddedEnvironmentFactory$Provider.class */
    public static class Provider implements EnvironmentFactory.Provider {
        private final PipelineOptions pipelineOptions;

        public Provider(PipelineOptions pipelineOptions) {
            this.pipelineOptions = pipelineOptions;
        }

        @Override // org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory.Provider
        public EnvironmentFactory createEnvironmentFactory(GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4, ControlClientPool controlClientPool, IdGenerator idGenerator) {
            return EmbeddedEnvironmentFactory.create(this.pipelineOptions, grpcFnServer2, grpcFnServer, controlClientPool.getSource());
        }

        @Override // org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory.Provider
        public ServerFactory getServerFactory() {
            return InProcessServerFactory.create();
        }
    }

    public static EnvironmentFactory create(PipelineOptions pipelineOptions, GrpcFnServer<GrpcLoggingService> grpcFnServer, GrpcFnServer<FnApiControlClientPoolService> grpcFnServer2, ControlClientPool.Source source) {
        return new EmbeddedEnvironmentFactory(pipelineOptions, grpcFnServer, grpcFnServer2, source);
    }

    private EmbeddedEnvironmentFactory(PipelineOptions pipelineOptions, GrpcFnServer<GrpcLoggingService> grpcFnServer, GrpcFnServer<FnApiControlClientPoolService> grpcFnServer2, ControlClientPool.Source source) {
        this.options = pipelineOptions;
        this.loggingServer = grpcFnServer;
        this.controlServer = grpcFnServer2;
        Preconditions.checkArgument(grpcFnServer.getApiServiceDescriptor() != null, "Logging Server cannot have a null %s", Endpoints.ApiServiceDescriptor.class.getSimpleName());
        Preconditions.checkArgument(grpcFnServer2.getApiServiceDescriptor() != null, "Control Server cannot have a null %s", Endpoints.ApiServiceDescriptor.class.getSimpleName());
        this.clientSource = source;
    }

    @Override // org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory
    public RemoteEnvironment createEnvironment(RunnerApi.Environment environment, String str) throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        Future submit = newSingleThreadExecutor.submit(() -> {
            try {
                FnHarness.main(str, this.options, Collections.emptySet(), this.loggingServer.getApiServiceDescriptor(), this.controlServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor) null, InProcessManagedChannelFactory.create(), OutboundObserverFactory.clientDirect(), Caches.fromOptions(this.options));
                return null;
            } catch (NoClassDefFoundError e) {
                LOG.error("{} while executing an in-process FnHarness. To use the {}, the 'org.apache.beam:beam-sdks-java-harness' artifact and its dependencies must be on the classpath", new Object[]{NoClassDefFoundError.class.getSimpleName(), EmbeddedEnvironmentFactory.class.getSimpleName(), e});
                throw e;
            }
        });
        newSingleThreadExecutor.submit(() -> {
            try {
                submit.get();
            } catch (Throwable th) {
                newSingleThreadExecutor.shutdownNow();
            }
        });
        InstructionRequestHandler instructionRequestHandler = null;
        while (instructionRequestHandler == null) {
            try {
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (TimeoutException e2) {
                LOG.info("Still waiting for startup of FnHarness");
            }
            if (newSingleThreadExecutor.isShutdown()) {
                throw new IllegalStateException("FnHarness startup failed");
                break;
            }
            instructionRequestHandler = this.clientSource.take(str, Duration.ofSeconds(5L));
        }
        return RemoteEnvironment.forHandler(environment, instructionRequestHandler);
    }
}
