package org.apache.beam.runners.spark.translation.streaming;

import java.io.IOException;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.class */
public class SparkRunnerStreamingContextFactory implements Function0<JavaStreamingContext> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
    private final transient Pipeline pipeline;
    private final transient SparkPipelineOptions options;
    private final transient Checkpoint.CheckpointDir checkpointDir;

    public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions sparkPipelineOptions, Checkpoint.CheckpointDir checkpointDir) {
        this.pipeline = pipeline;
        this.options = sparkPipelineOptions;
        this.checkpointDir = checkpointDir;
    }

    /* renamed from: call, reason: merged with bridge method [inline-methods] */
    public JavaStreamingContext m81call() throws Exception {
        LOG.info("Creating a new Spark Streaming Context");
        Preconditions.checkArgument(this.options.getMinReadTimeMillis().longValue() < this.options.getBatchIntervalMillis().longValue(), "Minimum read time has to be less than batch time.");
        Preconditions.checkArgument(this.options.getReadTimePercentage().doubleValue() > 0.0d && this.options.getReadTimePercentage().doubleValue() < 1.0d, "Read time percentage is bound to (0, 1).");
        StreamingTransformTranslator.Translator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
        Duration duration = new Duration(this.options.getBatchIntervalMillis().longValue());
        LOG.info("Setting Spark streaming batchDuration to {} msec", Long.valueOf(duration.milliseconds()));
        JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(this.options);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, duration);
        SparkRunner.initAccumulators(this.options, sparkContext);
        EvaluationContext evaluationContext = new EvaluationContext(sparkContext, this.pipeline, this.options, javaStreamingContext);
        SparkRunner.updateCacheCandidates(this.pipeline, translator, evaluationContext);
        try {
            this.pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, evaluationContext));
            evaluationContext.computeOutputs();
            checkpoint(javaStreamingContext, this.checkpointDir);
            return javaStreamingContext;
        } catch (RuntimeException e) {
            javaStreamingContext.stop(false, false);
            SparkContextFactory.stopSparkContext(sparkContext);
            throw e;
        }
    }

    private void checkpoint(JavaStreamingContext javaStreamingContext, Checkpoint.CheckpointDir checkpointDir) {
        Path rootCheckpointDir = checkpointDir.getRootCheckpointDir();
        Path sparkCheckpointDir = checkpointDir.getSparkCheckpointDir();
        Path beamCheckpointDir = checkpointDir.getBeamCheckpointDir();
        try {
            FileSystem fileSystem = rootCheckpointDir.getFileSystem(javaStreamingContext.sparkContext().hadoopConfiguration());
            if (!fileSystem.exists(rootCheckpointDir)) {
                fileSystem.mkdirs(rootCheckpointDir);
            }
            if (!fileSystem.exists(sparkCheckpointDir)) {
                fileSystem.mkdirs(sparkCheckpointDir);
            }
            if (!fileSystem.exists(beamCheckpointDir)) {
                fileSystem.mkdirs(beamCheckpointDir);
            }
            javaStreamingContext.checkpoint(sparkCheckpointDir.toString());
        } catch (IOException e) {
            throw new RuntimeException("Failed to create checkpoint dir", e);
        }
    }
}
