package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.CachedSideInputReader;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import scala.Function1;
import scala.Tuple2;
import scala.collection.Iterator;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.class */
public abstract class DoFnPartitionIteratorFactory<InT, FnOutT, OutT> implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
    protected final String stepName;
    protected final DoFn<InT, FnOutT> doFn;
    protected final DoFnSchemaInformation doFnSchema;
    protected final Supplier<PipelineOptions> options;
    protected final Coder<InT> coder;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    protected final TupleTag<FnOutT> mainOutput;
    protected final List<TupleTag<?>> additionalOutputs;
    protected final Map<TupleTag<?>, Coder<?>> outputCoders;
    protected final Map<String, PCollectionView<?>> sideInputs;
    protected final SideInputReader sideInputReader;
    private final MetricsAccumulator metrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$DoFnPartitionIt.class */
    public class DoFnPartitionIt extends AbstractIterator<OutT> {
        private final Deque<OutT> buffer;
        private final DoFnRunner<InT, ?> doFnRunner;
        private final Iterator<WindowedValue<InT>> partitionIt;
        private boolean isBundleFinished;

        private DoFnPartitionIt(Iterator<WindowedValue<InT>> iterator) {
            this.buffer = new ArrayDeque();
            this.doFnRunner = DoFnPartitionIteratorFactory.this.metricsRunner(DoFnPartitionIteratorFactory.this.simpleRunner(this.buffer));
            this.partitionIt = iterator;
            DoFnInvokers.tryInvokeSetupFor(DoFnPartitionIteratorFactory.this.doFn, DoFnPartitionIteratorFactory.this.options.get());
            try {
                this.doFnRunner.startBundle();
            } catch (RuntimeException e) {
                DoFnInvokers.invokerFor(DoFnPartitionIteratorFactory.this.doFn).invokeTeardown();
                throw e;
            }
        }

        protected OutT computeNext() {
            while (this.buffer.isEmpty()) {
                try {
                    if (this.partitionIt.hasNext()) {
                        this.doFnRunner.processElement((WindowedValue) this.partitionIt.next());
                    } else {
                        if (this.isBundleFinished) {
                            DoFnInvokers.invokerFor(DoFnPartitionIteratorFactory.this.doFn).invokeTeardown();
                            return (OutT) endOfData();
                        }
                        this.isBundleFinished = true;
                        this.doFnRunner.finishBundle();
                    }
                } catch (RuntimeException e) {
                    DoFnInvokers.invokerFor(DoFnPartitionIteratorFactory.this.doFn).invokeTeardown();
                    throw e;
                }
            }
            return this.buffer.remove();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$MultiOut.class */
    /**
     * Factory variant whose iterators emit {@code (columnIndex, value)} pairs, where the index is
     * looked up from the id of the tag the value was emitted to.
     */
    private static class MultiOut<InT, FnOutT, OutT> extends DoFnPartitionIteratorFactory<InT, FnOutT, Tuple2<Integer, WindowedValue<OutT>>> {
        /** Maps a tag id to the index that is paired with values emitted to that tag. */
        private final Map<String, Integer> tagColIdx;

        public MultiOut(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, FnOutT>> appliedPTransform, Supplier<PipelineOptions> supplier, PCollection<InT> pCollection, SideInputReader sideInputReader, MetricsAccumulator metricsAccumulator, Map<String, Integer> map) {
            super(appliedPTransform, supplier, pCollection, sideInputReader, metricsAccumulator);
            this.tagColIdx = map;
        }

        /**
         * Returns an output manager that appends {@code (index, value)} tuples to {@code deque}.
         * Values emitted to a tag without an entry in {@link #tagColIdx} are dropped.
         */
        @Override
        DoFnRunners.OutputManager outputManager(final Deque<Tuple2<Integer, WindowedValue<OutT>>> deque) {
            return new DoFnRunners.OutputManager() {
                @Override
                @SuppressWarnings("unchecked") // the buffer element type is erased; only mapped tags are kept
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    Integer columnIdx = tagColIdx.get(tupleTag.getId());
                    if (columnIdx != null) {
                        deque.add(ScalaInterop.tuple(columnIdx, (WindowedValue<OutT>) windowedValue));
                    }
                }
            };
        }

        // The decompiler-emitted synthetic bridge apply(Object) was dropped: it only delegated to the
        // inherited apply(Iterator), which remains available unchanged through the superclass.
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$SingleOut.class */
    /** Factory variant whose iterators emit only the values written to the main output tag. */
    private static class SingleOut<InT, OutT> extends DoFnPartitionIteratorFactory<InT, OutT, WindowedValue<OutT>> {
        private SingleOut(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, OutT>> appliedPTransform, Supplier<PipelineOptions> supplier, PCollection<InT> pCollection, SideInputReader sideInputReader, MetricsAccumulator metricsAccumulator) {
            super(appliedPTransform, supplier, pCollection, sideInputReader, metricsAccumulator);
        }

        /**
         * Returns an output manager that appends values emitted to {@code mainOutput} to
         * {@code deque}; values emitted to any other tag are dropped.
         */
        @Override
        DoFnRunners.OutputManager outputManager(final Deque<WindowedValue<OutT>> deque) {
            return new DoFnRunners.OutputManager() {
                @Override
                @SuppressWarnings("unchecked") // guarded by the main-output tag comparison below
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    if (mainOutput.equals(tupleTag)) {
                        deque.add((WindowedValue<OutT>) windowedValue);
                    }
                }
            };
        }

        // The decompiler-emitted synthetic bridge apply(Object) was dropped: it only delegated to the
        // inherited apply(Iterator), which remains available unchanged through the superclass.
    }

    private DoFnPartitionIteratorFactory(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, FnOutT>> appliedPTransform, Supplier<PipelineOptions> supplier, PCollection<InT> pCollection, SideInputReader sideInputReader, MetricsAccumulator metricsAccumulator) {
        this.stepName = appliedPTransform.getFullName();
        this.doFn = appliedPTransform.getTransform().getFn();
        this.doFnSchema = ParDoTranslation.getSchemaInformation(appliedPTransform);
        this.options = supplier;
        this.coder = pCollection.getCoder();
        this.windowingStrategy = pCollection.getWindowingStrategy();
        this.mainOutput = appliedPTransform.getTransform().getMainOutputTag();
        this.additionalOutputs = additionalOutputs(appliedPTransform.getTransform());
        this.outputCoders = outputCoders(appliedPTransform.getOutputs());
        this.sideInputs = appliedPTransform.getTransform().getSideInputs();
        this.sideInputReader = sideInputReader;
        this.metrics = metricsAccumulator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory whose iterators emit only the values written to the main output of the
     * given ParDo.
     *
     * @param appliedPTransform the applied ParDo to evaluate
     * @param supplier supplies the pipeline options
     * @param pCollection the input collection of the ParDo
     * @param sideInputReader reader used for side input access
     * @param metricsAccumulator accumulator passed to the metrics-reporting runner
     * @return a new single-output factory
     */
    public static <InT, OutT> DoFnPartitionIteratorFactory<InT, ?, WindowedValue<OutT>> singleOutput(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, OutT>> appliedPTransform, Supplier<PipelineOptions> supplier, PCollection<InT> pCollection, SideInputReader sideInputReader, MetricsAccumulator metricsAccumulator) {
        // Diamond instead of a raw type so the constructor arguments are checked against InT/OutT.
        return new SingleOut<>(appliedPTransform, supplier, pCollection, sideInputReader, metricsAccumulator);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory whose iterators emit {@code (index, value)} tuples for every value written
     * to a tag that has an entry in {@code map}.
     *
     * @param appliedPTransform the applied ParDo to evaluate
     * @param supplier supplies the pipeline options
     * @param pCollection the input collection of the ParDo
     * @param sideInputReader reader used for side input access
     * @param metricsAccumulator accumulator passed to the metrics-reporting runner
     * @param map tag id to index mapping; values emitted to tags without an entry are dropped
     * @return a new multi-output factory
     */
    public static <InT, FnOutT, OutT> DoFnPartitionIteratorFactory<InT, ?, Tuple2<Integer, WindowedValue<OutT>>> multiOutput(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, FnOutT>> appliedPTransform, Supplier<PipelineOptions> supplier, PCollection<InT> pCollection, SideInputReader sideInputReader, MetricsAccumulator metricsAccumulator, Map<String, Integer> map) {
        // Diamond instead of a raw type so the constructor arguments are checked against the type parameters.
        return new MultiOut<>(appliedPTransform, supplier, pCollection, sideInputReader, metricsAccumulator, map);
    }

    /**
     * Wraps the elements of one partition in a lazily evaluated output iterator.
     *
     * <p>For an empty partition no {@link DoFnPartitionIt} is created, so its constructor (which
     * invokes setup and starts a bundle) does not run, and an empty iterator is returned instead.
     *
     * @param iterator the input elements of the partition
     * @return an iterator over the outputs produced for this partition
     */
    @Override
    @SuppressWarnings("unchecked") // an empty iterator is safe to view as any element type
    public Iterator<OutT> apply(Iterator<WindowedValue<InT>> iterator) {
        if (!iterator.hasNext()) {
            return (Iterator<OutT>) Iterator.empty();
        }
        return ScalaInterop.scalaIterator(new DoFnPartitionIt(iterator));
    }

    /**
     * Creates the output manager handed to the {@code DoFnRunner} built by {@code simpleRunner}.
     *
     * @param deque the buffer passed to {@code simpleRunner}; the partition iterator passes its own
     *     output buffer here
     * @return the output manager the runner emits its outputs to
     */
    abstract DoFnRunners.OutputManager outputManager(Deque<OutT> deque);

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a simple {@code DoFnRunner} for this factory's {@code DoFn}. The runner emits through
     * the output manager created for {@code deque} and reads side inputs through a
     * {@code CachedSideInputReader} wrapping this factory's side input reader.
     *
     * @param deque buffer passed on to {@link #outputManager(Deque)}
     * @return the runner created by {@code DoFnRunners.simpleRunner}
     */
    public DoFnRunner<InT, FnOutT> simpleRunner(Deque<OutT> deque) {
        final PipelineOptions pipelineOptions = this.options.get();
        final SideInputReader cachedReader =
                CachedSideInputReader.of(this.sideInputReader, this.sideInputs.values());
        final DoFnRunners.OutputManager manager = outputManager(deque);
        return DoFnRunners.simpleRunner(
                pipelineOptions,
                this.doFn,
                cachedReader,
                manager,
                this.mainOutput,
                this.additionalOutputs,
                new NoOpStepContext(),
                this.coder,
                this.outputCoders,
                this.windowingStrategy,
                this.doFnSchema,
                this.sideInputs);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps {@code doFnRunner} in a {@code DoFnRunnerWithMetrics} constructed with this factory's
     * step name and metrics accumulator.
     *
     * @param doFnRunner the runner to wrap
     * @return the wrapping runner
     */
    public DoFnRunner<InT, FnOutT> metricsRunner(DoFnRunner<InT, FnOutT> doFnRunner) {
        // Diamond instead of a raw type so the wrapper keeps the delegate's type arguments.
        return new DoFnRunnerWithMetrics<>(this.stepName, doFnRunner, this.metrics);
    }

    /**
     * Collects the coder of every output collection, keyed by the collection's tag.
     *
     * @param map the output collections by tag
     * @return a new map from each tag to the coder of its collection
     */
    private static Map<TupleTag<?>, Coder<?>> outputCoders(Map<TupleTag<?>, PCollection<?>> map) {
        // Typed (non-raw) map so the put calls below are checked by the compiler.
        Map<TupleTag<?>, Coder<?>> coders = Maps.newHashMapWithExpectedSize(map.size());
        map.forEach((tag, collection) -> coders.put(tag, collection.getCoder()));
        return coders;
    }

    /**
     * Returns the additional output tags of the given ParDo.
     *
     * @param multiOutput the ParDo whose additional output tags are read
     * @return the shared immutable empty list if there are no additional tags, otherwise a new
     *     {@link ArrayList} holding a copy of them
     */
    private static List<TupleTag<?>> additionalOutputs(ParDo.MultiOutput<?, ?> multiOutput) {
        // Typed (non-raw) list so the copy below keeps its element type.
        List<TupleTag<?>> tags = multiOutput.getAdditionalOutputTags().getAll();
        if (tags.isEmpty()) {
            return Collections.emptyList();
        }
        return new ArrayList<>(tags);
    }
}
