package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler.class */
public class ProcessBundleHandler {
    private static final String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
    public static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProcessBundleHandler.class);

    @VisibleForTesting
    static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    private final PipelineOptions options;
    private final Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry;
    private final BeamFnDataClient beamFnDataClient;
    private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    private final FinalizeBundleHandler finalizeBundleHandler;
    private final ShortIdMap shortIds;
    private final boolean runnerAcceptsShortIds;
    private final ExecutionStateSampler executionStateSampler;
    private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    private final PTransformRunnerFactory defaultPTransformRunnerFactory;
    private final Cache<Object, Object> processWideCache;

    @VisibleForTesting
    final BundleProcessorCache bundleProcessorCache;
    private final Set<String> runnerCapabilities;
    private final DataSampler dataSampler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$BlockTillStateCallsFinish.class */
    public static class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
        private final BeamFnStateClient beamFnStateClient;
        private final Phaser phaser;
        private int currentPhase;

        private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
            this.beamFnStateClient = beamFnStateClient;
            this.phaser = new Phaser(1);
            this.currentPhase = this.phaser.getPhase();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            int unarrivedParties = this.phaser.getUnarrivedParties();
            if (unarrivedParties > 0) {
                ProcessBundleHandler.LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", Integer.valueOf(unarrivedParties), Integer.valueOf(this.currentPhase));
            }
            this.currentPhase = this.phaser.arriveAndAwaitAdvance();
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder builder) {
            CompletableFuture<BeamFnApi.StateResponse> handle = this.beamFnStateClient.handle(builder);
            this.phaser.register();
            handle.whenComplete((stateResponse, th) -> {
                this.phaser.arriveAndDeregister();
            });
            return handle;
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$BundleProcessor.class */
    public static abstract class BundleProcessor {
        private String instructionId;
        private List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens;
        private Caches.ClearableCache<Object, Object> bundleCache;
        private BeamFnDataInboundObserver inboundObserver2;

        public static BundleProcessor create(Cache<Object, Object> cache, BundleProgressReporter.InMemory inMemory, BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, List<ThrowingRunnable> list, List<ThrowingRunnable> list2, BundleSplitListener.InMemory inMemory2, PCollectionConsumerRegistry pCollectionConsumerRegistry, MetricsEnvironmentStateForBundle metricsEnvironmentStateForBundle, ExecutionStateSampler.ExecutionStateTracker executionStateTracker, HandleStateCallsForBundle handleStateCallsForBundle, Collection<FinalizeBundleHandler.CallbackRegistration> collection, Set<String> set) {
            return new AutoValue_ProcessBundleHandler_BundleProcessor(cache, inMemory, processBundleDescriptor, pTransformFunctionRegistry, pTransformFunctionRegistry2, list, list2, inMemory2, pCollectionConsumerRegistry, metricsEnvironmentStateForBundle, executionStateTracker, handleStateCallsForBundle, new ArrayList(), new ArrayList(), new ArrayList(), collection, new ArrayList(), new LinkedHashMap(), set, new ReentrantLock());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Cache<?, ?> getProcessWideCache();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BundleProgressReporter.InMemory getBundleProgressReporterAndRegistrar();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PTransformFunctionRegistry getStartFunctionRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PTransformFunctionRegistry getFinishFunctionRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<ThrowingRunnable> getResetFunctions();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<ThrowingRunnable> getTearDownFunctions();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BundleSplitListener.InMemory getSplitListener();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract PCollectionConsumerRegistry getpCollectionConsumerRegistry();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract MetricsEnvironmentStateForBundle getMetricsEnvironmentStateForBundle();

        public abstract ExecutionStateSampler.ExecutionStateTracker getStateTracker();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract HandleStateCallsForBundle getBeamFnStateClient();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<DataEndpoint<?>> getInboundDataEndpoints();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<TimerEndpoint<?>> getTimerEndpoints();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Collection<FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Collection<BeamFnDataReadRunner> getChannelRoots();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> getOutboundAggregators();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Set<String> getRunnerCapabilities();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Lock getProgressRequestLock();

        synchronized String getInstructionId() {
            return this.instructionId;
        }

        synchronized List<BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
            return this.cacheTokens;
        }

        synchronized Cache<Object, Object> getBundleCache() {
            if (this.bundleCache == null) {
                this.bundleCache = new Caches.ClearableCache<>(Caches.subCache(getProcessWideCache(), "Bundle", this.instructionId));
            }
            return this.bundleCache;
        }

        BeamFnDataInboundObserver getInboundObserver() {
            return this.inboundObserver2;
        }

        void finish() {
            this.inboundObserver2 = BeamFnDataInboundObserver.forConsumers(getInboundDataEndpoints(), getTimerEndpoints());
            Iterator<BeamFnDataOutboundAggregator> it = getOutboundAggregators().values().iterator();
            while (it.hasNext()) {
                it.next().start();
            }
        }

        synchronized void setupForProcessBundleRequest(BeamFnApi.InstructionRequest instructionRequest) {
            this.instructionId = instructionRequest.getInstructionId();
            this.cacheTokens = instructionRequest.getProcessBundle().getCacheTokensList();
            getMetricsEnvironmentStateForBundle().start(getStateTracker().getMetricsContainer());
        }

        void reset() throws Exception {
            synchronized (this) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                    this.bundleCache = null;
                }
            }
            getSplitListener().clear();
            getMetricsEnvironmentStateForBundle().reset();
            getStateTracker().reset();
            getBundleFinalizationCallbackRegistrations().clear();
            Iterator<ThrowingRunnable> it = getResetFunctions().iterator();
            while (it.hasNext()) {
                it.next().run();
            }
            getInboundObserver().reset();
            getBundleProgressReporterAndRegistrar().reset();
            getProgressRequestLock().unlock();
        }

        void discard() {
            synchronized (this) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                }
                getMetricsEnvironmentStateForBundle().discard();
                Iterator<BeamFnDataOutboundAggregator> it = getOutboundAggregators().values().iterator();
                while (it.hasNext()) {
                    it.next().discard();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void shutdown() {
            for (ThrowingRunnable throwingRunnable : getTearDownFunctions()) {
                ProcessBundleHandler.LOG.debug("Tearing down function {}", throwingRunnable);
                try {
                    throwingRunnable.run();
                } catch (Exception e) {
                    ProcessBundleHandler.LOG.error("Exceptions are thrown from DoFn.teardown method. Note that it will not fail the pipeline execution,", (Throwable) e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$BundleProcessorCache.class */
    public static class BundleProcessorCache {
        private final LoadingCache<String, ConcurrentLinkedQueue<BundleProcessor>> cachedBundleProcessors = CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(1)).removalListener(removalNotification -> {
            ((ConcurrentLinkedQueue) removalNotification.getValue()).forEach(bundleProcessor -> {
                bundleProcessor.shutdown();
            });
        }).build(new CacheLoader<String, ConcurrentLinkedQueue<BundleProcessor>>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache.1
            @Override // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader
            public ConcurrentLinkedQueue<BundleProcessor> load(String str) throws Exception {
                return new ConcurrentLinkedQueue<>();
            }
        });
        private final Map<String, BundleProcessor> activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap());

        @Pure
        public int hashCode() {
            return super.hashCode();
        }

        BundleProcessorCache() {
        }

        @VisibleForTesting
        Map<String, ConcurrentLinkedQueue<BundleProcessor>> getCachedBundleProcessors() {
            return ImmutableMap.copyOf((Map) this.cachedBundleProcessors.asMap());
        }

        public Map<String, BundleProcessor> getActiveBundleProcessors() {
            return ImmutableMap.copyOf((Map) this.activeBundleProcessors);
        }

        BundleProcessor get(BeamFnApi.InstructionRequest instructionRequest, Supplier<BundleProcessor> supplier) {
            BundleProcessor poll = this.cachedBundleProcessors.getUnchecked(instructionRequest.getProcessBundle().getProcessBundleDescriptorId()).poll();
            if (poll == null) {
                poll = supplier.get();
            }
            poll.setupForProcessBundleRequest(instructionRequest);
            this.activeBundleProcessors.put(instructionRequest.getInstructionId(), poll);
            return poll;
        }

        public BundleProcessor find(String str) {
            return this.activeBundleProcessors.get(str);
        }

        void release(String str, BundleProcessor bundleProcessor) {
            this.activeBundleProcessors.remove(bundleProcessor.getInstructionId());
            try {
                bundleProcessor.reset();
                this.cachedBundleProcessors.get(str).add(bundleProcessor);
            } catch (Exception e) {
                ProcessBundleHandler.LOG.warn("Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.", str, e);
            }
        }

        void discard(BundleProcessor bundleProcessor) {
            bundleProcessor.discard();
            this.activeBundleProcessors.remove(bundleProcessor.getInstructionId());
        }

        void shutdown() throws Exception {
            this.cachedBundleProcessors.invalidateAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$FailAllStateCallsForBundle.class */
    public static class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(BeamFnApi.ProcessBundleRequest processBundleRequest) {
            this.request = processBundleRequest;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder builder) {
            throw new IllegalStateException(String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", this.request));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$HandleStateCallsForBundle.class */
    public static abstract class HandleStateCallsForBundle implements AutoCloseable, BeamFnStateClient {
        HandleStateCallsForBundle() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$MetricsEnvironmentStateForBundle.class */
    public static class MetricsEnvironmentStateForBundle {
        private MetricsEnvironment.MetricsEnvironmentState currentThreadState;

        MetricsEnvironmentStateForBundle() {
        }

        public void start(MetricsContainer metricsContainer) {
            this.currentThreadState = MetricsEnvironment.getMetricsEnvironmentStateForCurrentThread();
            this.currentThreadState.activate(metricsContainer);
        }

        public void reset() {
            this.currentThreadState.activate(null);
            this.currentThreadState = null;
        }

        public void discard() {
            this.currentThreadState.activate(null);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$UnknownPTransformRunnerFactory.class */
    private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> {
        private final Set<String> knownUrns;

        private UnknownPTransformRunnerFactory(Set<String> set) {
            this.knownUrns = set;
        }

        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
        public Object createRunnerForPTransform(PTransformRunnerFactory.Context context) {
            String format = String.format("No factory registered for %s, known factories %s", context.getPTransform().getSpec().getUrn(), this.knownUrns);
            ProcessBundleHandler.LOG.error(format);
            throw new IllegalStateException(format);
        }
    }

    public ProcessBundleHandler(PipelineOptions pipelineOptions, Set<String> set, Function<String, BeamFnApi.ProcessBundleDescriptor> function, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, FinalizeBundleHandler finalizeBundleHandler, ShortIdMap shortIdMap, ExecutionStateSampler executionStateSampler, Cache<Object, Object> cache, DataSampler dataSampler) {
        this(pipelineOptions, set, function, beamFnDataClient, beamFnStateGrpcClientCache, finalizeBundleHandler, shortIdMap, executionStateSampler, REGISTERED_RUNNER_FACTORIES, cache, new BundleProcessorCache(), dataSampler);
    }

    @VisibleForTesting
    ProcessBundleHandler(PipelineOptions pipelineOptions, Set<String> set, Function<String, BeamFnApi.ProcessBundleDescriptor> function, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, FinalizeBundleHandler finalizeBundleHandler, ShortIdMap shortIdMap, ExecutionStateSampler executionStateSampler, Map<String, PTransformRunnerFactory> map, Cache<Object, Object> cache, BundleProcessorCache bundleProcessorCache, DataSampler dataSampler) {
        this.options = pipelineOptions;
        this.fnApiRegistry = function;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.finalizeBundleHandler = finalizeBundleHandler;
        this.shortIds = shortIdMap;
        this.runnerCapabilities = set;
        this.runnerAcceptsShortIds = set.contains(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.MONITORING_INFO_SHORT_IDS));
        this.executionStateSampler = executionStateSampler;
        this.urnToPTransformRunnerFactoryMap = map;
        this.defaultPTransformRunnerFactory = new UnknownPTransformRunnerFactory(map.keySet());
        this.processWideCache = cache;
        this.bundleProcessorCache = bundleProcessorCache;
        this.dataSampler = dataSampler;
    }

    private void createRunnerAndConsumersForPTransformRecursively(final BeamFnStateClient beamFnStateClient, final BeamFnDataClient beamFnDataClient, final String str, final RunnerApi.PTransform pTransform, final Supplier<String> supplier, final Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> supplier2, final Supplier<Cache<?, ?>> supplier3, final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, SetMultimap<String, String> setMultimap, final PCollectionConsumerRegistry pCollectionConsumerRegistry, Set<String> set, final PTransformFunctionRegistry pTransformFunctionRegistry, final PTransformFunctionRegistry pTransformFunctionRegistry2, final Consumer<ThrowingRunnable> consumer, final Consumer<ThrowingRunnable> consumer2, final BiConsumer<Endpoints.ApiServiceDescriptor, DataEndpoint<?>> biConsumer, final Consumer<TimerEndpoint<?>> consumer3, final Consumer<BundleProgressReporter> consumer4, final BundleSplitListener bundleSplitListener, final DoFn.BundleFinalizer bundleFinalizer, Collection<BeamFnDataReadRunner> collection, final Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> map, final Set<String> set2) throws IOException {
        Iterator<String> it = pTransform.getOutputsMap().values().iterator();
        while (it.hasNext()) {
            for (String str2 : setMultimap.get((SetMultimap<String, String>) it.next())) {
                createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, beamFnDataClient, str2, processBundleDescriptor.getTransformsMap().get(str2), supplier, supplier2, supplier3, processBundleDescriptor, setMultimap, pCollectionConsumerRegistry, set, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, biConsumer, consumer3, consumer4, bundleSplitListener, bundleFinalizer, collection, map, set2);
            }
        }
        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printer().printToString(pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printer().printToString(pTransform)));
        }
        if (set.contains(str)) {
            return;
        }
        Object createRunnerForPTransform = this.urnToPTransformRunnerFactoryMap.getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory).createRunnerForPTransform(new PTransformRunnerFactory.Context() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandler.1
            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public PipelineOptions getPipelineOptions() {
                return ProcessBundleHandler.this.options;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public ShortIdMap getShortIdMap() {
                return ProcessBundleHandler.this.shortIds;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public BeamFnDataClient getBeamFnDataClient() {
                return beamFnDataClient;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public BeamFnStateClient getBeamFnStateClient() {
                return beamFnStateClient;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public String getPTransformId() {
                return str;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public RunnerApi.PTransform getPTransform() {
                return pTransform;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Supplier<String> getProcessBundleInstructionIdSupplier() {
                return supplier;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> getCacheTokensSupplier() {
                return supplier2;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Supplier<Cache<?, ?>> getBundleCacheSupplier() {
                return supplier3;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Cache<?, ?> getProcessWideCache() {
                return ProcessBundleHandler.this.processWideCache;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Map<String, RunnerApi.PCollection> getPCollections() {
                return processBundleDescriptor.getPcollectionsMap();
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Map<String, RunnerApi.Coder> getCoders() {
                return processBundleDescriptor.getCodersMap();
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Map<String, RunnerApi.WindowingStrategy> getWindowingStrategies() {
                return processBundleDescriptor.getWindowingStrategiesMap();
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public Set<String> getRunnerCapabilities() {
                return set2;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public <T> void addPCollectionConsumer(String str3, FnDataReceiver<WindowedValue<T>> fnDataReceiver) {
                pCollectionConsumerRegistry.register(str3, str, pTransform.getUniqueName(), fnDataReceiver);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public <T> FnDataReceiver<T> addOutgoingDataEndpoint(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder) {
                Map map2 = map;
                BeamFnDataClient beamFnDataClient2 = beamFnDataClient;
                Supplier supplier4 = supplier;
                Set set3 = set2;
                return ((BeamFnDataOutboundAggregator) map2.computeIfAbsent(apiServiceDescriptor, apiServiceDescriptor2 -> {
                    return beamFnDataClient2.createOutboundAggregator(apiServiceDescriptor2, supplier4, set3.contains(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)));
                })).registerOutputDataLocation(str, coder);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public <T> FnDataReceiver<Timer<T>> addOutgoingTimersEndpoint(String str3, Coder<Timer<T>> coder) {
                if (!processBundleDescriptor.hasTimerApiServiceDescriptor()) {
                    throw new IllegalStateException(String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", supplier.get()));
                }
                Map map2 = map;
                Endpoints.ApiServiceDescriptor timerApiServiceDescriptor = processBundleDescriptor.getTimerApiServiceDescriptor();
                BeamFnDataClient beamFnDataClient2 = beamFnDataClient;
                Supplier supplier4 = supplier;
                Set set3 = set2;
                return ((BeamFnDataOutboundAggregator) map2.computeIfAbsent(timerApiServiceDescriptor, apiServiceDescriptor -> {
                    return beamFnDataClient2.createOutboundAggregator(apiServiceDescriptor, supplier4, set3.contains(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)));
                })).registerOutputTimersLocation(str, str3, coder);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public FnDataReceiver<?> getPCollectionConsumer(String str3) {
                return pCollectionConsumerRegistry.getMultiplexingConsumer(str3);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public void addStartBundleFunction(ThrowingRunnable throwingRunnable) {
                pTransformFunctionRegistry.register(str, pTransform.getUniqueName(), throwingRunnable);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public void addFinishBundleFunction(ThrowingRunnable throwingRunnable) {
                pTransformFunctionRegistry2.register(str, pTransform.getUniqueName(), throwingRunnable);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public <T> void addIncomingDataEndpoint(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> fnDataReceiver) {
                biConsumer.accept(apiServiceDescriptor, DataEndpoint.create(str, coder, fnDataReceiver));
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public <T> void addIncomingTimerEndpoint(String str3, Coder<Timer<T>> coder, FnDataReceiver<Timer<T>> fnDataReceiver) {
                consumer3.accept(TimerEndpoint.create(str, str3, coder, fnDataReceiver));
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public void addResetFunction(ThrowingRunnable throwingRunnable) {
                consumer.accept(throwingRunnable);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public void addTearDownFunction(ThrowingRunnable throwingRunnable) {
                consumer2.accept(throwingRunnable);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public void addBundleProgressReporter(BundleProgressReporter bundleProgressReporter) {
                consumer4.accept(bundleProgressReporter);
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public BundleSplitListener getSplitListener() {
                return bundleSplitListener;
            }

            @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Context
            public DoFn.BundleFinalizer getBundleFinalizer() {
                return bundleFinalizer;
            }
        });
        if (createRunnerForPTransform instanceof BeamFnDataReadRunner) {
            collection.add((BeamFnDataReadRunner) createRunnerForPTransform);
        }
        set.add(str);
    }

    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 12, insn: 0x0210: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:64:0x0210 */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0215: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:66:0x0215 */
    /* JADX WARN: Type inference failed for: r12v0, types: [org.apache.beam.fn.harness.control.ProcessBundleHandler$HandleStateCallsForBundle] */
    /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.Throwable] */
    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        BeamFnApi.ProcessBundleResponse.Builder newBuilder = BeamFnApi.ProcessBundleResponse.newBuilder();
        BundleProcessor bundleProcessor = this.bundleProcessorCache.get(instructionRequest, () -> {
            try {
                return createBundleProcessor(instructionRequest.getProcessBundle().getProcessBundleDescriptorId(), instructionRequest.getProcessBundle());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        try {
            try {
                PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
                PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry();
                ExecutionStateSampler.ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
                HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient();
                Throwable th = null;
                stateTracker.start(instructionRequest.getInstructionId());
                try {
                    for (ThrowingRunnable throwingRunnable : startFunctionRegistry.getFunctions()) {
                        LOG.debug("Starting function {}", throwingRunnable);
                        throwingRunnable.run();
                    }
                    if (instructionRequest.getProcessBundle().hasElements()) {
                        if (!bundleProcessor.getInboundObserver().multiplexElements(instructionRequest.getProcessBundle().getElements())) {
                            throw new RuntimeException("Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs. Unterminated endpoints: " + bundleProcessor.getInboundObserver().getUnfinishedEndpoints());
                        }
                    } else if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
                        BeamFnDataInboundObserver inboundObserver = bundleProcessor.getInboundObserver();
                        this.beamFnDataClient.registerReceiver(instructionRequest.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors(), inboundObserver);
                        inboundObserver.awaitCompletion();
                        this.beamFnDataClient.unregisterReceiver(instructionRequest.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors());
                    }
                    for (ThrowingRunnable throwingRunnable2 : Lists.reverse(finishFunctionRegistry.getFunctions())) {
                        LOG.debug("Finishing function {}", throwingRunnable2);
                        throwingRunnable2.run();
                    }
                    embedOutboundElementsIfApplicable(newBuilder, bundleProcessor);
                    newBuilder.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
                    bundleProcessor.getProgressRequestLock().lock();
                    Map<String, ByteString> finalMonitoringData = finalMonitoringData(bundleProcessor);
                    if (this.runnerAcceptsShortIds) {
                        newBuilder.putAllMonitoringData(finalMonitoringData);
                    } else {
                        for (Map.Entry<String, ByteString> entry : finalMonitoringData.entrySet()) {
                            newBuilder.addMonitoringInfos(this.shortIds.get(entry.getKey()).toBuilder().setPayload(entry.getValue()));
                        }
                    }
                    if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
                        this.finalizeBundleHandler.registerCallbacks(bundleProcessor.getInstructionId(), ImmutableList.copyOf((Collection) bundleProcessor.getBundleFinalizationCallbackRegistrations()));
                        newBuilder.setRequiresFinalization(true);
                    }
                    stateTracker.reset();
                    if (beamFnStateClient != null) {
                        if (0 != 0) {
                            try {
                                beamFnStateClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            beamFnStateClient.close();
                        }
                    }
                    this.bundleProcessorCache.release(instructionRequest.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
                    return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(newBuilder);
                } catch (Throwable th3) {
                    stateTracker.reset();
                    throw th3;
                }
            } catch (Exception e) {
                this.bundleProcessorCache.discard(bundleProcessor);
                throw e;
            }
        } finally {
        }
    }

    private void embedOutboundElementsIfApplicable(BeamFnApi.ProcessBundleResponse.Builder builder, BundleProcessor bundleProcessor) {
        if (bundleProcessor.getOutboundAggregators().isEmpty()) {
            return;
        }
        ArrayList arrayList = new ArrayList(bundleProcessor.getOutboundAggregators().size());
        boolean z = false;
        Iterator<BeamFnDataOutboundAggregator> it = bundleProcessor.getOutboundAggregators().values().iterator();
        while (it.hasNext()) {
            BeamFnApi.Elements sendOrCollectBufferedDataAndFinishOutboundStreams = it.next().sendOrCollectBufferedDataAndFinishOutboundStreams();
            if (sendOrCollectBufferedDataAndFinishOutboundStreams == null) {
                z = true;
            }
            arrayList.add(sendOrCollectBufferedDataAndFinishOutboundStreams);
        }
        if (!z) {
            BeamFnApi.Elements.Builder newBuilder = BeamFnApi.Elements.newBuilder();
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                newBuilder.mergeFrom((BeamFnApi.Elements) it2.next());
            }
            builder.setElements(newBuilder.build());
            return;
        }
        int i = 0;
        for (BeamFnDataOutboundAggregator beamFnDataOutboundAggregator : bundleProcessor.getOutboundAggregators().values()) {
            int i2 = i;
            i++;
            BeamFnApi.Elements elements = (BeamFnApi.Elements) arrayList.get(i2);
            if (elements != null) {
                beamFnDataOutboundAggregator.sendElements(elements);
            }
        }
    }

    public BeamFnApi.InstructionResponse.Builder progress(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        BundleProcessor find = this.bundleProcessorCache.find(instructionRequest.getProcessBundleProgress().getInstructionId());
        if (find != null && find.getProgressRequestLock().tryLock()) {
            try {
                if (this.bundleProcessorCache.find(instructionRequest.getProcessBundleProgress().getInstructionId()) == null) {
                    BeamFnApi.InstructionResponse.Builder processBundleProgress = BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
                    find.getProgressRequestLock().unlock();
                    return processBundleProgress;
                }
                Map<String, ByteString> intermediateMonitoringData = intermediateMonitoringData(find);
                find.getProgressRequestLock().unlock();
                BeamFnApi.ProcessBundleProgressResponse.Builder newBuilder = BeamFnApi.ProcessBundleProgressResponse.newBuilder();
                if (this.runnerAcceptsShortIds) {
                    newBuilder.putAllMonitoringData(intermediateMonitoringData);
                } else {
                    for (Map.Entry<String, ByteString> entry : intermediateMonitoringData.entrySet()) {
                        newBuilder.addMonitoringInfos(this.shortIds.get(entry.getKey()).toBuilder().setPayload(entry.getValue()));
                    }
                }
                newBuilder.setConsumingReceivedData(find.getInboundObserver().isConsumingReceivedData());
                return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(newBuilder);
            } catch (Throwable th) {
                find.getProgressRequestLock().unlock();
                throw th;
            }
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
    }

    private Map<String, ByteString> intermediateMonitoringData(BundleProcessor bundleProcessor) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.putAll(bundleProcessor.getStateTracker().getMetricsContainerRegistry().getMonitoringData(this.shortIds));
        bundleProcessor.getBundleProgressReporterAndRegistrar().updateIntermediateMonitoringData(hashMap);
        return hashMap;
    }

    private Map<String, ByteString> finalMonitoringData(BundleProcessor bundleProcessor) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.putAll(bundleProcessor.getStateTracker().getMetricsContainerRegistry().getMonitoringData(this.shortIds));
        bundleProcessor.getBundleProgressReporterAndRegistrar().updateFinalMonitoringData(hashMap);
        return hashMap;
    }

    public BeamFnApi.InstructionResponse.Builder trySplit(BeamFnApi.InstructionRequest instructionRequest) {
        BundleProcessor find = this.bundleProcessorCache.find(instructionRequest.getProcessBundleSplit().getInstructionId());
        BeamFnApi.ProcessBundleSplitResponse.Builder newBuilder = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        if (find == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance());
        }
        Iterator<BeamFnDataReadRunner> it = find.getChannelRoots().iterator();
        while (it.hasNext()) {
            it.next().trySplit(instructionRequest.getProcessBundleSplit(), newBuilder);
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(newBuilder);
    }

    public void shutdown() throws Exception {
        this.bundleProcessorCache.shutdown();
    }

    private BundleProcessor createBundleProcessor(String str, BeamFnApi.ProcessBundleRequest processBundleRequest) throws IOException {
        BeamFnApi.ProcessBundleDescriptor apply = this.fnApiRegistry.apply(str);
        SetMultimap<String, String> create = HashMultimap.create();
        BundleProgressReporter.InMemory inMemory = new BundleProgressReporter.InMemory();
        MetricsEnvironmentStateForBundle metricsEnvironmentStateForBundle = new MetricsEnvironmentStateForBundle();
        ExecutionStateSampler.ExecutionStateTracker create2 = this.executionStateSampler.create();
        inMemory.register(create2);
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(create2, this.shortIds, inMemory, apply, this.dataSampler);
        Set<String> hashSet = new HashSet<>();
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry(this.shortIds, create2, MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry(this.shortIds, create2, MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Map.Entry<String, RunnerApi.PTransform> entry : apply.getTransformsMap().entrySet()) {
            Iterator<String> it = entry.getValue().getInputsMap().values().iterator();
            while (it.hasNext()) {
                create.put(it.next(), entry.getKey());
            }
        }
        HandleStateCallsForBundle blockTillStateCallsFinish = apply.hasStateApiServiceDescriptor() ? new BlockTillStateCallsFinish(this.beamFnStateGrpcClientCache.forApiServiceDescriptor(apply.getStateApiServiceDescriptor())) : new FailAllStateCallsForBundle(processBundleRequest);
        BundleSplitListener.InMemory create3 = BundleSplitListener.InMemory.create();
        final ArrayList arrayList3 = new ArrayList();
        DoFn.BundleFinalizer bundleFinalizer = new DoFn.BundleFinalizer() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandler.2
            @Override // org.apache.beam.sdk.transforms.DoFn.BundleFinalizer
            public void afterBundleCommit(Instant instant, DoFn.BundleFinalizer.Callback callback) {
                arrayList3.add(FinalizeBundleHandler.CallbackRegistration.create(instant, callback));
            }
        };
        BundleProcessor create4 = BundleProcessor.create(this.processWideCache, inMemory, apply, pTransformFunctionRegistry, pTransformFunctionRegistry2, arrayList, arrayList2, create3, pCollectionConsumerRegistry, metricsEnvironmentStateForBundle, create2, blockTillStateCallsFinish, arrayList3, this.runnerCapabilities);
        for (Map.Entry<String, RunnerApi.PTransform> entry2 : apply.getTransformsMap().entrySet()) {
            if ("beam:runner:source:v1".equals(entry2.getValue().getSpec().getUrn()) || "beam:runner:sink:v1".equals(entry2.getValue().getSpec().getUrn()) || JAVA_SOURCE_URN.equals(entry2.getValue().getSpec().getUrn()) || PTransformTranslation.READ_TRANSFORM_URN.equals(entry2.getValue().getSpec().getUrn())) {
                BeamFnDataClient beamFnDataClient = this.beamFnDataClient;
                String key = entry2.getKey();
                RunnerApi.PTransform value = entry2.getValue();
                Objects.requireNonNull(create4);
                Supplier<String> supplier = create4::getInstructionId;
                Objects.requireNonNull(create4);
                Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> supplier2 = create4::getCacheTokens;
                Objects.requireNonNull(create4);
                Supplier<Cache<?, ?>> supplier3 = create4::getBundleCache;
                Objects.requireNonNull(arrayList);
                Consumer<ThrowingRunnable> consumer = (v1) -> {
                    r14.add(v1);
                };
                Objects.requireNonNull(arrayList2);
                Consumer<ThrowingRunnable> consumer2 = (v1) -> {
                    r15.add(v1);
                };
                BiConsumer<Endpoints.ApiServiceDescriptor, DataEndpoint<?>> biConsumer = (apiServiceDescriptor, dataEndpoint) -> {
                    if (!create4.getInboundEndpointApiServiceDescriptors().contains(apiServiceDescriptor)) {
                        create4.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
                    }
                    create4.getInboundDataEndpoints().add(dataEndpoint);
                };
                Consumer<TimerEndpoint<?>> consumer3 = timerEndpoint -> {
                    if (!apply.hasTimerApiServiceDescriptor()) {
                        throw new IllegalStateException(String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", str));
                    }
                    create4.getTimerEndpoints().add(timerEndpoint);
                };
                Objects.requireNonNull(inMemory);
                createRunnerAndConsumersForPTransformRecursively(blockTillStateCallsFinish, beamFnDataClient, key, value, supplier, supplier2, supplier3, apply, create, pCollectionConsumerRegistry, hashSet, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, biConsumer, consumer3, inMemory::register, create3, bundleFinalizer, create4.getChannelRoots(), create4.getOutboundAggregators(), create4.getRunnerCapabilities());
            }
        }
        create4.finish();
        return create4;
    }

    public BundleProcessorCache getBundleProcessorCache() {
        return this.bundleProcessorCache;
    }

    static {
        TreeSet newTreeSet = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
        newTreeSet.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator it = newTreeSet.iterator();
        while (it.hasNext()) {
            builder.putAll(((PTransformRunnerFactory.Registrar) it.next()).getPTransformRunnerFactories());
        }
        REGISTERED_RUNNER_FACTORIES = builder.build();
    }
}
