package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnRunnerFactory;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import scala.Function1;
import scala.Tuple2;
import scala.collection.Iterator;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory.class */
public abstract class DoFnPartitionIteratorFactory<InT, FnOutT, OutT> implements Function1<Iterator<WindowedValue<InT>>, Iterator<OutT>>, Serializable {
    protected final DoFnRunnerFactory<InT, FnOutT> factory;
    protected final Supplier<PipelineOptions> options;
    private final MetricsAccumulator metrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$DoFnPartitionIt.class */
    public class DoFnPartitionIt extends AbstractIterator<OutT> {
        private final Deque<OutT> buffer;
        private final DoFnRunnerFactory.DoFnRunnerWithTeardown<InT, ?> doFnRunner;
        private final Iterator<WindowedValue<InT>> partitionIt;
        private boolean isBundleFinished;

        private DoFnPartitionIt(Iterator<WindowedValue<InT>> iterator) {
            this.buffer = new ArrayDeque();
            this.partitionIt = iterator;
            this.doFnRunner = DoFnPartitionIteratorFactory.this.factory.create(DoFnPartitionIteratorFactory.this.options.get(), DoFnPartitionIteratorFactory.this.metrics, DoFnPartitionIteratorFactory.this.outputManager(this.buffer));
        }

        protected OutT computeNext() {
            while (this.buffer.isEmpty()) {
                try {
                    if (this.partitionIt.hasNext()) {
                        this.doFnRunner.processElement((WindowedValue) this.partitionIt.next());
                    } else {
                        if (this.isBundleFinished) {
                            this.doFnRunner.teardown();
                            return (OutT) endOfData();
                        }
                        this.isBundleFinished = true;
                        this.doFnRunner.finishBundle();
                    }
                } catch (RuntimeException e) {
                    this.doFnRunner.teardown();
                    throw e;
                }
            }
            return this.buffer.remove();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$MultiOut.class */
    private static class MultiOut<InT, FnOutT, OutT> extends DoFnPartitionIteratorFactory<InT, FnOutT, Tuple2<Integer, WindowedValue<OutT>>> {
        private final Map<String, Integer> tagColIdx;

        public MultiOut(Supplier<PipelineOptions> supplier, MetricsAccumulator metricsAccumulator, DoFnRunnerFactory<InT, FnOutT> doFnRunnerFactory, Map<String, Integer> map) {
            super(supplier, metricsAccumulator, doFnRunnerFactory);
            this.tagColIdx = map;
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory
        DoFnRunners.OutputManager outputManager(final Deque<Tuple2<Integer, WindowedValue<OutT>>> deque) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory.MultiOut.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    Integer num = (Integer) MultiOut.this.tagColIdx.get(tupleTag.getId());
                    if (num != null) {
                        deque.add(ScalaInterop.tuple(num, windowedValue));
                    }
                }
            };
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory
        public /* bridge */ /* synthetic */ Object apply(Object obj) {
            return super.apply((Iterator) obj);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnPartitionIteratorFactory$SingleOut.class */
    private static class SingleOut<InT, OutT> extends DoFnPartitionIteratorFactory<InT, OutT, WindowedValue<OutT>> {
        private SingleOut(Supplier<PipelineOptions> supplier, MetricsAccumulator metricsAccumulator, DoFnRunnerFactory<InT, OutT> doFnRunnerFactory) {
            super(supplier, metricsAccumulator, doFnRunnerFactory);
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory
        DoFnRunners.OutputManager outputManager(final Deque<WindowedValue<OutT>> deque) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory.SingleOut.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    deque.add(windowedValue);
                }
            };
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnPartitionIteratorFactory
        public /* bridge */ /* synthetic */ Object apply(Object obj) {
            return super.apply((Iterator) obj);
        }
    }

    private DoFnPartitionIteratorFactory(Supplier<PipelineOptions> supplier, MetricsAccumulator metricsAccumulator, DoFnRunnerFactory<InT, FnOutT> doFnRunnerFactory) {
        this.options = supplier;
        this.metrics = metricsAccumulator;
        this.factory = doFnRunnerFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <InT, OutT> DoFnPartitionIteratorFactory<InT, ?, WindowedValue<OutT>> singleOutput(Supplier<PipelineOptions> supplier, MetricsAccumulator metricsAccumulator, DoFnRunnerFactory<InT, OutT> doFnRunnerFactory) {
        return new SingleOut(supplier, metricsAccumulator, doFnRunnerFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <InT, FnOutT, OutT> DoFnPartitionIteratorFactory<InT, ?, Tuple2<Integer, WindowedValue<OutT>>> multiOutput(Supplier<PipelineOptions> supplier, MetricsAccumulator metricsAccumulator, DoFnRunnerFactory<InT, FnOutT> doFnRunnerFactory, Map<String, Integer> map) {
        return new MultiOut(supplier, metricsAccumulator, doFnRunnerFactory, map);
    }

    @Override // 
    public Iterator<OutT> apply(Iterator<WindowedValue<InT>> iterator) {
        return iterator.hasNext() ? ScalaInterop.scalaIterator((java.util.Iterator) new DoFnPartitionIt(iterator)) : Iterator.empty();
    }

    abstract DoFnRunners.OutputManager outputManager(Deque<OutT> deque);
}
