package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.ProcessContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.CachedSideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.class */
/**
 * Spark {@link MapPartitionsFunction} that runs a Beam {@link DoFn} over one partition of
 * {@link WindowedValue} inputs.
 *
 * <p>Every value the {@link DoFn} writes is collected together with the {@link TupleTag} it was
 * written to, and the partition result is returned as an iterator of
 * {@code (tag, windowed value)} pairs.
 *
 * <p>The {@link DoFn} setup hook is triggered lazily through
 * {@link DoFnInvokers#tryInvokeSetupFor} the first time {@link #call} sees a non-empty partition.
 * The guarding flag is {@code transient}, so a deserialized copy of this function starts with the
 * flag unset again.
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> main output element type of the wrapped {@link DoFn}
 */
public class DoFnFunction<InputT, OutputT> implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
    private final MetricsContainerStepMapAccumulator metricsAccum;
    private final String stepName;
    private final DoFn<InputT, OutputT> doFn;
    // Transient on purpose: not part of the serialized form, so it is false after deserialization.
    private transient boolean wasSetupCalled;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final SerializablePipelineOptions serializableOptions;
    private final List<TupleTag<?>> additionalOutputTags;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<InputT> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
    private final SideInputBroadcast broadcastStateData;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<String, PCollectionView<?>> sideInputMapping;

    /**
     * Output sink handed to the runner: buffers every emitted value under its tag and exposes the
     * buffered entries as {@code (tag, value)} tuples.
     *
     * <p>Declared {@code static} because it never touches the enclosing {@link DoFnFunction}.
     * Only {@link #output} is synchronized; {@link #clear} and {@link #iterator} are not.
     */
    private static class DoFnOutputManager implements ProcessContext.ProcessOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final Multimap<TupleTag<?>, WindowedValue<?>> outputs;

        private DoFnOutputManager() {
            this.outputs = LinkedListMultimap.create();
        }

        /** Drops every buffered output. */
        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.ProcessContext.ProcessOutputManager
        public void clear() {
            this.outputs.clear();
        }

        /** Returns a lazily transformed view of the buffered entries as {@link Tuple2} pairs. */
        @Override // java.lang.Iterable
        public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
            return Iterators.transform(this.outputs.entries().iterator(), entryToTupleFn());
        }

        /** Builds the function that converts a map entry into a Scala {@link Tuple2}. */
        private static <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
            return entry -> new Tuple2<>(entry.getKey(), entry.getValue());
        }

        /** Buffers {@code windowedValue} under {@code tupleTag}. */
        @Override
        public synchronized <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            this.outputs.put(tupleTag, windowedValue);
        }
    }

    /**
     * Stores the collaborators needed to build a runner for each partition; no work is done here.
     *
     * @param metricsContainerStepMapAccumulator accumulator passed to the metrics-aware runner
     * @param str step name passed to the metrics-aware runner
     * @param doFn the user function to execute
     * @param windowingStrategy windowing strategy forwarded to the runner
     * @param map side-input views and their windowing strategies, given to the side-input reader
     * @param serializablePipelineOptions serializable holder of the pipeline options
     * @param list tags of the additional (non-main) outputs
     * @param tupleTag tag of the main output
     * @param coder coder of the input elements
     * @param map2 coders keyed by output tag
     * @param sideInputBroadcast broadcast side-input data given to the side-input reader
     * @param doFnSchemaInformation schema information forwarded to the runner
     * @param map3 side-input mapping forwarded to the runner
     */
    public DoFnFunction(MetricsContainerStepMapAccumulator metricsContainerStepMapAccumulator, String str, DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> map, SerializablePipelineOptions serializablePipelineOptions, List<TupleTag<?>> list, TupleTag<OutputT> tupleTag, Coder<InputT> coder, Map<TupleTag<?>, Coder<?>> map2, SideInputBroadcast sideInputBroadcast, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map3) {
        this.metricsAccum = metricsContainerStepMapAccumulator;
        this.stepName = str;
        this.doFn = doFn;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = map;
        this.serializableOptions = serializablePipelineOptions;
        this.additionalOutputTags = list;
        this.mainOutputTag = tupleTag;
        this.inputCoder = coder;
        this.outputCoderMap = map2;
        this.broadcastStateData = sideInputBroadcast;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = map3;
    }

    /**
     * Processes one partition with the wrapped {@link DoFn}.
     *
     * @param it the partition's input elements
     * @return iterator over the {@code (tag, windowed value)} pairs produced for this partition
     * @throws Exception propagated from setup invocation or partition processing
     */
    @Override
    public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> it) throws Exception {
        // Setup is skipped for empty partitions and runs at most once per (deserialized) instance.
        if (!this.wasSetupCalled && it.hasNext()) {
            DoFnInvokers.tryInvokeSetupFor(this.doFn);
            this.wasSetupCalled = true;
        }

        // A fresh output buffer per partition; it is shared by the runner and the process context.
        DoFnOutputManager outputManager = new DoFnOutputManager();
        DoFnRunnerWithMetrics<InputT, OutputT> runner = createRunner(outputManager);

        // No timers are supplied to the context: an empty iterator is passed.
        return new ProcessContext<>(this.doFn, runner, outputManager, Collections.emptyIterator())
                .processPartition(it)
                .iterator();
    }

    /** Builds the simple DoFn runner writing to {@code outputManager}, wrapped with metrics reporting. */
    private DoFnRunnerWithMetrics<InputT, OutputT> createRunner(DoFnOutputManager outputManager) {
        return new DoFnRunnerWithMetrics<>(
                this.stepName,
                DoFnRunners.simpleRunner(
                        this.serializableOptions.get(),
                        this.doFn,
                        CachedSideInputReader.of(new SparkSideInputReader(this.sideInputs, this.broadcastStateData)),
                        outputManager,
                        this.mainOutputTag,
                        this.additionalOutputTags,
                        new NoOpStepContext(),
                        this.inputCoder,
                        this.outputCoderMap,
                        this.windowingStrategy,
                        this.doFnSchemaInformation,
                        this.sideInputMapping),
                this.metricsAccum);
    }
}
