package org.apache.spark.streaming.dstream;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Interval;
import org.apache.spark.streaming.Time;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.C$colon$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReducedWindowedDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055c!\u0002\f\u0018\u0001e\t\u0003\u0002\u0003\u001f\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\t\u0011u\u0002!\u0011!Q\u0001\nyB\u0001\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0010\u0005\t\u0005\u0002\u0011\t\u0011)A\u0005\u0007\"AA\n\u0001B\u0001B\u0003%Q\n\u0003\u0005R\u0001\t\u0005\t\u0015!\u0003N\u0011!\u0011\u0006A!A!\u0002\u0013\u0019\u0006\u0002C,\u0001\u0005\u0007\u0005\u000b1\u0002-\t\u0011y\u0003!1!Q\u0001\f}CQ\u0001\u0019\u0001\u0005\u0002\u0005Dq!\u001c\u0001C\u0002\u0013%a\u000e\u0003\u0004p\u0001\u0001\u0006Ia\t\u0005\u0006a\u0002!\t!\u001d\u0005\u0006e\u0002!\te\u001d\u0005\u0007\u0003\u0017\u0001A\u0011I9\t\u0013\u00055\u0001A1A\u0005B\u0005=\u0001bBA\t\u0001\u0001\u0006I!\u0013\u0005\u0007\u0003'\u0001A\u0011I9\t\u000f\u0005U\u0001\u0001\"\u0011\u0002\u0018!9\u0011\u0011\u0006\u0001\u0005B\u0005-\u0002bBA\u0019\u0001\u0011\u0005\u00131\u0007\u0002\u0017%\u0016$WoY3e/&tGm\\<fI\u0012\u001bFO]3b[*\u0011\u0001$G\u0001\bIN$(/Z1n\u0015\tQ2$A\u0005tiJ,\u0017-\\5oO*\u0011A$H\u0001\u0006gB\f'o\u001b\u0006\u0003=}\ta!\u00199bG\",'\"\u0001\u0011\u0002\u0007=\u0014x-F\u0002#_i\u001a\"\u0001A\u0012\u0011\u0007\u0011*s%D\u0001\u0018\u0013\t1sCA\u0004E'R\u0014X-Y7\u0011\t!ZS&O\u0007\u0002S)\t!&A\u0003tG\u0006d\u0017-\u0003\u0002-S\t1A+\u001e9mKJ\u0002\"AL\u0018\r\u0001\u0011)\u0001\u0007\u0001b\u0001e\t\t1j\u0001\u0001\u0012\u0005M2\u0004C\u0001\u00155\u0013\t)\u0014FA\u0004O_RD\u0017N\\4\u0011\u0005!:\u0014B\u0001\u001d*\u0005\r\te.\u001f\t\u0003]i\"Qa\u000f\u0001C\u0002I\u0012\u0011AV\u0001\u0007a\u0006\u0014XM\u001c;\u0002\u0015I,G-^2f\rVt7\rE\u0003)\u007feJ\u0014(\u0003\u0002AS\tIa)\u001e8di&|gNM\u0001\u000eS:4(+\u001a3vG\u00164UO\\2\u0002\u0015\u0019LG\u000e^3s\rVt7\rE\u0002)\t\u001aK!!R\u0015\u0003\r=\u0003H/[8o!\u0011AsiJ%\n\u0005!K#!\u0003$v]\u000e$\u0018n\u001c82!\tA#*\u0003\u0002LS\t9!i\\8mK\u0006t\u0017aD0xS:$wn\u001e#ve\u0006$\u0018n\u001c8\u0011\u00059{U\"A\r\n\u0005AK\"\u0001\u0003#ve\u0006$\u0018n\u001c8\u00
02\u001d}\u001bH.\u001b3f\tV\u0014\u0018\r^5p]\u0006Y\u0001/\u0019:uSRLwN\\3s!\t!V+D\u0001\u001c\u0013\t16DA\u0006QCJ$\u0018\u000e^5p]\u0016\u0014\u0018AC3wS\u0012,gnY3%cA\u0019\u0011\fX\u0017\u000e\u0003iS!aW\u0015\u0002\u000fI,g\r\\3di&\u0011QL\u0017\u0002\t\u00072\f7o\u001d+bO\u0006QQM^5eK:\u001cW\r\n\u001a\u0011\u0007ec\u0016(\u0001\u0004=S:LGO\u0010\u000b\tE\u001a<\u0007.\u001b6lYR\u00191\rZ3\u0011\t\u0011\u0002Q&\u000f\u0005\u0006/*\u0001\u001d\u0001\u0017\u0005\u0006=*\u0001\u001da\u0018\u0005\u0006y)\u0001\ra\t\u0005\u0006{)\u0001\rA\u0010\u0005\u0006\u0003*\u0001\rA\u0010\u0005\u0006\u0005*\u0001\ra\u0011\u0005\u0006\u0019*\u0001\r!\u0014\u0005\u0006#*\u0001\r!\u0014\u0005\u0006%*\u0001\raU\u0001\u000ee\u0016$WoY3e'R\u0014X-Y7\u0016\u0003\r\naB]3ek\u000e,Gm\u0015;sK\u0006l\u0007%\u0001\bxS:$wn\u001e#ve\u0006$\u0018n\u001c8\u0016\u00035\u000bA\u0002Z3qK:$WM\\2jKN,\u0012\u0001\u001e\t\u0005kv\f\tA\u0004\u0002ww:\u0011qO_\u0007\u0002q*\u0011\u00110M\u0001\u0007yI|w\u000e\u001e \n\u0003)J!\u0001`\u0015\u0002\u000fA\f7m[1hK&\u0011ap 
\u0002\u0005\u0019&\u001cHO\u0003\u0002}SA\"\u00111AA\u0004!\u0011!S%!\u0002\u0011\u00079\n9\u0001\u0002\u0006\u0002\n9\t\t\u0011!A\u0003\u0002I\u00121a\u0018\u00132\u00035\u0019H.\u001b3f\tV\u0014\u0018\r^5p]\u0006qQ.^:u\u0007\",7m\u001b9pS:$X#A%\u0002\u001f5,8\u000f^\"iK\u000e\\\u0007o\\5oi\u0002\na\u0003]1sK:$(+Z7f[\n,'\u000fR;sCRLwN\\\u0001\ba\u0016\u00148/[:u)\r\u0019\u0013\u0011\u0004\u0005\b\u00037\u0019\u0002\u0019AA\u000f\u00031\u0019Ho\u001c:bO\u0016dUM^3m!\u0011\ty\"!\n\u000e\u0005\u0005\u0005\"bAA\u00127\u000591\u000f^8sC\u001e,\u0017\u0002BA\u0014\u0003C\u0011Ab\u0015;pe\u0006<W\rT3wK2\f!b\u00195fG.\u0004x.\u001b8u)\r\u0019\u0013Q\u0006\u0005\u0007\u0003_!\u0002\u0019A'\u0002\u0011%tG/\u001a:wC2\fqaY8naV$X\r\u0006\u0003\u00026\u0005\r\u0003\u0003\u0002\u0015E\u0003o\u0001R!!\u000f\u0002@\u001dj!!a\u000f\u000b\u0007\u0005u2$A\u0002sI\u0012LA!!\u0011\u0002<\t\u0019!\u000b\u0012#\t\u000f\u0005\u0015S\u00031\u0001\u0002H\u0005Ia/\u00197jIRKW.\u001a\t\u0004\u001d\u0006%\u0013bAA&3\t!A+[7f\u0001")
/* loaded from: input_file:org/apache/spark/streaming/dstream/ReducedWindowedDStream.class */
/*
 * Windowed, per-key reduction over a parent DStream of (K, V) pairs that is
 * computed incrementally (decompiled from ReducedWindowedDStream.scala).
 *
 * The parent is first reduced per batch with reduceFunc (see reducedStream,
 * built in the constructor). For each window, compute() starts from this
 * stream's own result for the previous window, removes the contribution of the
 * per-batch RDDs logged as "old" with invReduceFunc, and adds the per-batch
 * RDDs logged as "new" with reduceFunc.
 *
 * NOTE(review): this is decompiler output. Identifiers such as evidence$1,
 * $anonfun$compute$N and mo17556$plus$plus$eq / mo17447head / mo17477apply look
 * like compiler- or decompiler-generated names rather than hand-written API.
 */
public class ReducedWindowedDStream<K, V> extends DStream<Tuple2<K, V>> {
    // Upstream stream of (key, value) pairs this window is computed over.
    private final DStream<Tuple2<K, V>> parent;
    // Combines two values; used both per batch (reduceByKey) and when adding new values in compute().
    private final Function2<V, V, V> reduceFunc;
    // Applied in compute() as invReduceFunc(previousWindowValue, reducedOldValues).
    private final Function2<V, V, V> invReduceFunc;
    // Optional predicate applied to the resulting (key, value) pairs in compute().
    private final Option<Function1<Tuple2<K, V>, Object>> filterFunc;
    // Window length; validated in the constructor to be a multiple of the parent's slide duration.
    private final Duration _windowDuration;
    // Slide interval; validated in the constructor to be a multiple of the parent's slide duration.
    private final Duration _slideDuration;
    // Partitioner used for the per-batch reduceByKey and for the CoGroupedRDD in compute().
    private final Partitioner partitioner;
    // ClassTag for the key type K (constructor parameter classTag).
    private final ClassTag<K> evidence$1;
    // parent reduced per key with reduceFunc and partitioner; assigned in the constructor.
    private final DStream<Tuple2<K, V>> reducedStream;
    // Always set to true by the constructor.
    private final boolean mustCheckpoint;

    /** Returns the per-batch reduced stream derived from the parent in the constructor. */
    private DStream<Tuple2<K, V>> reducedStream() {
        return this.reducedStream;
    }

    /** Returns the window duration passed to the constructor. */
    public Duration windowDuration() {
        return this._windowDuration;
    }

    /** Returns a single-element list containing only the per-batch reduced stream. */
    @Override // org.apache.spark.streaming.dstream.DStream
    public List<DStream<?>> dependencies() {
        return new C$colon$colon(reducedStream(), Nil$.MODULE$);
    }

    /** Returns the slide duration passed to the constructor. */
    @Override // org.apache.spark.streaming.dstream.DStream
    public Duration slideDuration() {
        return this._slideDuration;
    }

    /** Returns the flag set in the constructor, which is always {@code true}. */
    @Override // org.apache.spark.streaming.dstream.DStream
    public boolean mustCheckpoint() {
        return this.mustCheckpoint;
    }

    /** Returns this stream's remember duration plus the window duration. */
    @Override // org.apache.spark.streaming.dstream.DStream
    public Duration parentRememberDuration() {
        return rememberDuration().$plus(windowDuration());
    }

    /**
     * Applies the storage level to this stream and to the per-batch reduced stream.
     *
     * @param storageLevel storage level forwarded to both streams
     * @return this stream
     */
    @Override // org.apache.spark.streaming.dstream.DStream
    public DStream<Tuple2<K, V>> persist(StorageLevel storageLevel) {
        super.persist(storageLevel);
        reducedStream().persist(storageLevel);
        return this;
    }

    /**
     * Delegates to the superclass implementation. Unlike {@link #persist}, the
     * per-batch reduced stream is not touched here.
     *
     * @param duration checkpoint interval forwarded to the superclass
     * @return this stream
     */
    @Override // org.apache.spark.streaming.dstream.DStream
    public DStream<Tuple2<K, V>> checkpoint(Duration duration) {
        super.checkpoint(duration);
        return this;
    }

    /**
     * Builds the RDD of windowed, per-key reduced values for the given time.
     *
     * <p>The previous window's result, the "old" per-batch RDDs and the "new"
     * per-batch RDDs are cogrouped by key, and each key's value is derived by the
     * merge function defined below. The optional filter is applied last.
     *
     * @param time the time the window ends at (used as the end of the current interval)
     * @return always a {@code Some} wrapping the resulting RDD
     */
    @Override // org.apache.spark.streaming.dstream.DStream
    public Option<RDD<Tuple2<K, V>>> compute(Time time) {
        // Local copies so the merge lambda below captures the functions rather than reading the fields.
        Function2<V, V, V> function2 = this.reduceFunc;
        Function2<V, V, V> function22 = this.invReduceFunc;
        // Current window: [time - windowDuration + parent.slideDuration, time].
        Interval interval = new Interval(time.$minus(windowDuration()).$plus(this.parent.slideDuration()), time);
        // Previous window: the current window shifted back by one slide duration.
        Interval $minus = interval.$minus(slideDuration());
        logDebug(() -> {
            return new StringBuilder(14).append("Window time = ").append(this.windowDuration()).toString();
        });
        logDebug(() -> {
            return new StringBuilder(13).append("Slide time = ").append(this.slideDuration()).toString();
        });
        logDebug(() -> {
            return new StringBuilder(12).append("Zero time = ").append(this.zeroTime()).toString();
        });
        logDebug(() -> {
            return new StringBuilder(17).append("Current window = ").append(interval).toString();
        });
        logDebug(() -> {
            return new StringBuilder(18).append("Previous window = ").append($minus).toString();
        });
        // "Old" RDDs: reduced batches from the previous window's begin time up to one
        // parent slide before the current window's begin time.
        Seq<RDD<Tuple2<K, V>>> slice = reducedStream().slice($minus.beginTime(), interval.beginTime().$minus(this.parent.slideDuration()));
        logDebug(() -> {
            return new StringBuilder(13).append("# old RDDs = ").append(slice.size()).toString();
        });
        // "New" RDDs: reduced batches from one parent slide after the previous window's
        // end time up to the current window's end time.
        Seq<RDD<Tuple2<K, V>>> slice2 = reducedStream().slice($minus.endTime().$plus(this.parent.slideDuration()), interval.endTime());
        logDebug(() -> {
            return new StringBuilder(13).append("# new RDDs = ").append(slice2.size()).toString();
        });
        // Cogroup input order matters for the merge function below:
        //   [0]                 result of getOrCompute at the previous window's end time,
        //                       or an RDD made from an empty Seq when that yields nothing
        //   [1 .. size]         the old RDDs
        //   [size+1 .. size+size2] the new RDDs
        CoGroupedRDD coGroupedRDD = new CoGroupedRDD(new ArrayBuffer().$plus$eq((ArrayBuffer) getOrCompute($minus.endTime()).getOrElse(() -> {
            SparkContext sc = this.ssc().sc();
            return sc.makeRDD((Seq) Seq$.MODULE$.apply(Nil$.MODULE$), sc.makeRDD$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
        })).mo17556$plus$plus$eq((TraversableOnce) slice).mo17556$plus$plus$eq((TraversableOnce) slice2).toSeq(), this.partitioner, this.evidence$1);
        int size = slice.size();
        int size2 = slice2.size();
        // Per-key merge: receives one Iterable per cogrouped RDD, in the order listed above.
        Function1 function1 = iterableArr -> {
            // Sanity check: one slot for the previous window plus one per old and new RDD.
            if (iterableArr.length != 1 + size + size2) {
                throw new Exception("Unexpected number of sequences of reduced values");
            }
            // Old values: first element of every non-empty Iterable at indices 1..size.
            IndexedSeq indexedSeq = (IndexedSeq) ((TraversableLike) ((TraversableLike) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), size).map(obj -> {
                return $anonfun$compute$10(iterableArr, BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom())).filter(iterable -> {
                return BoxesRunTime.boxToBoolean($anonfun$compute$11(iterable));
            })).map(iterable2 -> {
                return iterable2.mo17447head();
            }, IndexedSeq$.MODULE$.canBuildFrom());
            // New values: first element of every non-empty Iterable at indices size+1..size+size2.
            IndexedSeq indexedSeq2 = (IndexedSeq) ((TraversableLike) ((TraversableLike) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), size2).map(obj2 -> {
                return $anonfun$compute$13(iterableArr, size, BoxesRunTime.unboxToInt(obj2));
            }, IndexedSeq$.MODULE$.canBuildFrom())).filter(iterable3 -> {
                return BoxesRunTime.boxToBoolean($anonfun$compute$14(iterable3));
            })).map(iterable4 -> {
                return iterable4.mo17447head();
            }, IndexedSeq$.MODULE$.canBuildFrom());
            // No value for this key in the previous window: the result is built from new values only.
            if (iterableArr[0].isEmpty()) {
                if (indexedSeq2.isEmpty()) {
                    throw new Exception("Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?");
                }
                return indexedSeq2.reduce(function2);
            }
            // Start from the previous window's value for this key.
            Object head = iterableArr[0].mo17447head();
            // Remove the combined old values with the inverse reduce function.
            if (!indexedSeq.isEmpty()) {
                head = function22.mo17477apply(head, indexedSeq.reduce(function2));
            }
            // Add the combined new values with the reduce function.
            if (!indexedSeq2.isEmpty()) {
                head = function2.mo17477apply(head, indexedSeq2.reduce(function2));
            }
            return head;
        };
        ClassTag<K> classTag = this.evidence$1;
        // Value ClassTag for the cogrouped RDD: an array of Iterable.
        ClassTag<V> apply = ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Iterable.class));
        // The default-argument getter's result is discarded; null is passed as the fourth argument below.
        RDD$.MODULE$.rddToPairRDDFunctions$default$4(coGroupedRDD);
        // NOTE(review): U is not a type parameter declared in this class; presumably a decompiler artifact.
        RDD<Tuple2<K, U>> mapValues = RDD$.MODULE$.rddToPairRDDFunctions(coGroupedRDD, classTag, apply, null).mapValues(function1);
        // Apply the optional filter; the result is wrapped in Some on both branches.
        return this.filterFunc.isDefined() ? new Some(mapValues.filter(this.filterFunc.get())) : new Some(mapValues);
    }

    /* Synthetic lambda body used by compute(): selects the Iterable at index i (old-value slots). */
    public static final /* synthetic */ Iterable $anonfun$compute$10(Iterable[] iterableArr, int i) {
        return iterableArr[i];
    }

    /* Synthetic lambda body used by compute(): true when the Iterable has at least one element. */
    public static final /* synthetic */ boolean $anonfun$compute$11(Iterable iterable) {
        return !iterable.isEmpty();
    }

    /* Synthetic lambda body used by compute(): selects the Iterable at index i + i2 (new-value slots, where i is the old-RDD count). */
    public static final /* synthetic */ Iterable $anonfun$compute$13(Iterable[] iterableArr, int i, int i2) {
        return iterableArr[i + i2];
    }

    /* Synthetic lambda body used by compute(): true when the Iterable has at least one element. */
    public static final /* synthetic */ boolean $anonfun$compute$14(Iterable iterable) {
        return !iterable.isEmpty();
    }

    /**
     * Creates the windowed stream, validates the durations against the parent's
     * slide duration, builds the per-batch reduced stream and persists both
     * streams with {@code MEMORY_ONLY_SER}.
     *
     * @param dStream     parent stream of (key, value) pairs
     * @param function2   reduce function (stored as reduceFunc)
     * @param function22  inverse reduce function (stored as invReduceFunc)
     * @param option      optional filter applied to the results of compute()
     * @param duration    window duration; must be a multiple of the parent's slide duration
     * @param duration2   slide duration; must be a multiple of the parent's slide duration
     * @param partitioner partitioner for the per-batch reduceByKey and the cogroup
     * @param classTag    ClassTag for K
     * @param classTag2   ClassTag for V; only used to build the pair-stream functions here
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ReducedWindowedDStream(DStream<Tuple2<K, V>> dStream, Function2<V, V, V> function2, Function2<V, V, V> function22, Option<Function1<Tuple2<K, V>, Object>> option, Duration duration, Duration duration2, Partitioner partitioner, ClassTag<K> classTag, ClassTag<V> classTag2) {
        super(dStream.ssc(), ClassTag$.MODULE$.apply(Tuple2.class));
        this.parent = dStream;
        this.reduceFunc = function2;
        this.invReduceFunc = function22;
        this.filterFunc = option;
        this._windowDuration = duration;
        this._slideDuration = duration2;
        this.partitioner = partitioner;
        this.evidence$1 = classTag;
        // Both durations are checked with Predef.require; the message is built lazily by the lambda.
        Predef$.MODULE$.require(duration.isMultipleOf(dStream.slideDuration()), () -> {
            return new StringBuilder(108).append("The window duration of ReducedWindowedDStream (").append(this._windowDuration).append(") ").append("must be multiple of the slide duration of parent DStream (").append(this.parent.slideDuration()).append(")").toString();
        });
        Predef$.MODULE$.require(duration2.isMultipleOf(dStream.slideDuration()), () -> {
            return new StringBuilder(107).append("The slide duration of ReducedWindowedDStream (").append(this._slideDuration).append(") ").append("must be multiple of the slide duration of parent DStream (").append(this.parent.slideDuration()).append(")").toString();
        });
        // The default-argument getter's result is discarded; null is passed as the fourth argument below.
        DStream$.MODULE$.toPairDStreamFunctions$default$4(dStream);
        // Reduce each parent batch per key up front; compute() slices this stream for old/new RDDs.
        this.reducedStream = DStream$.MODULE$.toPairDStreamFunctions(dStream, classTag, classTag2, null).reduceByKey(function2, partitioner);
        // Persist this stream (via the superclass implementation) and the reduced stream in serialized memory.
        super.persist(StorageLevel$.MODULE$.MEMORY_ONLY_SER());
        reducedStream().persist(StorageLevel$.MODULE$.MEMORY_ONLY_SER());
        this.mustCheckpoint = true;
    }
}
