package org.apache.beam.runners.spark.translation.streaming;

import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.class */
class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(WatermarkSyncedDStream.class.getCanonicalName() + "#compute");
    private static final int SLEEP_DURATION_MILLIS = 10;
    private final Queue<JavaRDD<WindowedValue<T>>> rdds;
    private final Long batchDuration;
    private volatile boolean isFirst;

    public WatermarkSyncedDStream(Queue<JavaRDD<WindowedValue<T>>> queue, Long l, StreamingContext streamingContext) {
        super(streamingContext, JavaSparkContext$.MODULE$.fakeClassTag());
        this.isFirst = true;
        this.rdds = queue;
        this.batchDuration = l;
    }

    private void awaitWatermarkSyncWith(long j) {
        while (!isFirstBatch() && watermarkOutOfSync(j)) {
            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
        }
        Preconditions.checkState(isFirstBatch() || watermarkIsOneBatchBehind(j), String.format("Watermark batch time:[%d] should be exactly one batch behind current batch time:[%d]", Long.valueOf(GlobalWatermarkHolder.getLastWatermarkedBatchTime()), Long.valueOf(j)));
    }

    private boolean watermarkOutOfSync(long j) {
        return j - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > this.batchDuration.longValue();
    }

    private boolean isFirstBatch() {
        return this.isFirst;
    }

    private RDD<WindowedValue<T>> generateRdd() {
        return this.rdds.size() > 0 ? this.rdds.poll().rdd() : ssc().sparkContext().emptyRDD(JavaSparkContext$.MODULE$.fakeClassTag());
    }

    private boolean watermarkIsOneBatchBehind(long j) {
        return GlobalWatermarkHolder.getLastWatermarkedBatchTime() == j - this.batchDuration.longValue();
    }

    public Option<RDD<WindowedValue<T>>> compute(Time time) {
        long milliseconds = time.milliseconds();
        LOG.trace("BEFORE waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}", Long.valueOf(GlobalWatermarkHolder.getLastWatermarkedBatchTime()), Long.valueOf(milliseconds));
        Stopwatch createStarted = Stopwatch.createStarted();
        awaitWatermarkSyncWith(milliseconds);
        createStarted.stop();
        LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)), Long.valueOf(milliseconds));
        LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(this.batchDuration));
        LOG.trace("AFTER waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}", Long.valueOf(GlobalWatermarkHolder.getLastWatermarkedBatchTime()), Long.valueOf(milliseconds));
        RDD<WindowedValue<T>> generateRdd = generateRdd();
        this.isFirst = false;
        return Option.apply(generateRdd);
    }

    public void start() {
    }

    public void stop() {
    }
}
