package org.apache.beam.runners.spark.translation;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.dataflow.qual.Pure;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: SparkInputDataProcessor.java */
/* loaded from: input_file:org/apache/beam/runners/spark/translation/BoundedSparkInputDataProcessor.class */
public class BoundedSparkInputDataProcessor<FnInputT, FnOutputT> implements SparkInputDataProcessor<FnInputT, FnOutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> {
    private final BoundedDoFnOutputManager outputManager = new BoundedDoFnOutputManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: SparkInputDataProcessor.java */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/BoundedSparkInputDataProcessor$BoundedDoFnOutputManager.class */
    public static class BoundedDoFnOutputManager implements DoFnRunners.OutputManager, Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final LinkedBlockingQueue<Tuple2<TupleTag<?>, WindowedValue<?>>> queue;
        private volatile boolean stopped;

        private BoundedDoFnOutputManager() {
            this.queue = new LinkedBlockingQueue<>(500);
            this.stopped = false;
        }

        public void stop() {
            this.stopped = true;
        }

        @Override // java.lang.Iterable
        public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
            return new Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>>() { // from class: org.apache.beam.runners.spark.translation.BoundedSparkInputDataProcessor.BoundedDoFnOutputManager.1
                private Tuple2<TupleTag<?>, WindowedValue<?>> next = null;

                @Override // java.util.Iterator
                @Pure
                public boolean hasNext() {
                    while (this.next == null && (!BoundedDoFnOutputManager.this.stopped || !BoundedDoFnOutputManager.this.queue.isEmpty())) {
                        try {
                            this.next = (Tuple2) BoundedDoFnOutputManager.this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException(e);
                        }
                    }
                    return this.next != null;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public Tuple2<TupleTag<?>, WindowedValue<?>> next() {
                    if (this.next == null && !hasNext()) {
                        throw new NoSuchElementException();
                    }
                    Tuple2<TupleTag<?>, WindowedValue<?>> tuple2 = this.next;
                    this.next = null;
                    return tuple2;
                }
            };
        }

        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            try {
                Preconditions.checkState(!this.stopped, "Output called on already stopped manager");
                this.queue.put(new Tuple2<>(tupleTag, windowedValue));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /* compiled from: SparkInputDataProcessor.java */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/BoundedSparkInputDataProcessor$BoundedInOutIterator.class */
    private class BoundedInOutIterator<K, InputT, OutputT> extends AbstractIterator<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final SparkProcessContext<K, InputT, OutputT> ctx;
        private final Iterator<WindowedValue<InputT>> inputIterator;
        private final Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> outputIterator;
        private final ExecutorService executorService;
        private Future<?> outputProducerTask = null;
        private volatile RuntimeException inputConsumeFailure = null;

        BoundedInOutIterator(Iterator<WindowedValue<InputT>> it, SparkProcessContext<K, InputT, OutputT> sparkProcessContext) {
            this.inputIterator = it;
            this.ctx = sparkProcessContext;
            this.outputIterator = BoundedSparkInputDataProcessor.this.outputManager.iterator();
            this.executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("bounded-in/out-iterator-" + sparkProcessContext.getStepName() + "-%d").setDaemon(true).build());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @CheckForNull
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public Tuple2<TupleTag<?>, WindowedValue<?>> m68computeNext() {
            if (this.outputProducerTask == null) {
                this.outputProducerTask = startOutputProducerTask();
            }
            boolean hasNext = this.outputIterator.hasNext();
            if (this.inputConsumeFailure != null) {
                this.executorService.shutdown();
                throw this.inputConsumeFailure;
            }
            if (hasNext) {
                return this.outputIterator.next();
            }
            this.executorService.shutdown();
            return (Tuple2) endOfData();
        }

        private void fireTimer(TimerInternals.TimerData timerData) {
            StateNamespaces.WindowNamespace namespace = timerData.getNamespace();
            Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
            this.ctx.getDoFnRunner().onTimer(timerData.getTimerId(), timerData.getTimerFamilyId(), this.ctx.getKey(), namespace.getWindow(), timerData.getTimestamp(), timerData.getOutputTimestamp(), timerData.getDomain());
        }

        private Future<?> startOutputProducerTask() {
            return this.executorService.submit(() -> {
                try {
                    this.ctx.getDoFnRunner().startBundle();
                    while (true) {
                        if (this.inputIterator.hasNext()) {
                            this.ctx.getDoFnRunner().processElement(this.inputIterator.next());
                        } else {
                            if (!this.ctx.getTimerDataIterator().hasNext()) {
                                this.ctx.getDoFnRunner().finishBundle();
                                DoFnInvokers.invokerFor(this.ctx.getDoFn()).invokeTeardown();
                                BoundedSparkInputDataProcessor.this.outputManager.stop();
                                return;
                            }
                            fireTimer(this.ctx.getTimerDataIterator().next());
                        }
                    }
                } catch (RuntimeException e) {
                    this.inputConsumeFailure = e;
                    DoFnInvokers.invokerFor(this.ctx.getDoFn()).invokeTeardown();
                    BoundedSparkInputDataProcessor.this.outputManager.stop();
                }
            });
        }
    }

    @Override // org.apache.beam.runners.spark.translation.SparkInputDataProcessor
    public DoFnRunners.OutputManager getOutputManager() {
        return this.outputManager;
    }

    @Override // org.apache.beam.runners.spark.translation.SparkInputDataProcessor
    public <K> Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> createOutputIterator(Iterator<WindowedValue<FnInputT>> it, SparkProcessContext<K, FnInputT, FnOutputT> sparkProcessContext) {
        return new BoundedInOutIterator(it, sparkProcessContext);
    }
}
