package org.apache.beam.runners.spark.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.scheduler.StreamInputInfo;
import org.joda.time.Instant;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource.class */
public class SparkUnboundedSource {

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$Metadata.class */
    /**
     * Serializable, immutable value object carrying the bookkeeping that accompanies the records
     * read from the source: how many records were read, the low and high watermarks, how long the
     * read took in milliseconds, and the metrics containers populated during the read.
     */
    public static class Metadata implements Serializable {
        private final long numRecords;
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final long readDurationMillis;
        private final MetricsContainerStepMap metricsContainers;

        /**
         * Creates a new metadata record.
         *
         * @param numRecords number of records read
         * @param lowWatermark low watermark associated with the read
         * @param highWatermark high watermark associated with the read
         * @param readDurationMillis duration of the read, in milliseconds
         * @param metricsContainers metrics containers associated with the read
         */
        public Metadata(
                long numRecords,
                Instant lowWatermark,
                Instant highWatermark,
                long readDurationMillis,
                MetricsContainerStepMap metricsContainers) {
            this.numRecords = numRecords;
            this.lowWatermark = lowWatermark;
            this.highWatermark = highWatermark;
            this.readDurationMillis = readDurationMillis;
            this.metricsContainers = metricsContainers;
        }

        /** Returns the number of records read. */
        long getNumRecords() {
            return numRecords;
        }

        /** Returns the low watermark. */
        Instant getLowWatermark() {
            return lowWatermark;
        }

        /** Returns the high watermark. */
        Instant getHighWatermark() {
            return highWatermark;
        }

        /** Returns the read duration in milliseconds. */
        long getReadDurationMillis() {
            return readDurationMillis;
        }

