package org.apache.beam.fn.harness;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.TextFormat;
import java.io.PrintStream;
import java.util.EnumMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.RegisterHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.stream.StreamObserverFactory;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/FnHarness.class */
/**
 * Process entry point for the SDK Fn Harness.
 *
 * <p>Configuration is taken from three environment variables:
 *
 * <ul>
 *   <li>{@code LOGGING_API_SERVICE_DESCRIPTOR} and {@code CONTROL_API_SERVICE_DESCRIPTOR}: protobuf
 *       text-format encodings of a {@link BeamFnApi.ApiServiceDescriptor};
 *   <li>{@code PIPELINE_OPTIONS}: the {@link PipelineOptions}, deserialized with Jackson.
 * </ul>
 *
 * <p>Once configured, the harness opens a logging client, wires up the {@code REGISTER} and
 * {@code PROCESS_BUNDLE} instruction handlers, and hands control to
 * {@link BeamFnControlClient#processInstructionRequests}.
 */
public class FnHarness {
    private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);

    /**
     * Reads a mandatory environment variable.
     *
     * @param name the environment variable to read
     * @return the variable's value, never {@code null}
     * @throws IllegalStateException if the variable is not set
     */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null) {
            // Fail with the variable name instead of an anonymous NullPointerException from a parser.
            throw new IllegalStateException("Required environment variable " + name + " is not set");
        }
        return value;
    }

    /**
     * Parses the protobuf text-format service descriptor stored in an environment variable.
     *
     * @param str name of the environment variable holding the descriptor
     * @return the parsed descriptor
     * @throws TextFormat.ParseException if the variable's value is not a valid text-format message
     * @throws IllegalStateException if the variable is not set
     */
    private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String str) throws TextFormat.ParseException {
        BeamFnApi.ApiServiceDescriptor.Builder descriptor = BeamFnApi.ApiServiceDescriptor.newBuilder();
        TextFormat.merge(requireEnv(str), descriptor);
        return descriptor.build();
    }

    /**
     * Builds the dispatch table from instruction request case to its handler.
     *
     * <p>The handler's functional-interface type is not imported in this file, so it is left as a
     * type parameter and inferred from the call site's target type.
     * NOTE(review): this relies on the {@link BeamFnControlClient} constructor declaring a
     * parameterized {@code EnumMap}; confirm against that class.
     *
     * @param registerFn handler for {@code REGISTER} requests
     * @param processBundleFn handler for {@code PROCESS_BUNDLE} requests
     * @return a new map containing exactly those two entries
     */
    private static <HandlerT> EnumMap<BeamFnApi.InstructionRequest.RequestCase, HandlerT> handlerTable(
            HandlerT registerFn, HandlerT processBundleFn) {
        EnumMap<BeamFnApi.InstructionRequest.RequestCase, HandlerT> handlers =
                new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
        handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, registerFn);
        handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, processBundleFn);
        return handlers;
    }

    /**
     * Command-line entry point: echoes the raw configuration to standard output, parses it, and
     * delegates to {@link #main(PipelineOptions, BeamFnApi.ApiServiceDescriptor,
     * BeamFnApi.ApiServiceDescriptor)}.
     *
     * @param strArr command-line arguments; not read
     * @throws Exception if the configuration cannot be parsed
     * @throws IllegalStateException if a required environment variable is not set
     */
    public static void main(String[] strArr) throws Exception {
        // Echo the raw values first (a missing variable prints as "null") so the startup
        // output is still useful when the parsing below fails.
        System.out.format("SDK Fn Harness started%n");
        System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));

        ObjectMapper objectMapper =
                new ObjectMapper().registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
        PipelineOptions options = objectMapper.readValue(requireEnv(PIPELINE_OPTIONS), PipelineOptions.class);

        main(options,
                getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR),
                getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR));
    }

    /**
     * Runs the harness until instruction processing returns or fails.
     *
     * <p>Any {@link Throwable} raised while the harness is running, or while closing the logging
     * client, is printed to the {@code System.err} stream captured on entry and is not rethrown.
     * The shutdown message is printed exactly once on every path.
     *
     * @param pipelineOptions options used to configure channels, stream observers and handlers
     * @param apiServiceDescriptor descriptor passed to the logging client
     * @param apiServiceDescriptor2 descriptor passed to the control client
     * @throws Exception if creating the channel or stream observer factory fails
     */
    public static void main(PipelineOptions pipelineOptions, BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, BeamFnApi.ApiServiceDescriptor apiServiceDescriptor2) throws Exception {
        ManagedChannelFactory channelFactory = ManagedChannelFactory.from(pipelineOptions);
        StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(pipelineOptions);
        // Captured before the logging client exists and used for all shutdown output.
        // NOTE(review): presumably because logging setup may redirect System.err; confirm.
        PrintStream originalErr = System.err;

        try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
                pipelineOptions, apiServiceDescriptor, channelFactory::forDescriptor, streamObserverFactory::from)) {
            LOG.info("Fn Harness started");

            RegisterHandler registerHandler = new RegisterHandler();
            BeamFnDataGrpcClient dataClient = new BeamFnDataGrpcClient(
                    pipelineOptions, channelFactory::forDescriptor, streamObserverFactory::from);
            ProcessBundleHandler processBundleHandler =
                    new ProcessBundleHandler(pipelineOptions, registerHandler::getById, dataClient);

            BeamFnControlClient control = new BeamFnControlClient(
                    apiServiceDescriptor2,
                    channelFactory::forDescriptor,
                    streamObserverFactory::from,
                    handlerTable(registerHandler::register, processBundleHandler::processBundle));

            LOG.info("Entering instruction processing loop");
            control.processInstructionRequests(pipelineOptions.as(GcsOptions.class).getExecutorService());
        } catch (Throwable t) {
            // Deliberate best effort: report the failure and fall through to shutdown.
            t.printStackTrace(originalErr);
        } finally {
            originalErr.println("Shutting SDK harness down.");
        }
    }
}
