package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.BeamFnTimerGrpcClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler.class */
public class ProcessBundleHandler {
    // Spec URNs of transforms from which createBundleProcessor starts building runners
    // (a fourth one, PTransformTranslation.READ_TRANSFORM_URN, is checked there as well).
    private static final String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
    public static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProcessBundleHandler.class);

    // URN -> runner factory map, assembled once in the static initializer from the
    // PTransformRunnerFactory.Registrar instances discovered through ServiceLoader.
    @VisibleForTesting
    static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    // Passed unchanged to every PTransformRunnerFactory.createRunnerForPTransform call.
    private final PipelineOptions options;
    // Applied to a process bundle descriptor id; the result is cast to ProcessBundleDescriptor.
    private final Function<String, Message> fnApiRegistry;
    // Wrapped in a new QueueingBeamFnDataClient for each bundle processor that is created.
    private final BeamFnDataClient beamFnDataClient;
    // Supplies the state client when the descriptor has a state ApiServiceDescriptor.
    private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    // Receives the bundle finalization callbacks registered while a bundle was processed.
    private final FinalizeBundleHandler finalizeBundleHandler;
    // Looked up by transform spec URN; defaultPTransformRunnerFactory is used when a URN is absent.
    private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    private final PTransformRunnerFactory defaultPTransformRunnerFactory;

    // Hands out bundle processors and tracks the ones that are currently in use.
    @VisibleForTesting
    final BundleProcessorCache bundleProcessorCache;

    /**
     * A {@link HandleStateCallsForBundle} that forwards every state request to a delegate
     * {@link BeamFnStateClient} and tracks the outstanding requests with a {@link Phaser}, so that
     * {@link #close()} does not return until the future of each forwarded request has completed.
     */
    public static class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
        private final BeamFnStateClient beamFnStateClient;
        private final Phaser phaser;
        private int currentPhase;

        private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
            this.beamFnStateClient = beamFnStateClient;
            // One party is registered up front; it is the one that arrives in close().
            Phaser outstandingCalls = new Phaser(1);
            this.phaser = outstandingCalls;
            this.currentPhase = outstandingCalls.getPhase();
        }

        /** Arrives at the phaser and waits for all registered request parties to arrive too. */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            final int pending = phaser.getUnarrivedParties();
            if (pending > 0) {
                ProcessBundleHandler.LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", pending, currentPhase);
            }
            currentPhase = phaser.arriveAndAwaitAdvance();
        }

        /**
         * Registers a party for the request, deregisters it when {@code completableFuture}
         * completes (normally or exceptionally), then hands the request to the delegate client.
         */
        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public void handle(BeamFnApi.StateRequest.Builder builder, CompletableFuture<BeamFnApi.StateResponse> completableFuture) {
            phaser.register();
            completableFuture.whenComplete((response, failure) -> phaser.arriveAndDeregister());
            beamFnStateClient.handle(builder, completableFuture);
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$BundleProcessor.class */
    public static abstract class BundleProcessor {
        private String instructionId;

        public static BundleProcessor create(PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, List<ThrowingRunnable> list, List<ThrowingRunnable> list2, List<PTransformRunnerFactory.ProgressRequestCallback> list3, BundleSplitListener.InMemory inMemory, PCollectionConsumerRegistry pCollectionConsumerRegistry, MetricsContainerStepMap metricsContainerStepMap, ExecutionStateTracker executionStateTracker, HandleStateCallsForBundle handleStateCallsForBundle, QueueingBeamFnDataClient queueingBeamFnDataClient, Collection<FinalizeBundleHandler.CallbackRegistration> collection) {
            return new AutoValue_ProcessBundleHandler_BundleProcessor(pTransformFunctionRegistry, pTransformFunctionRegistry2, list, list2, list3, inMemory, pCollectionConsumerRegistry, metricsContainerStepMap, executionStateTracker, handleStateCallsForBundle, queueingBeamFnDataClient, collection, new ArrayList());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PTransformFunctionRegistry getStartFunctionRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PTransformFunctionRegistry getFinishFunctionRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<ThrowingRunnable> getResetFunctions();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<ThrowingRunnable> getTearDownFunctions();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<PTransformRunnerFactory.ProgressRequestCallback> getProgressRequestCallbacks();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BundleSplitListener.InMemory getSplitListener();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PCollectionConsumerRegistry getpCollectionConsumerRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract MetricsContainerStepMap getMetricsContainerRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ExecutionStateTracker getStateTracker();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract HandleStateCallsForBundle getBeamFnStateClient();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract QueueingBeamFnDataClient getQueueingClient();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Collection<FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Collection<BeamFnDataReadRunner> getChannelRoots();

        String getInstructionId() {
            return this.instructionId;
        }

        void setInstructionId(String str) {
            this.instructionId = str;
        }

        void reset() throws Exception {
            getStartFunctionRegistry().reset();
            getFinishFunctionRegistry().reset();
            getSplitListener().clear();
            getpCollectionConsumerRegistry().reset();
            getMetricsContainerRegistry().reset();
            getStateTracker().reset();
            ExecutionStateSampler.instance().reset();
            getBundleFinalizationCallbackRegistrations().clear();
            Iterator<ThrowingRunnable> it = getResetFunctions().iterator();
            while (it.hasNext()) {
                it.next().run();
            }
        }
    }

    /**
     * Keeps idle {@link BundleProcessor}s in a queue per process bundle descriptor id so they can
     * be reused, and tracks the processors that are currently in use by instruction id.
     */
    public static class BundleProcessorCache {
        private final Map<String, ConcurrentLinkedQueue<BundleProcessor>> cachedBundleProcessors = Maps.newConcurrentMap();
        private final Map<String, BundleProcessor> activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap<>());

        @Pure
        public int hashCode() {
            return super.hashCode();
        }

        BundleProcessorCache() {
        }

        Map<String, ConcurrentLinkedQueue<BundleProcessor>> getCachedBundleProcessors() {
            return cachedBundleProcessors;
        }

        /**
         * Takes an idle processor for the descriptor {@code str}, or asks {@code supplier} for a
         * new one when none is idle, then records it as active under instruction id {@code str2}.
         */
        BundleProcessor get(String str, String str2, Supplier<BundleProcessor> supplier) {
            ConcurrentLinkedQueue<BundleProcessor> idle =
                    cachedBundleProcessors.computeIfAbsent(str, descriptorId -> new ConcurrentLinkedQueue<>());
            BundleProcessor processor = idle.poll();
            if (processor == null) {
                processor = supplier.get();
            }
            processor.setInstructionId(str2);
            activeBundleProcessors.put(str2, processor);
            return processor;
        }

        /** Returns the active processor for instruction id {@code str}, or null if there is none. */
        BundleProcessor find(String str) {
            return activeBundleProcessors.get(str);
        }

        /**
         * Removes the processor from the active set, resets it and puts it back into the idle
         * queue of descriptor {@code str}. Any exception along the way is logged and the
         * processor is simply not re-queued.
         */
        void release(String str, BundleProcessor bundleProcessor) {
            activeBundleProcessors.remove(bundleProcessor.getInstructionId());
            try {
                bundleProcessor.reset();
                ConcurrentLinkedQueue<BundleProcessor> idle = cachedBundleProcessors.get(str);
                idle.add(bundleProcessor);
            } catch (Exception e) {
                ProcessBundleHandler.LOG.warn("Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.", str, e);
            }
        }

        /**
         * Runs the tear-down functions of every idle processor and then empties the cache. A
         * failing tear-down function aborts the shutdown and propagates.
         */
        void shutdown() throws Exception {
            for (ConcurrentLinkedQueue<BundleProcessor> idle : cachedBundleProcessors.values()) {
                for (BundleProcessor processor : idle) {
                    for (ThrowingRunnable tearDown : processor.getTearDownFunctions()) {
                        ProcessBundleHandler.LOG.debug("Tearing down function {}", tearDown);
                        tearDown.run();
                    }
                }
            }
            cachedBundleProcessors.clear();
        }
    }

    /**
     * A {@link HandleStateCallsForBundle} that rejects every state request with an
     * {@link IllegalStateException} naming the originating process bundle request.
     */
    public static class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(BeamFnApi.ProcessBundleRequest processBundleRequest) {
            request = processBundleRequest;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            // Nothing is ever started, so there is nothing to wait for or release.
        }

        /** Always throws; neither argument is used. */
        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public void handle(BeamFnApi.StateRequest.Builder builder, CompletableFuture<BeamFnApi.StateResponse> completableFuture) {
            final String message = String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", request);
            throw new IllegalStateException(message);
        }
    }

    /**
     * A {@link BeamFnTimerClient} that rejects every registration with an
     * {@link IllegalStateException} naming the originating process bundle request.
     */
    public static class FailAllTimerRegistrations implements BeamFnTimerClient {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllTimerRegistrations(BeamFnApi.ProcessBundleRequest processBundleRequest) {
            request = processBundleRequest;
        }

        /** Always throws; none of the arguments is used. */
        @Override // org.apache.beam.fn.harness.data.BeamFnTimerClient
        public <T> BeamFnTimerClient.TimerHandler<T> register(LogicalEndpoint logicalEndpoint, Coder<Timer<T>> coder, FnDataReceiver<Timer<T>> fnDataReceiver) {
            final String message = String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", request);
            throw new IllegalStateException(message);
        }
    }

    /**
     * Common base type of the per-bundle state clients in this file: a {@link BeamFnStateClient}
     * that is also {@link AutoCloseable}, so it can be closed once a bundle has been processed.
     * It declares no members of its own.
     */
    public static abstract class HandleStateCallsForBundle implements AutoCloseable, BeamFnStateClient {
        // Explicit constructor without an access modifier (package-private).
        HandleStateCallsForBundle() {
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$UnknownPTransformRunnerFactory.class */
    private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> {
        private final Set<String> knownUrns;

        private UnknownPTransformRunnerFactory(Set<String> set) {
            this.knownUrns = set;
        }

        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
        public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, Consumer<PTransformRunnerFactory.ProgressRequestCallback> consumer3, BundleSplitListener bundleSplitListener, DoFn.BundleFinalizer bundleFinalizer) {
            String format = String.format("No factory registered for %s, known factories %s", pTransform.getSpec().getUrn(), this.knownUrns);
            ProcessBundleHandler.LOG.error(format);
            throw new IllegalStateException(format);
        }
    }

    /**
     * Creates a handler that uses the statically registered runner factories and a new, empty
     * {@link BundleProcessorCache}.
     */
    public ProcessBundleHandler(
            PipelineOptions options,
            Function<String, Message> fnApiRegistry,
            BeamFnDataClient beamFnDataClient,
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
            FinalizeBundleHandler finalizeBundleHandler) {
        this(
                options,
                fnApiRegistry,
                beamFnDataClient,
                beamFnStateGrpcClientCache,
                finalizeBundleHandler,
                REGISTERED_RUNNER_FACTORIES,
                new BundleProcessorCache());
    }

    /**
     * Creates a handler with an explicit URN-to-factory map and bundle processor cache. The
     * fallback factory for unknown URNs is built from the key set of the supplied map.
     */
    @VisibleForTesting
    ProcessBundleHandler(
            PipelineOptions options,
            Function<String, Message> fnApiRegistry,
            BeamFnDataClient beamFnDataClient,
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
            FinalizeBundleHandler finalizeBundleHandler,
            Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap,
            BundleProcessorCache bundleProcessorCache) {
        this.options = options;
        this.fnApiRegistry = fnApiRegistry;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.finalizeBundleHandler = finalizeBundleHandler;
        this.bundleProcessorCache = bundleProcessorCache;
        this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
        this.defaultPTransformRunnerFactory =
                new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet());
    }

    /**
     * Creates the runner for {@code pTransform} after first creating, depth first, the runners of
     * every transform that consumes one of its output PCollections.
     *
     * <p>A transform id that is already in {@code processedPTransformIds} is skipped right away.
     * An id is only added to that set after all of its consumers were handled and its own runner
     * was created, so skipping early yields the same result as re-walking its consumers while
     * avoiding repeated traversals of shared downstream sub-graphs.
     *
     * @param pTransformId id of the transform to create a runner for
     * @param pTransform definition of that transform
     * @param processBundleDescriptor descriptor supplying transforms, PCollections, coders and
     *     windowing strategies
     * @param pCollectionIdsToConsumingPTransforms PCollection id to ids of the transforms that
     *     have it as an input
     * @param processedPTransformIds ids whose runner has already been created; updated here
     * @param channelRoots receives every created runner that is a {@link BeamFnDataReadRunner}
     * @throws IllegalArgumentException if the transform has no spec or has sub-transforms
     * @throws IOException declared for failures while creating runners
     */
    private void createRunnerAndConsumersForPTransformRecursively(
            BeamFnStateClient beamFnStateClient,
            BeamFnTimerClient beamFnTimerClient,
            BeamFnDataClient beamFnDataClient,
            String pTransformId,
            RunnerApi.PTransform pTransform,
            Supplier<String> processBundleInstructionId,
            BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
            SetMultimap<String, String> pCollectionIdsToConsumingPTransforms,
            PCollectionConsumerRegistry pCollectionConsumerRegistry,
            Set<String> processedPTransformIds,
            PTransformFunctionRegistry startFunctionRegistry,
            PTransformFunctionRegistry finishFunctionRegistry,
            Consumer<ThrowingRunnable> addResetFunction,
            Consumer<ThrowingRunnable> addTearDownFunction,
            Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback,
            BundleSplitListener splitListener,
            DoFn.BundleFinalizer bundleFinalizer,
            Collection<BeamFnDataReadRunner> channelRoots) throws IOException {
        // Already fully handled (including everything downstream of it): nothing left to do.
        if (processedPTransformIds.contains(pTransformId)) {
            return;
        }

        // Consumers first, so that their runners exist before this transform's runner is created.
        for (String outputPCollectionId : pTransform.getOutputsMap().values()) {
            for (String consumerId : pCollectionIdsToConsumingPTransforms.get(outputPCollectionId)) {
                createRunnerAndConsumersForPTransformRecursively(
                        beamFnStateClient,
                        beamFnTimerClient,
                        beamFnDataClient,
                        consumerId,
                        processBundleDescriptor.getTransformsMap().get(consumerId),
                        processBundleInstructionId,
                        processBundleDescriptor,
                        pCollectionIdsToConsumingPTransforms,
                        pCollectionConsumerRegistry,
                        processedPTransformIds,
                        startFunctionRegistry,
                        finishFunctionRegistry,
                        addResetFunction,
                        addTearDownFunction,
                        addProgressRequestCallback,
                        splitListener,
                        bundleFinalizer,
                        channelRoots);
            }
        }

        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printToString(pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printToString(pTransform)));
        }

        // Unknown URNs fall back to the default factory, which throws.
        Object runner = this.urnToPTransformRunnerFactoryMap
                .getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory)
                .createRunnerForPTransform(
                        this.options,
                        beamFnDataClient,
                        beamFnStateClient,
                        beamFnTimerClient,
                        pTransformId,
                        pTransform,
                        processBundleInstructionId,
                        processBundleDescriptor.getPcollectionsMap(),
                        processBundleDescriptor.getCodersMap(),
                        processBundleDescriptor.getWindowingStrategiesMap(),
                        pCollectionConsumerRegistry,
                        startFunctionRegistry,
                        finishFunctionRegistry,
                        addResetFunction,
                        addTearDownFunction,
                        addProgressRequestCallback,
                        splitListener,
                        bundleFinalizer);
        if (runner instanceof BeamFnDataReadRunner) {
            channelRoots.add((BeamFnDataReadRunner) runner);
        }
        processedPTransformIds.add(pTransformId);
    }

    /**
     * Processes the bundle described by {@code instructionRequest}.
     *
     * <p>A bundle processor is obtained from the cache (created on demand), the start functions
     * are run in registry order, the queueing data client is drained, and the finish functions are
     * run in reverse registry order, all while the execution state tracker is active. The response
     * is then filled with residual roots and monitoring infos, bundle finalization callbacks are
     * registered if any exist, and the processor is released back to the cache.
     *
     * <p>The state tracker and the state client are managed with try-with-resources, so an
     * exception thrown while closing them is attached as a suppressed exception instead of
     * replacing the failure that interrupted processing.
     *
     * <p>NOTE(review): when processing fails the processor is not released, so it stays in the
     * cache's active map; confirm whether that entry should be removed on failure.
     *
     * @param instructionRequest request carrying the process bundle request and instruction id
     * @return a response builder with the process bundle response set
     * @throws Exception if creating the processor, running a function, draining, or closing fails
     */
    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        final BeamFnApi.ProcessBundleRequest bundleRequest = instructionRequest.getProcessBundle();
        final String descriptorId = bundleRequest.getProcessBundleDescriptorId();
        BeamFnApi.ProcessBundleResponse.Builder response = BeamFnApi.ProcessBundleResponse.newBuilder();

        BundleProcessor bundleProcessor = this.bundleProcessorCache.get(descriptorId, instructionRequest.getInstructionId(), () -> {
            try {
                return createBundleProcessor(descriptorId, bundleRequest);
            } catch (IOException e) {
                // Supplier cannot throw checked exceptions.
                throw new RuntimeException(e);
            }
        });
        PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
        PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry();
        ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
        QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient();

        // The state client is closed last: after the response is built and the processor released.
        try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
            try (Closeable closeTracker = stateTracker.activate()) {
                for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
                    LOG.debug("Starting function {}", startFunction);
                    startFunction.run();
                }

                queueingClient.drainAndBlock();

                // Finish functions run in the opposite order of their registration.
                for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctionRegistry.getFunctions())) {
                    LOG.debug("Finishing function {}", finishFunction);
                    finishFunction.run();
                }
            }

            response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
            response.addAllMonitoringInfos(bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
            response.addAllMonitoringInfos(bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
            response.addAllMonitoringInfos(bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
            response.addAllMonitoringInfos(bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
            for (PTransformRunnerFactory.ProgressRequestCallback progressCallback : bundleProcessor.getProgressRequestCallbacks()) {
                response.addAllMonitoringInfos(progressCallback.getMonitoringInfos());
            }

            if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
                // Copy before release(): releasing resets the processor, which clears this collection.
                this.finalizeBundleHandler.registerCallbacks(
                        bundleProcessor.getInstructionId(),
                        ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
                response.setRequiresFinalization(true);
            }

            this.bundleProcessorCache.release(descriptorId, bundleProcessor);
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
    }

    /**
     * Reports the monitoring infos of the bundle that is currently being processed under the
     * instruction id named in the progress request.
     *
     * @return a response with the collected monitoring infos, or with the default (empty)
     *     progress response when no active bundle processor is found for that instruction id
     */
    public BeamFnApi.InstructionResponse.Builder progress(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        String instructionId = instructionRequest.getProcessBundleProgress().getInstructionId();
        BundleProcessor active = this.bundleProcessorCache.find(instructionId);
        if (active == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
        }

        BeamFnApi.ProcessBundleProgressResponse.Builder response = BeamFnApi.ProcessBundleProgressResponse.newBuilder();
        response.addAllMonitoringInfos(active.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
        response.addAllMonitoringInfos(active.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
        response.addAllMonitoringInfos(active.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
        response.addAllMonitoringInfos(active.getMetricsContainerRegistry().getMonitoringInfos());
        for (PTransformRunnerFactory.ProgressRequestCallback progressCallback : active.getProgressRequestCallbacks()) {
            response.addAllMonitoringInfos(progressCallback.getMonitoringInfos());
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(response);
    }

    /**
     * Forwards a split request to every channel root of the bundle that is currently being
     * processed under the instruction id named in the request.
     *
     * @return a response with whatever the channel roots wrote into the split response, or with
     *     the default (empty) split response when no active bundle processor is found
     */
    public BeamFnApi.InstructionResponse.Builder trySplit(BeamFnApi.InstructionRequest instructionRequest) {
        String instructionId = instructionRequest.getProcessBundleSplit().getInstructionId();
        BundleProcessor active = this.bundleProcessorCache.find(instructionId);
        if (active == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance());
        }

        BeamFnApi.ProcessBundleSplitResponse.Builder response = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        for (BeamFnDataReadRunner channelRoot : active.getChannelRoots()) {
            channelRoot.trySplit(instructionRequest.getProcessBundleSplit(), response);
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(response);
    }

    /**
     * Shuts down the bundle processor cache.
     *
     * @throws Exception if the cache's shutdown fails
     */
    public void shutdown() throws Exception {
        bundleProcessorCache.shutdown();
    }

    /**
     * Builds a new {@link BundleProcessor} for the process bundle descriptor with id {@code str}.
     *
     * <p>The descriptor is looked up through the Fn API registry. A map from PCollection id to the
     * ids of the transforms consuming it is computed, the state and timer clients are chosen
     * depending on whether the descriptor carries the matching ApiServiceDescriptor, and runners
     * are then created recursively starting from every transform whose spec URN is one of the data
     * input, data output, Java source or read transform URNs.
     *
     * @param str id of the process bundle descriptor
     * @param processBundleRequest the request; only used in the messages of the failing state and
     *     timer clients
     * @return the fully wired bundle processor
     * @throws IOException declared for failures while creating runners
     */
    private BundleProcessor createBundleProcessor(String str, BeamFnApi.ProcessBundleRequest processBundleRequest) throws IOException {
        QueueingBeamFnDataClient queueingClient = new QueueingBeamFnDataClient(this.beamFnDataClient);
        BeamFnApi.ProcessBundleDescriptor bundleDescriptor = (BeamFnApi.ProcessBundleDescriptor) this.fnApiRegistry.apply(str);

        SetMultimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create();
        MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
        ExecutionStateTracker stateTracker = new ExecutionStateTracker(ExecutionStateSampler.instance());
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerRegistry, stateTracker);
        Set<String> processedPTransformIds = new HashSet<>();
        PTransformFunctionRegistry startFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, ExecutionStateTracker.START_STATE_NAME);
        PTransformFunctionRegistry finishFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, ExecutionStateTracker.FINISH_STATE_NAME);
        List<ThrowingRunnable> resetFunctions = new ArrayList<>();
        List<ThrowingRunnable> tearDownFunctions = new ArrayList<>();
        List<PTransformRunnerFactory.ProgressRequestCallback> progressRequestCallbacks = new ArrayList<>();

        // Index every transform under each PCollection it reads from.
        for (Map.Entry<String, RunnerApi.PTransform> transformEntry : bundleDescriptor.getTransformsMap().entrySet()) {
            for (String inputPCollectionId : transformEntry.getValue().getInputsMap().values()) {
                pCollectionIdsToConsumingPTransforms.put(inputPCollectionId, transformEntry.getKey());
            }
        }

        // Without a state / timer service descriptor, any use of state / timers fails.
        HandleStateCallsForBundle beamFnStateClient = bundleDescriptor.hasStateApiServiceDescriptor()
                ? new BlockTillStateCallsFinish(this.beamFnStateGrpcClientCache.forApiServiceDescriptor(bundleDescriptor.getStateApiServiceDescriptor()))
                : new FailAllStateCallsForBundle(processBundleRequest);
        BeamFnTimerClient beamFnTimerClient = bundleDescriptor.hasTimerApiServiceDescriptor()
                ? new BeamFnTimerGrpcClient(queueingClient, bundleDescriptor.getTimerApiServiceDescriptor())
                : new FailAllTimerRegistrations(processBundleRequest);

        BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
        final Collection<FinalizeBundleHandler.CallbackRegistration> bundleFinalizationCallbackRegistrations = new ArrayList<>();
        DoFn.BundleFinalizer bundleFinalizer = new DoFn.BundleFinalizer() {
            @Override
            public void afterBundleCommit(Instant instant, DoFn.BundleFinalizer.Callback callback) {
                bundleFinalizationCallbackRegistrations.add(FinalizeBundleHandler.CallbackRegistration.create(instant, callback));
            }
        };

        BundleProcessor bundleProcessor = BundleProcessor.create(
                startFunctionRegistry,
                finishFunctionRegistry,
                resetFunctions,
                tearDownFunctions,
                progressRequestCallbacks,
                splitListener,
                pCollectionConsumerRegistry,
                metricsContainerRegistry,
                stateTracker,
                beamFnStateClient,
                queueingClient,
                bundleFinalizationCallbackRegistrations);

        // The instruction id is read lazily because it changes each time the processor is reused.
        Supplier<String> processBundleInstructionId = bundleProcessor::getInstructionId;
        Consumer<ThrowingRunnable> addResetFunction = resetFunctions::add;
        Consumer<ThrowingRunnable> addTearDownFunction = tearDownFunctions::add;
        Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback = progressRequestCallbacks::add;

        for (Map.Entry<String, RunnerApi.PTransform> transformEntry : bundleDescriptor.getTransformsMap().entrySet()) {
            String urn = transformEntry.getValue().getSpec().getUrn();
            boolean isRoot = DATA_INPUT_URN.equals(urn)
                    || DATA_OUTPUT_URN.equals(urn)
                    || JAVA_SOURCE_URN.equals(urn)
                    || PTransformTranslation.READ_TRANSFORM_URN.equals(urn);
            if (!isRoot) {
                continue;
            }
            createRunnerAndConsumersForPTransformRecursively(
                    beamFnStateClient,
                    beamFnTimerClient,
                    queueingClient,
                    transformEntry.getKey(),
                    transformEntry.getValue(),
                    processBundleInstructionId,
                    bundleDescriptor,
                    pCollectionIdsToConsumingPTransforms,
                    pCollectionConsumerRegistry,
                    processedPTransformIds,
                    startFunctionRegistry,
                    finishFunctionRegistry,
                    addResetFunction,
                    addTearDownFunction,
                    addProgressRequestCallback,
                    splitListener,
                    bundleFinalizer,
                    bundleProcessor.getChannelRoots());
        }
        return bundleProcessor;
    }

    /**
     * Closes {@code autoCloseable} the way a try-with-resources statement would.
     *
     * <p>When {@code th} is null, an exception thrown by {@code close()} propagates to the caller.
     * When {@code th} is non-null, it is the failure already in flight: an exception thrown by
     * {@code close()} is added to it as a suppressed exception and nothing is thrown here.
     *
     * @param th the primary failure, or null if there is none
     * @param autoCloseable the resource to close; must not be null
     * @throws Exception if {@code th} is null and closing the resource fails
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable closeFailure) {
            // Keep the primary failure; record the close failure alongside it.
            th.addSuppressed(closeFailure);
        }
    }

    // Builds REGISTERED_RUNNER_FACTORIES: loads every PTransformRunnerFactory.Registrar through
    // ServiceLoader, orders them with ReflectHelpers.ObjectsClassComparator, and merges the
    // factories each registrar contributes into one immutable map.
    static {
        TreeSet<PTransformRunnerFactory.Registrar> registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
        registrars.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));

        ImmutableMap.Builder<String, PTransformRunnerFactory> factories = ImmutableMap.builder();
        for (PTransformRunnerFactory.Registrar registrar : registrars) {
            factories.putAll(registrar.getPTransformRunnerFactories());
        }
        REGISTERED_RUNNER_FACTORIES = factories.build();
    }
}
