package org.apache.spark.streaming;

import java.io.File;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.concurrent.PatienceConfiguration;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.Buffer$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.RichInt$;

/* compiled from: CheckpointSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uc\u0001B\u0001\u0003\u0001-\u0011qb\u00115fG.\u0004x.\u001b8u'VLG/\u001a\u0006\u0003\u0007\u0011\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u00151\u0011!B:qCJ\\'BA\u0004\t\u0003\u0019\t\u0007/Y2iK*\t\u0011\"A\u0002pe\u001e\u001c\u0001aE\u0002\u0001\u0019A\u0001\"!\u0004\b\u000e\u0003\u0011I!a\u0004\u0003\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\t\t\"#D\u0001\u0003\u0013\t\u0019\"AA\u0007UKN$8+^5uK\n\u000b7/\u001a\u0005\u0006+\u0001!\tAF\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003]\u0001\"!\u0005\u0001\t\u000fe\u0001\u0001\u0019!C\u00015\u0005\u00191o]2\u0016\u0003m\u0001\"!\u0005\u000f\n\u0005u\u0011!\u0001E*ue\u0016\fW.\u001b8h\u0007>tG/\u001a=u\u0011\u001dy\u0002\u00011A\u0005\u0002\u0001\nqa]:d?\u0012*\u0017\u000f\u0006\u0002\"OA\u0011!%J\u0007\u0002G)\tA%A\u0003tG\u0006d\u0017-\u0003\u0002'G\t!QK\\5u\u0011\u001dAc$!AA\u0002m\t1\u0001\u001f\u00132\u0011\u0019Q\u0003\u0001)Q\u00057\u0005!1o]2!\u0011\u0015a\u0003\u0001\"\u0011.\u00035\u0011\u0017\r^2i\tV\u0014\u0018\r^5p]V\ta\u0006\u0005\u0002\u0012_%\u0011\u0001G\u0001\u0002\t\tV\u0014\u0018\r^5p]\")!\u0007\u0001C!g\u0005q!-\u001a4pe\u00164UO\\2uS>tG#A\u0011\t\u000bU\u0002A\u0011I\u001a\u0002\u001b\u00054G/\u001a:Gk:\u001cG/[8o\u0011\u00159\u0004\u0001\"\u00019\u0003e!Xm\u001d;DQ\u0016\u001c7\u000e]8j]R,Gm\u00149fe\u0006$\u0018n\u001c8\u0016\u0007e*%\u000bF\u0003;)\u000e|7\u000fF\u0002\"w9Cq\u0001\u0010\u001c\u0002\u0002\u0003\u000fQ(\u0001\u0006fm&$WM\\2fIE\u00022AP!D\u001b\u0005y$B\u0001!$\u0003\u001d\u0011XM\u001a7fGRL!AQ \u0003\u0011\rc\u0017m]:UC\u001e\u0004\"\u0001R#\r\u0001\u0011)aI\u000eb\u0001\u000f\n\tQ+\u0005\u0002I\u0017B\u0011!%S\u0005\u0003\u0015\u000e\u0012qAT8uQ&tw\r\u0005\u0002#\u0019&\u0011Qj\t\u0002\u0004\u0003:L\bbB(7\u0003\u0003\u0005\u001d\u0001U\u0001\u000bKZLG-\u001a8dK\u0012\u0012\u0004c\u0001 B#B\u0011AI\u0015\u0003\u0006'Z\u0012\ra\u0012\u0002\u0002-\")QK\u000ea\u0001-\u0006)\u0011N\u001c9viB\u0019qk\u00182\u000f\u0005akfBA-]\u001b\u0005Q&BA.\u000b\u0003\u0019a$o\\8u}%\tA%\u0003\u0002_G\u00059\u0001/Y2lC\u001e,\u0017B\u00011b\u0005\r\u0019V-\u001d\u0006\u0003=\u000e\u00022aV0D\u0011\u0015!g\u00071\u0001f\u0003%y\u0007/\u001a:bi&|g\u000e\u0005\u0003#M\"t\u0017BA4$\u0005%1UO\\2uS>t\u0017\u0007E\u0002jY\u000ek\u0011A\u001b\u0006\u0003W\n\tq\u0001Z:ue\u0016\fW.\u0003\u0002nU\n9Ai\u0015;sK\u0006l\u0007cA5m#\")\u0001O\u000ea\u0001c\u0006qQ\r\u001f9fGR,GmT;uaV$\bcA,`eB\u0019qkX)\t\u000bQ4\u0004\u0019A;\u0002#%t\u0017\u000e^5bY:+XNQ1uG\",7\u000f\u0005\u0002#m&\u0011qo\t\u0002\u0004\u0013:$\b\"B=\u0001\t\u0003Q\u0018\u0001G1em\u0006t7-\u001a+j[\u0016<\u0016\u000e\u001e5SK\u0006dG)\u001a7bsV\u001910!\u0001\u0015\u000bq\fI!a\u0003\u0015\u0007u\f\u0019\u0001E\u0002X?z\u00042aV0��!\r!\u0015\u0011\u0001\u0003\u0006'b\u0014\ra\u0012\u0005\n\u0003\u000bA\u0018\u0011!a\u0002\u0003\u000f\t!\"\u001a<jI\u0016t7-\u001a\u00134!\rq\u0014i \u0005\u00063a\u0004\ra\u0007\u0005\b\u0003\u001bA\b\u0019AA\b\u0003)qW/\u001c\"bi\u000eDWm\u001d\t\u0004E\u0005E\u0011bAA\nG\t!Aj\u001c8h\u000f\u001d\t9B\u0001E\u0005\u00033\tqb\u00115fG.\u0004x.\u001b8u'VLG/\u001a\t\u0004#\u0005maAB\u0001\u0003\u0011\u0013\tib\u0005\u0004\u0002\u001c\u0005}\u0011Q\u0005\t\u0004E\u0005\u0005\u0012bAA\u0012G\t1\u0011I\\=SK\u001a\u00042AIA\u0014\u0013\r\tIc\t\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.\u001a\u0005\b+\u0005mA\u0011AA\u0017)\t\tI\u0002\u0003\u0006\u00022\u0005m\u0001\u0019!C\u0001\u0003g\t\u0011EY1uG\"$\u0006N]3f'\"|W\u000f\u001c3CY>\u001c7.\u00138eK\u001aLg.\u001b;fYf,\"!!\u000e\u0011\u0007\t\n9$C\u0002\u0002:\r\u0012qAQ8pY\u0016\fg\u000e\u0003\u0006\u0002>\u0005m\u0001\u0019!C\u0001\u0003\u007f\tQEY1uG\"$\u0006N]3f'\"|W\u000f\u001c3CY>\u001c7.\u00138eK\u001aLg.\u001b;fYf|F%Z9\u0015\u0007\u0005\n\t\u0005C\u0005)\u0003w\t\t\u00111\u0001\u00026!I\u0011QIA\u000eA\u0003&\u0011QG\u0001#E\u0006$8\r\u001b+ie\u0016,7\u000b[8vY\u0012\u0014En\\2l\u0013:$WMZ5oSR,G.\u001f\u0011\t\u0015\u0005%\u00131DA\u0001\n\u0013\tY%A\u0006sK\u0006$'+Z:pYZ,GCAA'!\u0011\ty%!\u0017\u000e\u0005\u0005E#\u0002BA*\u0003+\nA\u0001\\1oO*\u0011\u0011qK\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002\\\u0005E#AB(cU\u0016\u001cG\u000f")
/* loaded from: input_file:org/apache/spark/streaming/CheckpointSuite.class */
public class CheckpointSuite extends SparkFunSuite implements TestSuiteBase {
    private StreamingContext ssc;
    private final String checkpointDir;
    private final SparkConf conf;
    private final PatienceConfiguration.Timeout eventuallyTimeout;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    private volatile boolean bitmap$0;

