package org.apache.beam.runners.spark.translation;

import java.util.ArrayDeque;
import java.util.Iterator;
import javax.annotation.CheckForNull;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: SparkInputDataProcessor.java */
/* loaded from: input_file:org/apache/beam/runners/spark/translation/UnboundedSparkInputDataProcessor.class */
public class UnboundedSparkInputDataProcessor<FnInputT, FnOutputT> implements SparkInputDataProcessor<FnInputT, FnOutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> {
    private final UnboundedDoFnOutputManager outputManager = new UnboundedDoFnOutputManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: SparkInputDataProcessor.java */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/UnboundedSparkInputDataProcessor$UnboundedDoFnOutputManager.class */
    public static class UnboundedDoFnOutputManager implements DoFnRunners.OutputManager, Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final ArrayDeque<Tuple2<TupleTag<?>, WindowedValue<?>>> outputs;

        private UnboundedDoFnOutputManager() {
            this.outputs = new ArrayDeque<>();
        }

        public void clear() {
            this.outputs.clear();
        }

        @Override // java.lang.Iterable
        public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
            return this.outputs.iterator();
        }

        public synchronized <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            this.outputs.addLast(new Tuple2<>(tupleTag, windowedValue));
        }
    }

    /* compiled from: SparkInputDataProcessor.java */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/UnboundedSparkInputDataProcessor$UnboundedInOutIterator.class */
    private class UnboundedInOutIterator<K> extends AbstractIterator<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final Iterator<WindowedValue<FnInputT>> inputIterator;
        private final SparkProcessContext<K, FnInputT, FnOutputT> ctx;
        private Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> outputIterator;
        private boolean isBundleStarted;
        private boolean isBundleFinished;

        UnboundedInOutIterator(Iterator<WindowedValue<FnInputT>> it, SparkProcessContext<K, FnInputT, FnOutputT> sparkProcessContext) {
            this.inputIterator = it;
            this.ctx = sparkProcessContext;
            this.outputIterator = UnboundedSparkInputDataProcessor.this.outputManager.iterator();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @CheckForNull
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public Tuple2<TupleTag<?>, WindowedValue<?>> m87computeNext() {
            try {
                if (!this.isBundleStarted) {
                    this.isBundleStarted = true;
                    this.ctx.getDoFnRunner().startBundle();
                }
                while (!this.outputIterator.hasNext()) {
                    UnboundedSparkInputDataProcessor.this.outputManager.clear();
                    if (this.inputIterator.hasNext()) {
                        this.ctx.getDoFnRunner().processElement(this.inputIterator.next());
                        this.outputIterator = UnboundedSparkInputDataProcessor.this.outputManager.iterator();
                    } else if (this.ctx.getTimerDataIterator().hasNext()) {
                        fireTimer(this.ctx.getTimerDataIterator().next());
                        this.outputIterator = UnboundedSparkInputDataProcessor.this.outputManager.iterator();
                    } else {
                        if (this.isBundleFinished) {
                            DoFnInvokers.invokerFor(this.ctx.getDoFn()).invokeTeardown();
                            return (Tuple2) endOfData();
                        }
                        this.isBundleFinished = true;
                        this.ctx.getDoFnRunner().finishBundle();
                        this.outputIterator = UnboundedSparkInputDataProcessor.this.outputManager.iterator();
                    }
                }
                return this.outputIterator.next();
            } catch (RuntimeException e) {
                DoFnInvokers.invokerFor(this.ctx.getDoFn()).invokeTeardown();
                throw e;
            }
        }

        private void fireTimer(TimerInternals.TimerData timerData) {
            StateNamespaces.WindowNamespace namespace = timerData.getNamespace();
            Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
            this.ctx.getDoFnRunner().onTimer(timerData.getTimerId(), timerData.getTimerFamilyId(), this.ctx.getKey(), namespace.getWindow(), timerData.getTimestamp(), timerData.getOutputTimestamp(), timerData.getDomain());
        }
    }

    @Override // org.apache.beam.runners.spark.translation.SparkInputDataProcessor
    public DoFnRunners.OutputManager getOutputManager() {
        return this.outputManager;
    }

    @Override // org.apache.beam.runners.spark.translation.SparkInputDataProcessor
    public <K> Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> createOutputIterator(Iterator<WindowedValue<FnInputT>> it, SparkProcessContext<K, FnInputT, FnOutputT> sparkProcessContext) {
        return new UnboundedInOutIterator(it, sparkProcessContext);
    }
}
