package org.apache.spark.streaming;

import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent$;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.ManualClock;
import org.scalactic.Equality$;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.enablers.Containing$;
import org.scalatest.enablers.Emptiness$;
import org.scalatest.enablers.Size$;
import org.scalatest.time.Span$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ReceivedBlockTrackerSuite.scala */
/* loaded from: input_file:org/apache/spark/streaming/ReceivedBlockTrackerSuite$$anonfun$5.class */
public final class ReceivedBlockTrackerSuite$$anonfun$5 extends AbstractFunction0.mcV.sp implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ ReceivedBlockTrackerSuite $outer;

    public final void apply() {
        apply$mcV$sp();
    }

    public void apply$mcV$sp() {
        Clock manualClock = new ManualClock();
        this.$outer.conf().set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1");
        Predef$.MODULE$.require(this.$outer.convertToEqualizer(BoxesRunTime.boxToInteger(WriteAheadLogUtils$.MODULE$.getRollingIntervalSecs(this.$outer.conf(), true))).$eq$eq$eq(BoxesRunTime.boxToInteger(1), Equality$.MODULE$.default()));
        ReceivedBlockTracker createTracker = this.$outer.createTracker(this.$outer.createTracker$default$1(), this.$outer.createTracker$default$2(), manualClock);
        this.$outer.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(createTracker.isWriteAheadLogEnabled())).should(this.$outer.be().apply(true));
        Seq<ReceivedBlockInfo> addBlockInfos$1 = addBlockInfos$1(createTracker);
        this.$outer.convertToAnyShouldWrapper(createTracker.getUnallocatedBlocks(this.$outer.streamId()).toList()).shouldEqual(addBlockInfos$1, Equality$.MODULE$.default());
        Seq seq = (Seq) addBlockInfos$1.map(BlockAdditionEvent$.MODULE$, Seq$.MODULE$.canBuildFrom());
        this.$outer.convertToAnyShouldWrapper(this.$outer.getWrittenLogData(this.$outer.getWrittenLogData$default$1())).shouldEqual(seq, Equality$.MODULE$.default());
        this.$outer.convertToAnyShouldWrapper(this.$outer.getWriteAheadLogFiles()).should(this.$outer.have()).size(1L, Size$.MODULE$.sizeOfGenTraversable());
        createTracker.stop();
        incrementTime$1(manualClock);
        ReceivedBlockTracker createTracker2 = this.$outer.createTracker(this.$outer.createTracker$default$1(), false, manualClock);
        this.$outer.convertToAnyShouldWrapper(createTracker2.getUnallocatedBlocks(this.$outer.streamId())).shouldBe(this.$outer.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
        this.$outer.convertToAnyShouldWrapper(BoxesRunTime.boxToBoolean(createTracker2.hasUnallocatedReceivedBlocks())).should(this.$outer.be().apply(false));
        createTracker2.stop();
        ReceivedBlockTracker createTracker3 = this.$outer.createTracker(this.$outer.createTracker$default$1(), true, manualClock);
        List list = createTracker3.getUnallocatedBlocks(this.$outer.streamId()).toList();
        this.$outer.convertToAnyShouldWrapper(list).shouldEqual(addBlockInfos$1, Equality$.MODULE$.default());
        list.foreach(new ReceivedBlockTrackerSuite$$anonfun$5$$anonfun$apply$mcV$sp$6(this));
        long timeMillis = manualClock.getTimeMillis();
        createTracker3.allocateBlocksToBatch(this.$outer.millisToTime(timeMillis));
        this.$outer.convertToAnyShouldWrapper(createTracker3.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis), this.$outer.streamId())).shouldEqual(addBlockInfos$1, Equality$.MODULE$.default());
        this.$outer.convertToAnyShouldWrapper(createTracker3.getBlocksOfBatch(this.$outer.millisToTime(timeMillis))).shouldEqual(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(this.$outer.streamId())), addBlockInfos$1)})), Equality$.MODULE$.default());
        incrementTime$1(manualClock);
        long timeMillis2 = manualClock.getTimeMillis();
        Seq<ReceivedBlockInfo> addBlockInfos$12 = addBlockInfos$1(createTracker3);
        createTracker3.allocateBlocksToBatch(this.$outer.millisToTime(timeMillis2));
        this.$outer.convertToAnyShouldWrapper(createTracker3.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis2), this.$outer.streamId())).shouldEqual(addBlockInfos$12, Equality$.MODULE$.default());
        createTracker3.stop();
        this.$outer.convertToAnyShouldWrapper(this.$outer.getWrittenLogData(this.$outer.getWrittenLogData$default$1())).shouldEqual((Seq) ((TraversableLike) ((TraversableLike) seq.$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new BatchAllocationEvent[]{this.$outer.createBatchAllocation(timeMillis, addBlockInfos$1)})), Seq$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce) addBlockInfos$12.map(BlockAdditionEvent$.MODULE$, Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom())).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new BatchAllocationEvent[]{this.$outer.createBatchAllocation(timeMillis2, addBlockInfos$12)})), Seq$.MODULE$.canBuildFrom()), Equality$.MODULE$.default());
        incrementTime$1(manualClock);
        ReceivedBlockTracker createTracker4 = this.$outer.createTracker(this.$outer.createTracker$default$1(), true, manualClock);
        this.$outer.convertToAnyShouldWrapper(createTracker4.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis), this.$outer.streamId())).shouldEqual(addBlockInfos$1, Equality$.MODULE$.default());
        this.$outer.convertToAnyShouldWrapper(createTracker4.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis2), this.$outer.streamId())).shouldEqual(addBlockInfos$12, Equality$.MODULE$.default());
        this.$outer.convertToAnyShouldWrapper(createTracker4.getUnallocatedBlocks(this.$outer.streamId())).shouldBe(this.$outer.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
        String str = (String) this.$outer.getWriteAheadLogFiles().head();
        incrementTime$1(manualClock);
        createTracker4.cleanupOldBatches(this.$outer.millisToTime(timeMillis2), true);
        this.$outer.convertToAnyShouldWrapper(createTracker4.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis), this.$outer.streamId())).shouldEqual(Seq$.MODULE$.empty(), Equality$.MODULE$.default());
        this.$outer.convertToAnyShouldWrapper(this.$outer.getWrittenLogData((String) this.$outer.getWriteAheadLogFiles().last())).should(this.$outer.contain().apply(this.$outer.createBatchCleanup(timeMillis, Predef$.MODULE$.wrapLongArray(new long[0]))), Containing$.MODULE$.containingNatureOfGenTraversable(Equality$.MODULE$.default()));
        Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), Eventually$.MODULE$.interval(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(10)).millisecond())), new ReceivedBlockTrackerSuite$$anonfun$5$$anonfun$apply$mcV$sp$1(this, str));
        printLogFiles$1("After clean");
        createTracker4.stop();
        incrementTime$1(manualClock);
        ReceivedBlockTracker createTracker5 = this.$outer.createTracker(this.$outer.createTracker$default$1(), true, manualClock);
        this.$outer.convertToAnyShouldWrapper(createTracker5.getUnallocatedBlocks(this.$outer.streamId())).shouldBe(this.$outer.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
        this.$outer.convertToAnyShouldWrapper(createTracker5.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis), this.$outer.streamId())).shouldBe(this.$outer.empty(), Emptiness$.MODULE$.emptinessOfGenTraversable());
        this.$outer.convertToAnyShouldWrapper(createTracker5.getBlocksOfBatchAndStream(this.$outer.millisToTime(timeMillis2), this.$outer.streamId())).shouldEqual(addBlockInfos$12, Equality$.MODULE$.default());
        createTracker5.stop();
    }

    public /* synthetic */ ReceivedBlockTrackerSuite org$apache$spark$streaming$ReceivedBlockTrackerSuite$$anonfun$$$outer() {
        return this.$outer;
    }

    /* renamed from: apply, reason: collision with other method in class */
    public final /* bridge */ /* synthetic */ Object m338apply() {
        apply();
        return BoxedUnit.UNIT;
    }

    private final void incrementTime$1(ManualClock manualClock) {
        manualClock.advance(2000L);
    }

    private final Seq addBlockInfos$1(ReceivedBlockTracker receivedBlockTracker) {
        Seq<ReceivedBlockInfo> generateBlockInfos = this.$outer.generateBlockInfos();
        generateBlockInfos.map(new ReceivedBlockTrackerSuite$$anonfun$5$$anonfun$addBlockInfos$1$1(this, receivedBlockTracker), Seq$.MODULE$.canBuildFrom());
        return generateBlockInfos;
    }

    private final void printLogFiles$1(String str) {
        this.$outer.logInfo(new ReceivedBlockTrackerSuite$$anonfun$5$$anonfun$printLogFiles$1$1(this, str, ((TraversableOnce) this.$outer.getWriteAheadLogFiles().map(new ReceivedBlockTrackerSuite$$anonfun$5$$anonfun$9(this), Seq$.MODULE$.canBuildFrom())).mkString("\n")));
    }

    public ReceivedBlockTrackerSuite$$anonfun$5(ReceivedBlockTrackerSuite receivedBlockTrackerSuite) {
        if (receivedBlockTrackerSuite == null) {
            throw null;
        }
        this.$outer = receivedBlockTrackerSuite;
    }
}