    public static boolean batchThreeShouldBlockIndefinitely() {
        return CheckpointSuite$.MODULE$.batchThreeShouldBlockIndefinitely();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private String checkpointDir$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.checkpointDir = TestSuiteBase.Cclass.checkpointDir(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.checkpointDir;
        }
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String checkpointDir() {
        return this.bitmap$0 ? this.checkpointDir : checkpointDir$lzycompute();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public SparkConf conf() {
        return this.conf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf sparkConf) {
        this.conf = sparkConf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout timeout) {
        this.eventuallyTimeout = timeout;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String framework() {
        return TestSuiteBase.Cclass.framework(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String master() {
        return TestSuiteBase.Cclass.master(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int numInputPartitions() {
        return TestSuiteBase.Cclass.numInputPartitions(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int maxWaitTimeMillis() {
        return TestSuiteBase.Cclass.maxWaitTimeMillis(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean useManualClock() {
        return TestSuiteBase.Cclass.useManualClock(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean actuallyWait() {
        return TestSuiteBase.Cclass.actuallyWait(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withStreamingContext(StreamingContext streamingContext, Function1<StreamingContext, R> function1) {
        return (R) TestSuiteBase.Cclass.withStreamingContext(this, streamingContext, function1);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> function1) {
        return (R) TestSuiteBase.Cclass.withTestServer(this, testServer, function1);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, int i, ClassTag<U> classTag, ClassTag<V> classTag2) {
        return TestSuiteBase.Cclass.setupStreams(this, seq, function1, i, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        return TestSuiteBase.Cclass.setupStreams(this, seq, seq2, function2, classTag, classTag2, classTag3);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<V>> runStreams(StreamingContext streamingContext, int i, int i2, ClassTag<V> classTag) {
        return TestSuiteBase.Cclass.runStreams(this, streamingContext, i, i2, classTag);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext streamingContext, int i, int i2, ClassTag<V> classTag) {
        return TestSuiteBase.Cclass.runStreamsWithPartitions(this, streamingContext, i, i2, classTag);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> void verifyOutput(Seq<Seq<V>> seq, Seq<Seq<V>> seq2, boolean z, ClassTag<V> classTag) {
        TestSuiteBase.Cclass.verifyOutput(this, seq, seq2, z, classTag);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        TestSuiteBase.Cclass.testOperation(this, seq, function1, seq2, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        TestSuiteBase.Cclass.testOperation(this, seq, function1, seq2, i, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        TestSuiteBase.Cclass.testOperation(this, seq, seq2, function2, seq3, z, classTag, classTag2, classTag3);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        TestSuiteBase.Cclass.testOperation(this, seq, seq2, function2, seq3, i, z, classTag, classTag2, classTag3);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> boolean testOperation$default$4() {
        return TestSuiteBase.Cclass.testOperation$default$4(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> int setupStreams$default$3() {
        int numInputPartitions;
        numInputPartitions = numInputPartitions();
        return numInputPartitions;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean z) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = z;
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String str, Args args) {
        return FunSuiteLike.class.runTest(this, str, args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option option, Args args) {
        return FunSuiteLike.class.run(this, option, args);
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference atomicReference) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = atomicReference;
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference atomicReference) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = atomicReference;
    }

    public void before(Function0<Object> function0) {
        BeforeAndAfter.class.before(this, function0);
    }

    public void after(Function0<Object> function0) {
        BeforeAndAfter.class.after(this, function0);
    }

    public Status runTest(String str, Args args) {
        return BeforeAndAfter.class.runTest(this, str, args);
    }

    public Status run(Option<String> option, Args args) {
        return BeforeAndAfter.class.run(this, option, args);
    }

    public StreamingContext ssc() {
        return this.ssc;
    }

    public void ssc_$eq(StreamingContext streamingContext) {
        this.ssc = streamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public Duration batchDuration() {
        return Milliseconds$.MODULE$.apply(500L);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeFunction() {
        TestSuiteBase.Cclass.beforeFunction(this);
        Utils$.MODULE$.deleteRecursively(new File(checkpointDir()));
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterFunction() {
        TestSuiteBase.Cclass.afterFunction(this);
        if (ssc() != null) {
            StreamingContext ssc = ssc();
            ssc.stop(ssc.stop$default$1());
        }
        Utils$.MODULE$.deleteRecursively(new File(checkpointDir()));
    }

    public <U, V> void testCheckpointedOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, int i, ClassTag<U> classTag, ClassTag<V> classTag2) {
        int size = seq.size() - i;
        int size2 = (seq2.size() - i) + 1;
        ssc_$eq(setupStreams(seq, function1, setupStreams$default$3(), classTag, classTag2));
        ssc().start();
        Seq<Seq<V>> advanceTimeWithRealDelay = advanceTimeWithRealDelay(ssc(), i, classTag2);
        StreamingContext ssc = ssc();
        ssc.stop(ssc.stop$default$1());
        verifyOutput(advanceTimeWithRealDelay, (Seq) seq2.take(i), true, classTag2);
        Thread.sleep(1000L);
        logInfo(new CheckpointSuite$$anonfun$testCheckpointedOperation$1(this));
        ssc_$eq(new StreamingContext(checkpointDir()));
        ssc().start();
        verifyOutput(advanceTimeWithRealDelay(ssc(), size, classTag2), (Seq) seq2.takeRight(size2), true, classTag2);
        StreamingContext ssc2 = ssc();
        ssc2.stop(ssc2.stop$default$1());
        ssc_$eq(null);
    }

    public <V> Seq<Seq<V>> advanceTimeWithRealDelay(StreamingContext streamingContext, long j, ClassTag<V> classTag) {
        ManualClock clock = streamingContext.scheduler().clock();
        logInfo(new CheckpointSuite$$anonfun$advanceTimeWithRealDelay$2(this, clock));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), (int) j).foreach$mVc$sp(new CheckpointSuite$$anonfun$advanceTimeWithRealDelay$1(this, clock));
        logInfo(new CheckpointSuite$$anonfun$advanceTimeWithRealDelay$3(this, clock));
        Thread.sleep(batchDuration().milliseconds());
        return (Seq) ((TestOutputStreamWithPartitions) Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(streamingContext.graph().getOutputStreams()).filter(new CheckpointSuite$$anonfun$33(this))).head()).output().map(new CheckpointSuite$$anonfun$advanceTimeWithRealDelay$4(this), Buffer$.MODULE$.canBuildFrom());
    }

    public CheckpointSuite() {
        BeforeAndAfter.class.$init$(this);
        TestSuiteBase.Cclass.$init$(this);
        this.ssc = null;
        test("basic rdd checkpoints + dstream graph checkpoint recovery", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$1(this));
        test("recovery of conf through checkpoints", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$2(this));
        test("recovery with map and reduceByKey operations", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$3(this));
        test("recovery with invertible reduceByKeyAndWindow operation", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$4(this));
        test("recovery with saveAsHadoopFiles operation", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$7(this));
        test("recovery with saveAsNewAPIHadoopFiles operation", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$9(this));
        test("recovery with saveAsHadoopFile inside transform operation", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$11(this));
        test("recovery with updateStateByKey operation", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$13(this));
        test("recovery with file input stream", Predef$.MODULE$.wrapRefArray(new Tag[0]), new CheckpointSuite$$anonfun$14(this));
    }
}
