package org.apache.beam.runners.spark.io;

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.RateController$;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.streaming.scheduler.rate.RateEstimator$;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/io/SourceDStream.class */
/**
 * A Spark Streaming {@link InputDStream} that reads from a Beam {@link UnboundedSource}.
 *
 * <p>For every batch, {@link #compute(Time)} wraps the unbounded source in a fresh
 * {@code MicrobatchSource} (bounded by a read duration and a maximum record count) and exposes it
 * as a {@code SourceRDD.Unbounded}. The number of partitions is determined once, at construction
 * time, by splitting such a microbatch source.
 *
 * @param <T> element type produced by the source
 * @param <CheckpointMarkT> checkpoint mark type of the source
 */
class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends InputDStream<Tuple2<Source<T>, CheckpointMarkT>> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);

    /** Number of milliseconds in a second; used to convert a per-second rate to a per-batch limit. */
    private static final long MILLIS_PER_SECOND = 1000L;

    private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
    private final SerializablePipelineOptions options;
    /** Upper bound on how long a single microbatch read may take. */
    private final Duration boundReadDuration;
    /** 1.5 times the configured batch interval, in milliseconds; handed to each MicrobatchSource. */
    private final double readerCacheInterval;
    /** Number of splits reported by the microbatch source at construction time. */
    private final int numPartitions;
    /** Default parallelism of the Spark context at construction time. */
    private final int initialParallelism;
    /** Maximum records per batch given by the caller; values {@code <= 0} mean "not configured". */
    private final long boundMaxRecords;
    /** Rate controller exposed through {@link #rateController()} when back-pressure is enabled. */
    private final RateController rateController;

    /**
     * A {@link RateController} whose {@code publish} hook does nothing; only the latest rate held
     * by the controller is read by the enclosing stream.
     */
    private static class SourceRateController extends RateController {
        private SourceRateController(int streamId, RateEstimator rateEstimator) {
            super(streamId, rateEstimator);
        }

        /** Intentionally empty: the published rate is not forwarded anywhere. */
        public void publish(long rate) {
        }
    }

    /**
     * Creates the stream and eagerly computes the number of partitions by splitting a microbatch
     * source.
     *
     * @param streamingContext the Spark streaming context this stream is registered with
     * @param unboundedSource the Beam source to read from
     * @param serializablePipelineOptions pipeline options; must be viewable as
     *     {@link SparkPipelineOptions}
     * @param boundMaxRecords maximum number of records per batch; a value {@code <= 0} defers to
     *     the rate controller (or to no limit at all). Must not be {@code null}.
     * @throws IllegalArgumentException if the Spark context's default parallelism is not positive
     * @throws RuntimeException wrapping any exception thrown while splitting the source
     */
    public SourceDStream(StreamingContext streamingContext, UnboundedSource<T, CheckpointMarkT> unboundedSource, SerializablePipelineOptions serializablePipelineOptions, Long boundMaxRecords) {
        super(streamingContext, JavaSparkContext$.MODULE$.fakeClassTag());
        this.rateController = new SourceRateController(id(), RateEstimator$.MODULE$.create(ssc().conf(), ssc().graph().batchDuration()));
        this.unboundedSource = unboundedSource;
        this.options = serializablePipelineOptions;

        SparkPipelineOptions sparkOptions = (SparkPipelineOptions) serializablePipelineOptions.get().as(SparkPipelineOptions.class);
        this.readerCacheInterval = 1.5d * sparkOptions.getBatchIntervalMillis().longValue();
        this.boundReadDuration = boundReadDuration(sparkOptions.getReadTimePercentage().doubleValue(), sparkOptions.getMinReadTimeMillis().longValue());

        this.initialParallelism = ssc().sparkContext().defaultParallelism();
        Preconditions.checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");

        this.boundMaxRecords = boundMaxRecords.longValue();

        // Every field read by createMicrobatchSource() has been assigned by this point.
        try {
            this.numPartitions = createMicrobatchSource().split(sparkOptions).size();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Produces the RDD for one batch: a {@code SourceRDD.Unbounded} over a newly created
     * microbatch source, using the partition count computed at construction time.
     *
     * @param time the batch time supplied by Spark; not used by this implementation
     * @return a defined {@link Option} holding the batch RDD
     */
    public Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time time) {
        return Option.apply(new SourceRDD.Unbounded(ssc().sparkContext(), this.options, createMicrobatchSource(), this.numPartitions));
    }

    /**
     * Builds a microbatch view of the unbounded source, bounded by the read duration and by the
     * record limit that applies right now (see {@link #computeReadMaxRecords()}).
     */
    private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
        // NOTE(review): the -1 argument is presumably a "not yet split" marker - confirm against
        // the MicrobatchSource constructor.
        return new MicrobatchSource<>(this.unboundedSource, this.boundReadDuration, this.initialParallelism, computeReadMaxRecords(), -1, id(), this.readerCacheInterval);
    }

    /**
     * Resolves the record limit for the next batch. A positive configured limit wins; otherwise
     * the rate controller's advice is used when available; otherwise the batch is unlimited
     * ({@link Long#MAX_VALUE}).
     */
    private long computeReadMaxRecords() {
        if (this.boundMaxRecords > 0) {
            LOG.info("Max records per batch has been set to {}, as configured in the PipelineOptions.", this.boundMaxRecords);
            return this.boundMaxRecords;
        }

        Option<Long> rateLimit = rateControlledMaxRecords();
        if (rateLimit.isDefined()) {
            LOG.info("Max records per batch has been set to {}, as advised by the rate controller.", rateLimit.get());
            return rateLimit.get();
        }

        LOG.info("Max records per batch has not been limited by neither configuration nor the rate controller, and will remain unlimited for the current batch ({}).", Long.MAX_VALUE);
        return Long.MAX_VALUE;
    }

    /** No-op: this stream holds nothing that needs starting. */
    public void start() {
    }

    /** No-op: this stream holds nothing that needs stopping. */
    public void stop() {
    }

    /** Returns a display name that includes this stream's id. */
    public String name() {
        return "Beam UnboundedSource [" + id() + "]";
    }

    /** Returns the number of partitions computed when this stream was constructed. */
    public int getNumPartitions() {
        return this.numPartitions;
    }

    /**
     * Computes the read-time bound for a microbatch: the larger of the given fraction of the
     * batch duration and the given minimum.
     *
     * @param readTimePercentage fraction of the batch duration that may be spent reading
     * @param minReadTimeMillis lower bound for the read duration, in milliseconds
     * @return the longer of the two candidate durations
     */
    private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
        long batchMillis = ssc().graph().batchDuration().milliseconds();
        Duration proportional = new Duration(Math.round(batchMillis * readTimePercentage));
        Duration minimum = new Duration(minReadTimeMillis);
        Duration readDuration = proportional.isLongerThan(minimum) ? proportional : minimum;
        LOG.info("Read duration set to: {}", readDuration);
        return readDuration;
    }

    /**
     * Converts the rate controller's latest per-second rate into a per-batch record limit.
     *
     * <p>The conversion keeps the sub-second part of the batch duration and rounds up, so a
     * positive rate yields a positive limit even when the batch interval is shorter than one
     * second. Dividing the duration by 1000 first would truncate such intervals to zero and cap
     * the batch at zero records.
     *
     * @return the per-batch limit, or an empty option when back-pressure is disabled or the
     *     controller has no positive rate
     */
    private Option<Long> rateControlledMaxRecords() {
        Option<RateController> controller = rateController();
        if (!controller.isDefined()) {
            return Option.empty();
        }

        long recordsPerSecond = controller.get().getLatestRate();
        if (recordsPerSecond <= 0) {
            return Option.empty();
        }

        long batchMillis = ssc().graph().batchDuration().milliseconds();
        long wholeSeconds = batchMillis / MILLIS_PER_SECOND;
        long remainderMillis = batchMillis % MILLIS_PER_SECOND;
        // Ceiling division for the fractional second; contributes 0 when the interval is a whole
        // number of seconds, so results for such intervals are unchanged.
        long fractionalRecords = (recordsPerSecond * remainderMillis + MILLIS_PER_SECOND - 1) / MILLIS_PER_SECOND;
        return Option.apply(Long.valueOf(recordsPerSecond * wholeSeconds + fractionalRecords));
    }

    /**
     * Returns this stream's rate controller when back-pressure is enabled in the Spark
     * configuration, and an empty option otherwise.
     */
    public Option<RateController> rateController() {
        if (RateController$.MODULE$.isBackPressureEnabled(ssc().conf())) {
            return Option.apply(this.rateController);
        }
        return Option.empty();
    }
}