        /** Returns the metrics containers associated with the read. */
        MetricsContainerStepMap getMetricsContainers() {
            return metricsContainers;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$ReadReportDStream.class */
    private static class ReadReportDStream extends DStream<BoxedUnit> {
        private static final String READ_DURATION_MILLIS = "readDurationMillis";
        private static final String NAMESPACE = "spark-runner.io";
        private final DStream<Metadata> parent;
        private final int inputDStreamId;
        private final String sourceName;
        private final String stepName;

        ReadReportDStream(DStream<Metadata> dStream, int i, String str, String str2) {
            super(dStream.ssc(), JavaSparkContext$.MODULE$.fakeClassTag());
            this.parent = dStream;
            this.inputDStreamId = i;
            this.sourceName = str;
            this.stepName = str2;
        }

        public Duration slideDuration() {
            return this.parent.slideDuration();
        }

        public List<DStream<?>> dependencies() {
            return JavaConversions.asScalaBuffer(Collections.singletonList(this.parent)).toList();
        }

        public Option<RDD<BoxedUnit>> compute(Time time) {
            Option orCompute = this.parent.getOrCompute(time);
            MetricsContainerStepMapAccumulator metricsAccumulator = MetricsAccumulator.getInstance();
            long j = 0;
            GlobalWatermarkHolder.SparkWatermarks sparkWatermarks = null;
            Instant instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant instant2 = BoundedWindow.TIMESTAMP_MIN_VALUE;
            if (orCompute.isDefined()) {
                for (Metadata metadata : ((RDD) orCompute.get()).toJavaRDD().collect()) {
                    j += metadata.getNumRecords();
                    Instant lowWatermark = metadata.getLowWatermark();
                    instant = instant.isBefore(lowWatermark) ? lowWatermark : instant;
                    Instant highWatermark = metadata.getHighWatermark();
                    instant2 = instant2.isBefore(highWatermark) ? highWatermark : instant2;
                    Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
                    try {
                        Closeable scopedMetricsContainer = MetricsEnvironment.scopedMetricsContainer(metadata.getMetricsContainers().getContainer(this.stepName));
                        Throwable th = null;
                        try {
                            try {
                                long readDurationMillis = metadata.getReadDurationMillis();
                                if (readDurationMillis > 0) {
                                    gauge.set(readDurationMillis);
                                }
                                if (scopedMetricsContainer != null) {
                                    if (0 != 0) {
                                        try {
                                            scopedMetricsContainer.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        scopedMetricsContainer.close();
                                    }
                                }
                                metricsAccumulator.m27value().updateAll(metadata.getMetricsContainers());
                            } finally {
                            }
                        } finally {
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                sparkWatermarks = new GlobalWatermarkHolder.SparkWatermarks(instant, instant2, new Instant(time.milliseconds()));
                GlobalWatermarkHolder.add(this.inputDStreamId, sparkWatermarks);
            }
            report(time, j, sparkWatermarks);
            return Option.empty();
        }

        private void report(Time time, long j, GlobalWatermarkHolder.SparkWatermarks sparkWatermarks) {
            String METADATA_KEY_DESCRIPTION = StreamInputInfo.METADATA_KEY_DESCRIPTION();
            Object[] objArr = new Object[4];
            objArr[0] = Long.valueOf(j);
            objArr[1] = sparkWatermarks == null ? "N/A" : sparkWatermarks;
            objArr[2] = this.sourceName;
            objArr[3] = time;
            ssc().scheduler().inputInfoTracker().reportInfo(time, new StreamInputInfo(this.inputDStreamId, j, new Map.Map1(METADATA_KEY_DESCRIPTION, String.format("Read %d records with observed watermarks %s, from %s for batch time: %s", objArr))));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$Tuple2MetadataFunction.class */
    private static class Tuple2MetadataFunction implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> {
        private Tuple2MetadataFunction() {
        }

        public Metadata call(Tuple2<Iterable<byte[]>, Metadata> tuple2) throws Exception {
            return (Metadata) tuple2._2();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$Tuple2byteFlatMapFunction.class */
    private static class Tuple2byteFlatMapFunction implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {
        private Tuple2byteFlatMapFunction() {
        }

        public Iterator<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> tuple2) throws Exception {
            return ((Iterable) tuple2._1()).iterator();
        }
    }

    /**
     * Builds the streaming read for an {@link UnboundedSource}.
     *
     * <p>Steps, in order: wrap the source in a {@code SourceDStream}, run it through
     * {@code mapWithState} using {@link StateSpecFunctions#mapSourceFunction}, optionally enable
     * checkpointing on the stateful stream, register a {@code ReadReportDStream} over the metadata
     * half of the output, and finally decode the byte half into windowed values in the global
     * window.
     *
     * @param streamingContext the Spark streaming context to attach the streams to
     * @param options serialized pipeline options; viewed as {@link SparkPipelineOptions}
     * @param source the unbounded source to read from
     * @param stepName step name passed to the state function and used for metrics reporting
     * @return the decoded stream together with the id of the underlying input stream
     */
    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> UnboundedDataset<T> read(JavaStreamingContext streamingContext, SerializablePipelineOptions options, UnboundedSource<T, CheckpointMarkT> source, String stepName) {
        SparkPipelineOptions sparkOptions = options.get().as(SparkPipelineOptions.class);

        // NOTE(review): raw Spark/Beam stream types are kept here because the generic
        // signatures of SourceDStream and mapSourceFunction are not visible from this file.
        SourceDStream sourceStream =
                new SourceDStream(
                        streamingContext.ssc(), source, options, sparkOptions.getMaxRecordsPerBatch());
        JavaPairInputDStream pairedInput =
                JavaPairInputDStream$.MODULE$.fromInputDStream(
                        sourceStream,
                        JavaSparkContext$.MODULE$.fakeClassTag(),
                        JavaSparkContext$.MODULE$.fakeClassTag());

        JavaMapWithStateDStream statefulStream =
                pairedInput.mapWithState(
                        StateSpec.function(StateSpecFunctions.mapSourceFunction(options, stepName))
                                .numPartitions(sourceStream.getNumPartitions()));
        checkpointStream(statefulStream, sparkOptions);

        int inputId = pairedInput.inputDStream().id();

        // Register the reporting stream so it is computed for every batch.
        JavaDStream metadataStream = statefulStream.map(new Tuple2MetadataFunction());
        ReadReportDStream reportStream =
                new ReadReportDStream(
                        metadataStream.dstream(), inputId, getSourceName(source, inputId), stepName);
        reportStream.register();

        // Decode the raw bytes back into windowed values.
        JavaDStream encodedRecords = statefulStream.flatMap(new Tuple2byteFlatMapFunction());
        WindowedValue.FullWindowedValueCoder<T> coder =
                WindowedValue.FullWindowedValueCoder.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
        JavaDStream decodedRecords = encodedRecords.map(CoderHelpers.fromByteFunction(coder));
        return new UnboundedDataset<>(decodedRecords, Collections.singletonList(inputId));
    }

    /**
     * Builds a display name for a source from its class's simple name and an id.
     *
     * <p>Any {@code $} is removed from the simple name, the name is split in front of every
     * upper-case ASCII letter, the non-empty trimmed pieces are joined with a trailing space each,
     * and {@code [id]} is appended.
     *
     * @param source the source whose class name is used
     * @param id the id appended in square brackets
     * @return the display name, ending in {@code [id]}
     */
    private static <T> String getSourceName(Source<T> source, int id) {
        String simpleName = source.getClass().getSimpleName().replace("$", "");
        StringBuilder displayName = new StringBuilder();
        // The splitter yields Strings; T is the source's element type and is unrelated to them.
        for (String piece : Splitter.onPattern("(?=[A-Z])").split(simpleName)) {
            String word = piece.trim();
            if (!word.isEmpty()) {
                displayName.append(word).append(" ");
            }
        }
        return displayName.append("[").append(id).append("]").toString();
    }

    /**
     * Enables checkpointing on the given stream when the configured checkpoint duration is
     * positive; otherwise the stream is left untouched.
     *
     * @param stream the stream to checkpoint
     * @param options options supplying the checkpoint duration in milliseconds
     */
    private static void checkpointStream(JavaDStream<?> stream, SparkPipelineOptions options) {
        long checkpointMillis = options.getCheckpointDurationMillis().longValue();
        if (checkpointMillis <= 0) {
            // Non-positive duration: do not set a checkpoint interval.
            return;
        }
        stream.checkpoint(new Duration(checkpointMillis));
    }
}
