package org.apache.beam.runners.spark.translation;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.InMemoryBagUserStateFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.class */
class SparkExecutableStageFunction<InputT, SideInputT> implements FlatMapFunction<Iterator<WindowedValue<InputT>>, RawUnionValue> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(SparkExecutableStageFunction.class);
    // Serialized stage definition; decoded via ExecutableStage.fromPayload on every call().
    private final RunnerApi.ExecutableStagePayload stagePayload;
    // Output PCollection id -> integer passed as the first argument of each emitted RawUnionValue.
    private final Map<String, Integer> outputMap;
    // Source of the ExecutableStageContext obtained (for jobInfo) at the start of call().
    private final SparkExecutableStageContextFactory contextFactory;
    // Keyed by the id handed to SideInputGetter.getSideInput: the broadcast list of encoded
    // elements together with the coder used to decode each byte[].
    private final Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>> sideInputs;
    // Accumulator whose container (looked up by stagePayload.getInput()) is updated with the
    // monitoring infos reported on bundle progress and completion.
    private final MetricsContainerStepMapAccumulator metricsAccumulator;
    // Coder handed to TimerReceiverFactory in call(); declared as a raw type.
    private final Coder windowCoder;
    // Passed to contextFactory.get(...) to look up the stage context.
    private final JobInfo jobInfo;
    // Created lazily in getStateRequestHandler(); transient, so excluded from Java serialization.
    private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
    // User key of the timer most recently seen by the timer callback in call(); later passed to
    // PipelineTranslatorUtils.fireEligibleTimers. Transient, so excluded from Java serialization.
    private transient Object currentTimerKey;

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkExecutableStageFunction$JobBundleFactoryCreator.class */
    /**
     * Serializable factory for {@link JobBundleFactory} instances.
     *
     * <p>Nothing in the visible part of this file references this interface.
     */
    interface JobBundleFactoryCreator extends Serializable {
        /** Returns a {@link JobBundleFactory}. */
        JobBundleFactory create();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkExecutableStageFunction$ReceiverFactory.class */
    /**
     * {@link OutputReceiverFactory} that funnels the elements of every stage output into a single
     * shared queue, wrapping each one in a {@link RawUnionValue} carrying the integer registered
     * for its PCollection id.
     */
    public static class ReceiverFactory implements OutputReceiverFactory {
        /** Shared sink that receives the wrapped elements of all outputs. */
        private final ConcurrentLinkedQueue<RawUnionValue> collector;
        /** PCollection id to the integer stamped on that output's {@link RawUnionValue}s. */
        private final Map<String, Integer> outputMap;

        /**
         * @param concurrentLinkedQueue queue that every created receiver appends to
         * @param map lookup from PCollection id to union tag
         */
        ReceiverFactory(ConcurrentLinkedQueue<RawUnionValue> concurrentLinkedQueue, Map<String, Integer> map) {
            collector = concurrentLinkedQueue;
            outputMap = map;
        }

        /**
         * Builds a receiver that appends each accepted element to the shared queue.
         *
         * @param str id of the output PCollection
         * @return receiver wrapping each element in a {@link RawUnionValue} with the id's tag
         * @throws IllegalStateException if {@code str} has no entry in the output map
         */
        public <OutputT> FnDataReceiver<OutputT> create(String str) {
            final Integer registeredTag = outputMap.get(str);
            if (registeredTag != null) {
                // Unbox once, up front, so the receiver captures a primitive tag.
                final int unionTag = registeredTag;
                return element -> collector.add(new RawUnionValue(unionTag, element));
            }
            throw new IllegalStateException(String.format(Locale.ENGLISH, "Unknown PCollectionId %s", str));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators needed to execute one stage; performs no other work.
     *
     * @param executableStagePayload serialized definition of the stage to run
     * @param jobInfo job descriptor used to look up the stage context
     * @param map output PCollection id to the integer tag of its {@link RawUnionValue}s
     * @param sparkExecutableStageContextFactory source of the per-job stage context
     * @param map2 side-input id to (broadcast encoded elements, decoding coder)
     * @param metricsContainerStepMapAccumulator accumulator receiving bundle monitoring infos
     * @param coder window coder handed to the timer receiver factory
     */
    public SparkExecutableStageFunction(
            RunnerApi.ExecutableStagePayload executableStagePayload,
            JobInfo jobInfo,
            Map<String, Integer> map,
            SparkExecutableStageContextFactory sparkExecutableStageContextFactory,
            Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>> map2,
            MetricsContainerStepMapAccumulator metricsContainerStepMapAccumulator,
            Coder coder) {
        // Stage identity and where to obtain its execution context.
        stagePayload = executableStagePayload;
        this.jobInfo = jobInfo;
        contextFactory = sparkExecutableStageContextFactory;

        // Data plumbing: outputs, side inputs, timers' window coder.
        outputMap = map;
        sideInputs = map2;
        windowCoder = coder;

        // Metrics reporting.
        metricsAccumulator = metricsContainerStepMapAccumulator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adapts this function to pair-shaped input.
     *
     * @return a function that ignores the first tuple component and feeds an iterator over the
     *     second component to {@link #call(Iterator)}
     */
    public FlatMapFunction<Tuple2<ByteArray, Iterable<WindowedValue<InputT>>>, RawUnionValue> forPair() {
        return keyedValues -> call(keyedValues._2.iterator());
    }

    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0171: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:55:0x0171 */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x0176: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:57:0x0176 */
    /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.AutoCloseable] */
    /* JADX WARN: Type inference failed for: r14v0, types: [java.lang.Throwable] */
    public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> it) throws Exception {
        ?? r13;
        ?? r14;
        ExecutableStageContext executableStageContext = this.contextFactory.get(this.jobInfo);
        try {
            try {
                ExecutableStage fromPayload = ExecutableStage.fromPayload(this.stagePayload);
                StageBundleFactory stageBundleFactory = executableStageContext.getStageBundleFactory(fromPayload);
                ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
                StateRequestHandler stateRequestHandler = getStateRequestHandler(fromPayload, stageBundleFactory.getProcessBundleDescriptor());
                if (fromPayload.getTimers().size() == 0) {
                    processElements(fromPayload, stateRequestHandler, new ReceiverFactory(concurrentLinkedQueue, this.outputMap), null, stageBundleFactory, it);
                    Iterator<RawUnionValue> it2 = concurrentLinkedQueue.iterator();
                    if (stageBundleFactory != null) {
                        $closeResource(null, stageBundleFactory);
                    }
                    return it2;
                }
                InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
                inMemoryTimerInternals.advanceProcessingTime(Instant.now());
                inMemoryTimerInternals.advanceSynchronizedProcessingTime(Instant.now());
                ReceiverFactory receiverFactory = new ReceiverFactory(concurrentLinkedQueue, this.outputMap);
                TimerReceiverFactory timerReceiverFactory = new TimerReceiverFactory(stageBundleFactory, (timer, timerData) -> {
                    this.currentTimerKey = timer.getUserKey();
                    if (timer.getClearBit()) {
                        inMemoryTimerInternals.deleteTimer(timerData);
                    } else {
                        inMemoryTimerInternals.setTimer(timerData);
                    }
                }, this.windowCoder);
                processElements(fromPayload, stateRequestHandler, receiverFactory, timerReceiverFactory, stageBundleFactory, it);
                inMemoryTimerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
                inMemoryTimerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
                inMemoryTimerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
                while (inMemoryTimerInternals.hasPendingTimers()) {
                    RemoteBundle bundle = stageBundleFactory.getBundle(receiverFactory, timerReceiverFactory, stateRequestHandler, getBundleProgressHandler());
                    Throwable th = null;
                    try {
                        try {
                            PipelineTranslatorUtils.fireEligibleTimers(inMemoryTimerInternals, bundle.getTimerReceivers(), this.currentTimerKey);
                            if (bundle != null) {
                                $closeResource(null, bundle);
                            }
                        } finally {
                        }
                    } catch (Throwable th2) {
                        if (bundle != null) {
                            $closeResource(th, bundle);
                        }
                        throw th2;
                    }
                }
                Iterator<RawUnionValue> it3 = concurrentLinkedQueue.iterator();
                if (stageBundleFactory != null) {
                    $closeResource(null, stageBundleFactory);
                }
                if (executableStageContext != null) {
                    $closeResource(null, executableStageContext);
                }
                return it3;
            } catch (Throwable th3) {
                if (r13 != 0) {
                    $closeResource(r14, r13);
                }
                throw th3;
            }
        } finally {
            if (executableStageContext != null) {
                $closeResource(null, executableStageContext);
            }
        }
    }

    /**
     * Opens one bundle on the given factory and pushes every input element into its only input
     * receiver.
     *
     * <p>The bundle is managed with try-with-resources: it is closed when the input is exhausted
     * or when processing fails, and in the latter case a failure while closing is recorded as a
     * suppressed exception instead of replacing the processing failure.
     *
     * @param executableStage the stage being executed (not read by this method)
     * @param stateRequestHandler handler passed to the bundle for state requests
     * @param receiverFactory factory for the bundle's output receivers
     * @param timerReceiverFactory factory for the bundle's timer receivers; callers pass
     *     {@code null} when the stage has no timers
     * @param stageBundleFactory factory used to open the bundle
     * @param it the input elements to feed into the bundle
     * @throws Exception if a receiver rejects an element or the bundle fails to close
     */
    private void processElements(
            ExecutableStage executableStage,
            StateRequestHandler stateRequestHandler,
            ReceiverFactory receiverFactory,
            TimerReceiverFactory timerReceiverFactory,
            StageBundleFactory stageBundleFactory,
            Iterator<WindowedValue<InputT>> it) throws Exception {
        try (RemoteBundle bundle = stageBundleFactory.getBundle(
                receiverFactory, timerReceiverFactory, stateRequestHandler, getBundleProgressHandler())) {
            // getOnlyElement fails unless the bundle exposes exactly one input receiver.
            // The receiver's element type is not visible here, hence the raw cast.
            @SuppressWarnings({"rawtypes", "unchecked"})
            FnDataReceiver<Object> mainReceiver =
                    (FnDataReceiver) Iterables.getOnlyElement(bundle.getInputReceivers().values());
            while (it.hasNext()) {
                mainReceiver.accept(it.next());
            }
        }
    }

    /**
     * Creates a progress handler that forwards the monitoring infos of both progress and
     * completion responses to the metrics container registered under the stage's input id.
     *
     * @return a new handler bound to that container
     */
    private BundleProgressHandler getBundleProgressHandler() {
        // NOTE(review): "m28value" looks like a decompiler-assigned alias for the accumulator's
        // value() accessor — confirm against the real MetricsContainerStepMapAccumulator API.
        final MetricsContainerImpl stageMetrics = metricsAccumulator.m28value().getContainer(stagePayload.getInput());
        return new BundleProgressHandler() {
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                stageMetrics.update(progress.getMonitoringInfosList());
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse completion) {
                stageMetrics.update(completion.getMonitoringInfosList());
            }
        };
    }

    /**
     * Builds the state request handler for one stage execution.
     *
     * <p>Side-input requests (iterable, multimap and multimap-keys) are all routed to a single
     * handler that decodes the broadcast side-input bytes on demand. Bag user state requests go to
     * an in-memory handler when the stage declares user state, and to
     * {@link StateRequestHandler#unsupported()} otherwise.
     *
     * @param executableStage the stage whose side inputs and user states are served
     * @param executableProcessBundleDescriptor descriptor used to build the bag user state handler
     * @return a handler that delegates on the state key's type
     * @throws RuntimeException wrapping the {@link IOException} if handler setup fails
     */
    private StateRequestHandler getStateRequestHandler(
            ExecutableStage executableStage,
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor) {
        // Decodes the broadcast bytes of one side input into windowed values.
        BatchSideInputHandlerFactory.SideInputGetter sideInputGetter = new BatchSideInputHandlerFactory.SideInputGetter() {
            @Override
            @SuppressWarnings("unchecked") // The caller chooses T; the element type cannot be checked here.
            public <T> List<T> getSideInput(String pCollectionId) {
                Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>> entry =
                        sideInputs.get(pCollectionId);
                Broadcast<List<byte[]>> broadcast = entry._1;
                WindowedValue.WindowedValueCoder<SideInputT> coder = entry._2;
                List<WindowedValue<SideInputT>> decoded = broadcast.value().stream()
                        .map(bytes -> CoderHelpers.fromByteArray(bytes, coder))
                        .collect(Collectors.toList());
                return (List<T>) decoded;
            }
        };

        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlersByType =
                new EnumMap<>(BeamFnApi.StateKey.TypeCase.class);
        try {
            StateRequestHandler sideInputHandler = StateRequestHandlers.forSideInputHandlerFactory(
                    ProcessBundleDescriptors.getSideInputs(executableStage),
                    BatchSideInputHandlerFactory.forStage(executableStage, sideInputGetter));

            if (bagUserStateHandlerFactory == null) {
                bagUserStateHandlerFactory = new InMemoryBagUserStateFactory();
            }

            StateRequestHandler userStateHandler;
            if (executableStage.getUserStates().size() > 0) {
                // The factory is reused across invocations, so reset it before building the handler.
                bagUserStateHandlerFactory.resetForNewKey();
                userStateHandler = StateRequestHandlers.forBagUserStateHandlerFactory(
                        executableProcessBundleDescriptor, bagUserStateHandlerFactory);
            } else {
                userStateHandler = StateRequestHandler.unsupported();
            }

            handlersByType.put(BeamFnApi.StateKey.TypeCase.ITERABLE_SIDE_INPUT, sideInputHandler);
            handlersByType.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
            handlersByType.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_KEYS_SIDE_INPUT, sideInputHandler);
            handlersByType.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateHandler);
            return StateRequestHandlers.delegateBasedUponType(handlersByType);
        } catch (IOException e) {
            throw new RuntimeException("Failed to setup state handler", e);
        }
    }

    // Lambda deserialization hook (marked synthetic). It recognises exactly one serialized lambda,
    // the one whose implementation method is named "lambda$forPair$55b1e86f$1", and rejects
    // anything else with IllegalArgumentException.
    //
    // NOTE(review): this looks like decompiler output of the method javac emits for serializable
    // lambdas, and as written it is not valid Java source: `boolean z = -1`, `switch` on a
    // boolean, and an unqualified call(...) to an instance method from a static context while the
    // captured instance goes unused. Presumably it should not be hand-maintained — confirm before
    // editing or compiling this method.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name to a case selector (hash, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1708456413:
                if (implMethodName.equals("lambda$forPair$55b1e86f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda shape before re-creating it.
        switch (z) {
            case false:
                // Every attribute of the serialized form must match the forPair() lambda: method
                // kind 7, FlatMapFunction.call as the functional method, and this class as the
                // implementation owner with a (Tuple2) -> Iterator implementation signature.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/SparkExecutableStageFunction") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Ljava/util/Iterator;")) {
                    // Captured argument 0 is read as the enclosing function instance.
                    SparkExecutableStageFunction sparkExecutableStageFunction = (SparkExecutableStageFunction) serializedLambda.getCapturedArg(0);
                    return tuple2 -> {
                        return call((Iterator) ((Iterable) tuple2._2).iterator());
                    };
                }
                break;
        }
        // Unknown method name, or a known name whose shape did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Closes a resource on behalf of a hand-expanded try-with-resources block.
     *
     * <p>With no primary exception, a failure from {@code close()} propagates to the caller. With
     * a primary exception, a failure from {@code close()} is attached to it as suppressed so that
     * the primary exception is the one that survives.
     *
     * @param th the exception already in flight, or {@code null} if the guarded block succeeded
     * @param autoCloseable the resource to close; must not be {@code null}
     * @throws Exception if {@code th} is {@code null} and closing the resource fails
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th != null) {
            try {
                autoCloseable.close();
            } catch (Throwable closeFailure) {
                // Keep the primary failure; record the close failure alongside it.
                th.addSuppressed(closeFailure);
            }
            return;
        }
        // AutoCloseable.close() declares the checked Exception, so it must be declared here too.
        autoCloseable.close();
    }
}
