package org.apache.beam.runners.spark.translation.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.joda.time.Instant;
import scala.Option;
import scala.reflect.ClassTag;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/TestDStream.class */
public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
    private final Coder<WindowedValue<T>> coder;

    @SuppressFBWarnings({"SE_TRANSIENT_FIELD_NOT_RESTORED"})
    @Nullable
    private final transient List<TestStream.Event<T>> events;
    private int currentEventIndex;
    private boolean insertEmptyBatch;
    private long lastValidTimeMs;
    private Instant lastWatermark;

    public TestDStream(TestStream<T> testStream, StreamingContext streamingContext) {
        super(streamingContext, classTag());
        this.currentEventIndex = 0;
        this.insertEmptyBatch = false;
        this.lastValidTimeMs = 0L;
        this.lastWatermark = Instant.EPOCH;
        this.coder = WindowedValue.getFullCoder(testStream.getValueCoder(), GlobalWindow.Coder.INSTANCE);
        this.events = testStream.getEvents();
    }

    public Option<RDD<WindowedValue<T>>> compute(Time time) {
        TestStream.Event<T> nextEvent = this.insertEmptyBatch ? null : nextEvent();
        if (nextEvent == null) {
            this.insertEmptyBatch = false;
            waitForLastBatch(time);
            return Option.apply(emptyRDD());
        }
        if (nextEvent instanceof TestStream.ElementEvent) {
            waitForLastBatch(time);
            return Option.apply(buildRdd((TestStream.ElementEvent) nextEvent));
        }
        if (!(nextEvent instanceof TestStream.WatermarkEvent)) {
            if (nextEvent instanceof TestStream.ProcessingTimeEvent) {
                throw new UnsupportedOperationException("Advancing Processing time is not supported by the Spark Runner.");
            }
            throw new IllegalStateException("Unknown event type " + nextEvent);
        }
        waitForLastBatch(time);
        addWatermark(time, (TestStream.WatermarkEvent) nextEvent);
        this.insertEmptyBatch = true;
        return Option.apply(emptyRDD());
    }

    private void waitForLastBatch(Time time) {
        while (GlobalWatermarkHolder.getLastWatermarkedBatchTime() < this.lastValidTimeMs) {
            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
        }
        this.lastValidTimeMs = time.milliseconds();
    }

    @Nullable
    private TestStream.Event<T> nextEvent() {
        List list = (List) Preconditions.checkStateNotNull(this.events);
        if (list.size() <= this.currentEventIndex) {
            return null;
        }
        int i = this.currentEventIndex;
        this.currentEventIndex = i + 1;
        return (TestStream.Event) list.get(i);
    }

    private void addWatermark(Time time, TestStream.WatermarkEvent<T> watermarkEvent) {
        GlobalWatermarkHolder.SparkWatermarks sparkWatermarks = new GlobalWatermarkHolder.SparkWatermarks(this.lastWatermark, watermarkEvent.getWatermark(), new Instant(time.milliseconds()));
        this.lastWatermark = watermarkEvent.getWatermark();
        GlobalWatermarkHolder.add(id(), sparkWatermarks);
    }

    public void start() {
    }

    public void stop() {
    }

    private RDD<WindowedValue<T>> emptyRDD() {
        return ssc().sparkContext().emptyRDD(classTag());
    }

    private RDD<WindowedValue<T>> buildRdd(TestStream.ElementEvent<T> elementEvent) {
        ArrayList arrayList = new ArrayList();
        for (TimestampedValue timestampedValue : elementEvent.getElements()) {
            arrayList.add(CoderHelpers.toByteArray(WindowedValue.timestampedValueInGlobalWindow(timestampedValue.getValue(), timestampedValue.getTimestamp()), this.coder));
        }
        return new JavaSparkContext(ssc().sparkContext()).parallelize(arrayList).map(CoderHelpers.fromByteFunction(this.coder)).rdd();
    }

    private static <T> ClassTag<WindowedValue<T>> classTag() {
        return JavaSparkContext$.MODULE$.fakeClassTag();
    }
}
