package org.apache.beam.fn.harness;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.control.HarnessMonitoringInfosInstructionHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/FnHarness.class */
public class FnHarness {
    private static final String HARNESS_ID = "HARNESS_ID";
    private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
    private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FnHarness.class);

    private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String str) throws TextFormat.ParseException {
        Endpoints.ApiServiceDescriptor.Builder newBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        TextFormat.merge(str, newBuilder);
        return newBuilder.build();
    }

    public static void main(String[] strArr) throws Exception {
        main((Function<String, String>) System::getenv);
    }

    @VisibleForTesting
    public static void main(Function<String, String> function) throws Exception {
        JvmInitializers.runOnStartup();
        System.out.format("SDK Fn Harness started%n", new Object[0]);
        System.out.format("Harness ID %s%n", function.apply(HARNESS_ID));
        System.out.format("Logging location %s%n", function.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", function.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Status location %s%n", function.apply(STATUS_API_SERVICE_DESCRIPTOR));
        System.out.format("Pipeline options %s%n", function.apply(PIPELINE_OPTIONS));
        String apply = function.apply(HARNESS_ID);
        PipelineOptions fromJson = PipelineOptionsTranslation.fromJson(function.apply(PIPELINE_OPTIONS));
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = getApiServiceDescriptor(function.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor apiServiceDescriptor2 = getApiServiceDescriptor(function.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor apiServiceDescriptor3 = function.apply(STATUS_API_SERVICE_DESCRIPTOR) == null ? null : getApiServiceDescriptor(function.apply(STATUS_API_SERVICE_DESCRIPTOR));
        String apply2 = function.apply(RUNNER_CAPABILITIES);
        main(apply, fromJson, apply2 == null ? Collections.emptySet() : ImmutableSet.copyOf(apply2.split("\\s+")), apiServiceDescriptor, apiServiceDescriptor2, apiServiceDescriptor3);
    }

    public static void main(String str, PipelineOptions pipelineOptions, Set<String> set, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Endpoints.ApiServiceDescriptor apiServiceDescriptor2, @Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor3) throws Exception {
        main(str, pipelineOptions, set, apiServiceDescriptor, apiServiceDescriptor2, apiServiceDescriptor3, ExperimentalOptions.hasExperiment(pipelineOptions, "beam_fn_api_epoll") ? ManagedChannelFactory.createEpoll() : ManagedChannelFactory.createDefault(), HarnessStreamObserverFactories.fromOptions(pipelineOptions), Caches.fromOptions(pipelineOptions));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static void main(String str, PipelineOptions pipelineOptions, Set<String> set, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Endpoints.ApiServiceDescriptor apiServiceDescriptor2, Endpoints.ApiServiceDescriptor apiServiceDescriptor3, ManagedChannelFactory managedChannelFactory, OutboundObserverFactory outboundObserverFactory, final Cache<Object, Object> cache) throws Exception {
        ManagedChannelFactory withInterceptors = managedChannelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(str)));
        IdGenerator decrementingLongs = IdGenerators.decrementingLongs();
        ShortIdMap shortIdMap = new ShortIdMap();
        ExecutorService executorService = ((GcsOptions) pipelineOptions.as(GcsOptions.class)).getExecutorService();
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(pipelineOptions, System::currentTimeMillis);
        try {
            Objects.requireNonNull(withInterceptors);
            BeamFnLoggingClient beamFnLoggingClient = new BeamFnLoggingClient(pipelineOptions, apiServiceDescriptor, withInterceptors::forDescriptor);
            Throwable th = null;
            try {
                try {
                    LOG.info("Fn Harness started");
                    FileSystems.setDefaultPipelineOptions(pipelineOptions);
                    EnumMap enumMap = new EnumMap(BeamFnApi.InstructionRequest.RequestCase.class);
                    ManagedChannel forDescriptor = withInterceptors.forDescriptor(apiServiceDescriptor2);
                    BeamFnControlGrpc.BeamFnControlStub newStub = BeamFnControlGrpc.newStub(forDescriptor);
                    final BeamFnControlGrpc.BeamFnControlBlockingStub newBlockingStub = BeamFnControlGrpc.newBlockingStub(forDescriptor);
                    Objects.requireNonNull(withInterceptors);
                    BeamFnDataGrpcClient beamFnDataGrpcClient = new BeamFnDataGrpcClient(pipelineOptions, withInterceptors::forDescriptor, outboundObserverFactory);
                    BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(decrementingLongs, withInterceptors, outboundObserverFactory);
                    FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler(((GcsOptions) pipelineOptions.as(GcsOptions.class)).getExecutorService());
                    Function<String, BeamFnApi.ProcessBundleDescriptor> function = new Function<String, BeamFnApi.ProcessBundleDescriptor>() { // from class: org.apache.beam.fn.harness.FnHarness.1
                        private static final String PROCESS_BUNDLE_DESCRIPTORS = "ProcessBundleDescriptors";
                        private final Cache<String, BeamFnApi.ProcessBundleDescriptor> cache;

                        {
                            this.cache = Caches.subCache(Cache.this, PROCESS_BUNDLE_DESCRIPTORS, new Object[0]);
                        }

                        @Override // java.util.function.Function
                        public BeamFnApi.ProcessBundleDescriptor apply(String str2) {
                            return this.cache.computeIfAbsent(str2, this::loadDescriptor);
                        }

                        private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String str2) {
                            return newBlockingStub.getProcessBundleDescriptor(BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder().setProcessBundleDescriptorId(str2).build());
                        }
                    };
                    MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
                    ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(pipelineOptions, set, function, beamFnDataGrpcClient, beamFnStateGrpcClientCache, finalizeBundleHandler, shortIdMap, executionStateSampler, cache);
                    beamFnLoggingClient.setProcessBundleHandler(processBundleHandler);
                    BeamFnStatusClient beamFnStatusClient = null;
                    if (apiServiceDescriptor3 != null) {
                        Objects.requireNonNull(withInterceptors);
                        beamFnStatusClient = new BeamFnStatusClient(apiServiceDescriptor3, withInterceptors::forDescriptor, processBundleHandler.getBundleProcessorCache(), pipelineOptions, cache);
                    }
                    enumMap.put((EnumMap) BeamFnApi.InstructionRequest.RequestCase.REGISTER, (BeamFnApi.InstructionRequest.RequestCase) instructionRequest -> {
                        return BeamFnApi.InstructionResponse.newBuilder().setRegister(BeamFnApi.RegisterResponse.getDefaultInstance());
                    });
                    BeamFnApi.InstructionRequest.RequestCase requestCase = BeamFnApi.InstructionRequest.RequestCase.FINALIZE_BUNDLE;
                    Objects.requireNonNull(finalizeBundleHandler);
                    enumMap.put((EnumMap) requestCase, (BeamFnApi.InstructionRequest.RequestCase) finalizeBundleHandler::finalizeBundle);
                    BeamFnApi.InstructionRequest.RequestCase requestCase2 = BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE;
                    Objects.requireNonNull(processBundleHandler);
                    enumMap.put((EnumMap) requestCase2, (BeamFnApi.InstructionRequest.RequestCase) processBundleHandler::processBundle);
                    BeamFnApi.InstructionRequest.RequestCase requestCase3 = BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_PROGRESS;
                    Objects.requireNonNull(processBundleHandler);
                    enumMap.put((EnumMap) requestCase3, (BeamFnApi.InstructionRequest.RequestCase) processBundleHandler::progress);
                    BeamFnApi.InstructionRequest.RequestCase requestCase4 = BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_SPLIT;
                    Objects.requireNonNull(processBundleHandler);
                    enumMap.put((EnumMap) requestCase4, (BeamFnApi.InstructionRequest.RequestCase) processBundleHandler::trySplit);
                    enumMap.put((EnumMap) BeamFnApi.InstructionRequest.RequestCase.MONITORING_INFOS, (BeamFnApi.InstructionRequest.RequestCase) instructionRequest2 -> {
                        return BeamFnApi.InstructionResponse.newBuilder().setMonitoringInfos(BeamFnApi.MonitoringInfosMetadataResponse.newBuilder().putAllMonitoringInfo(shortIdMap.get(instructionRequest2.getMonitoringInfos().getMonitoringInfoIdList())));
                    });
                    HarnessMonitoringInfosInstructionHandler harnessMonitoringInfosInstructionHandler = new HarnessMonitoringInfosInstructionHandler(shortIdMap);
                    BeamFnApi.InstructionRequest.RequestCase requestCase5 = BeamFnApi.InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS;
                    Objects.requireNonNull(harnessMonitoringInfosInstructionHandler);
                    enumMap.put((EnumMap) requestCase5, (BeamFnApi.InstructionRequest.RequestCase) harnessMonitoringInfosInstructionHandler::harnessMonitoringInfos);
                    JvmInitializers.runBeforeProcessing(pipelineOptions);
                    LOG.info("Entering instruction processing loop");
                    new BeamFnControlClient((BeamFnControlGrpc.BeamFnControlStub) newStub.withExecutor(MoreExecutors.directExecutor()), outboundObserverFactory, executorService, enumMap).waitForTermination();
                    if (beamFnStatusClient != null) {
                        beamFnStatusClient.close();
                    }
                    processBundleHandler.shutdown();
                    if (0 != 0) {
                        try {
                            beamFnLoggingClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        beamFnLoggingClient.close();
                    }
                    System.out.println("Shutting SDK harness down.");
                    executionStateSampler.stop();
                    executorService.shutdown();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            System.out.println("Shutting SDK harness down.");
            executionStateSampler.stop();
            executorService.shutdown();
            throw th3;
        }
    }
}
