/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.Checkpoint$;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.LocalStreamingContext;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.Minutes$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.kinesis.BasicCredentials;
import org.apache.spark.streaming.kinesis.KPLBasedKinesisTestUtils;
import org.apache.spark.streaming.kinesis.KPLBasedKinesisTestUtils$;
import org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD;
import org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDPartition;
import org.apache.spark.streaming.kinesis.KinesisFunSuite;
import org.apache.spark.streaming.kinesis.KinesisInitialPosition;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInputDStream$;
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations$;
import org.apache.spark.streaming.kinesis.KinesisTestUtils;
import org.apache.spark.streaming.kinesis.KinesisTestUtils$;
import org.apache.spark.streaming.kinesis.SequenceNumberRange;
import org.apache.spark.streaming.kinesis.SequenceNumberRanges;
import org.apache.spark.streaming.kinesis.SequenceNumberRanges$;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult;
import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.AbstractPatienceConfiguration;
import org.scalatest.concurrent.AbstractPatienceConfiguration$PatienceConfig$;
import org.scalatest.concurrent.Eventually;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.ScaledTimeSpans;
import org.scalatest.enablers.Emptiness$;
import org.scalatest.enablers.Retrying;
import org.scalatest.enablers.Retrying$;
import org.scalatest.matchers.TypeMatcherHelper$;
import org.scalatest.matchers.should.Matchers;
import org.scalatest.matchers.should.Matchers$;
import org.scalatest.time.Span;
import org.scalatest.time.Span$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenIterable;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.concurrent.duration.package;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0001\u0005]a!B\r\u001b\u0003\u0003)\u0003\u0002C!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\"\t\u000b!\u0003A\u0011A%\t\u000f1\u0003!\u0019!C\u0005\u001b\"1\u0011\f\u0001Q\u0001\n9CqA\u0017\u0001C\u0002\u0013%1\f\u0003\u0004`\u0001\u0001\u0006I\u0001\u0018\u0005\bA\u0002\u0011\r\u0011\"\u0003b\u0011\u0019I\u0007\u0001)A\u0005E\"9!\u000e\u0001b\u0001\n\u0013i\u0005BB6\u0001A\u0003%a\nC\u0004m\u0001\t\u0007I\u0011B1\t\r5\u0004\u0001\u0015!\u0003c\u0011\u001dq\u0007A1A\u0005\n\u0005Daa\u001c\u0001!\u0002\u0013\u0011\u0007b\u00029\u0001\u0001\u0004%I!\u001d\u0005\bk\u0002\u0001\r\u0011\"\u0003w\u0011\u0019a\b\u0001)Q\u0005e\"9Q\u0010\u0001a\u0001\n\u0013q\b\"CA\u0003\u0001\u0001\u0007I\u0011BA\u0004\u0011\u001d\tY\u0001\u0001Q!\n}Dq!!\u0004\u0001\t\u0003\ny\u0001C\u0004\u0002\u0012\u0001!\t%a\u0004\t\u000f\u0005M\u0001\u0001\"\u0011\u0002\u0010!9\u0011Q\u0003\u0001\u0005B\u0005=!AE&j]\u0016\u001c\u0018n]*ue\u0016\fW\u000eV3tiNT!a\u0007\u000f\u0002\u000f-Lg.Z:jg*\u0011QDH\u0001\ngR\u0014X-Y7j]\u001eT!a\b\u0011\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u0005\u0012\u0013AB1qC\u000eDWMC\u0001$\u0003\ry'oZ\u0002\u0001'\u001d\u0001aE\u000b\u00183uy\u0002\"a\n\u0015\u000e\u0003yI!!\u000b\u0010\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\tYC&D\u0001\u001b\u0013\ti#DA\bLS:,7/[:Gk:\u001cV/\u001b;f!\ty\u0003'D\u0001\u001d\u0013\t\tDDA\u000bM_\u000e\fGn\u0015;sK\u0006l\u0017N\\4D_:$X\r\u001f;\u0011\u0005MBT\"\u0001\u001b\u000b\u0005U2\u0014AC2p]\u000e,(O]3oi*\u0011qGI\u0001\ng\u000e\fG.\u0019;fgRL!!\u000f\u001b\u0003\u0015\u00153XM\u001c;vC2d\u0017\u0010\u0005\u0002<y5\ta'\u0003\u0002>m\tq!)\u001a4pe\u0016\fe\u000eZ!gi\u0016\u0014\bCA\u001e@\u0013\t\u0001eGA\tCK\u001a|'/Z!oI\u00063G/\u001a:BY2\f\u0011#Y4he\u0016<\u0017\r^3UKN$H)\u0019;b!\t\u0019e)D\u0001E\u0015\u0005)\u0015!B:dC2\f\u0017BA$E\u0005\u001d\u0011un\u001c7fC:\fa\u0001P5oSRtDC\u0001&L!\tY\u0003\u0001C\u0003B\u0005\u0001\u0007!)A\u0004baBt\u0015-\\3\u0016\u00039\u0003\"a\u0014,\u000f\u0005A#\u0006CA)E\u001b\u0005\u0011&BA*%\u0003\u0019a$o\\8u}%\u0011Q\u000bR\u0001\u0007!J,G-\u001a4\n\u0005]C&AB*ue&twM\u0003\u0002V\t\u0006A\u0011\r\u001d9OC6,\u0007%A\u0007cCR\u001c\u0007\u000eR;sCRLwN\\\u000b\u00029B\u0011q&X\u0005\u0003=r\u0011\u0001\u0002R;sCRLwN\\\u0001\u000fE\u0006$8\r\u001b#ve\u0006$\u0018n\u001c8!\u0003A!W/\\7z\u000b:$\u0007o\\5oiV\u0013H.F\u0001c!\t\u0019\u0007.D\u0001e\u0015\t)g-\u0001\u0003mC:<'\"A4\u0002\t)\fg/Y\u0005\u0003/\u0012\f\u0011\u0003Z;n[f,e\u000e\u001a9pS:$XK\u001d7!\u0003=!W/\\7z%\u0016<\u0017n\u001c8OC6,\u0017\u0001\u00053v[6L(+Z4j_:t\u0015-\\3!\u0003E!W/\\7z\u0003^\u001b\u0016iY2fgN\\U-_\u0001\u0013IVlW._!X'\u0006\u001b7-Z:t\u0017\u0016L\b%A\tek6l\u00170Q,T'\u0016\u001c'/\u001a;LKf\f!\u0003Z;n[f\fukU*fGJ,GoS3zA\u0005IA/Z:u+RLGn]\u000b\u0002eB\u00111f]\u0005\u0003ij\u0011\u0001cS5oKNL7\u000fV3tiV#\u0018\u000e\\:\u0002\u001bQ,7\u000f^+uS2\u001cx\fJ3r)\t9(\u0010\u0005\u0002Dq&\u0011\u0011\u0010\u0012\u0002\u0005+:LG\u000fC\u0004|!\u0005\u0005\t\u0019\u0001:\u0002\u0007a$\u0013'\u0001\u0006uKN$X\u000b^5mg\u0002\n!a]2\u0016\u0003}\u00042aJA\u0001\u0013\r\t\u0019A\b\u0002\r'B\f'o[\"p]R,\u0007\u0010^\u0001\u0007g\u000e|F%Z9\u0015\u0007]\fI\u0001C\u0004|'\u0005\u0005\t\u0019A@\u0002\u0007M\u001c\u0007%A\u0005cK\u001a|'/Z!mYR\tq/\u0001\u0005bMR,'/\u00117m\u0003)\u0011WMZ8sK\u0016\u000b7\r[\u0001\nC\u001a$XM]#bG\"\u0004")
public abstract class KinesisStreamTests
extends SparkFunSuite
implements KinesisFunSuite,
LocalStreamingContext,
Eventually,
BeforeAndAfter {
    private final boolean aggregateTestData;
    private final String appName;
    private final Duration batchDuration;
    private final String dummyEndpointUrl;
    private final String dummyRegionName;
    private final String dummyAWSAccessKey;
    private final String dummyAWSSecretKey;
    private KinesisTestUtils testUtils;
    private SparkContext sc;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    private final AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    private volatile AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$module;
    private transient StreamingContext ssc;
    private final transient boolean stopSparkContext;

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String testName, Args args) {
        return BeforeAndAfterEach.runTest$((BeforeAndAfterEach)this, (String)testName, (Args)args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option testName, Args args) {
        return BeforeAndAfterAll.run$((BeforeAndAfterAll)this, (Option)testName, (Args)args);
    }

    public void before(Function0<Object> fun, Position pos) {
        BeforeAndAfter.before$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public void after(Function0<Object> fun, Position pos) {
        BeforeAndAfter.after$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfter.runTest$((BeforeAndAfter)this, (String)testName, (Args)args);
    }

    public Status run(Option<String> testName, Args args) {
        return BeforeAndAfter.run$((BeforeAndAfter)this, testName, (Args)args);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, PatienceConfiguration.Interval interval, Function0<T> fun, Retrying<T> retrying, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Timeout)timeout, (PatienceConfiguration.Interval)interval, fun, retrying, (Position)pos);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Timeout)timeout, fun, (AbstractPatienceConfiguration.PatienceConfig)config, retrying, (Position)pos);
    }

    public <T> T eventually(PatienceConfiguration.Interval interval, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Interval)interval, fun, (AbstractPatienceConfiguration.PatienceConfig)config, retrying, (Position)pos);
    }

    public <T> T eventually(Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, fun, (AbstractPatienceConfiguration.PatienceConfig)config, retrying, (Position)pos);
    }

    public AbstractPatienceConfiguration.PatienceConfig patienceConfig() {
        return PatienceConfiguration.patienceConfig$((PatienceConfiguration)this);
    }

    public PatienceConfiguration.Timeout timeout(Span value) {
        return PatienceConfiguration.timeout$((PatienceConfiguration)this, (Span)value);
    }

    public PatienceConfiguration.Interval interval(Span value) {
        return PatienceConfiguration.interval$((PatienceConfiguration)this, (Span)value);
    }

    public final Span scaled(Span span) {
        return ScaledTimeSpans.scaled$((ScaledTimeSpans)this, (Span)span);
    }

    public double spanScaleFactor() {
        return ScaledTimeSpans.spanScaleFactor$((ScaledTimeSpans)this);
    }

    public /* synthetic */ void org$apache$spark$streaming$LocalStreamingContext$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    public void resetStreamingContext() {
        LocalStreamingContext.resetStreamingContext$((LocalStreamingContext)this);
    }

    @Override
    public void testIfEnabled(String testName, Function0<BoxedUnit> testBody) {
        KinesisFunSuite.testIfEnabled$(this, testName, testBody);
    }

    @Override
    public void runIfTestsEnabled(String message, Function0<BoxedUnit> body) {
        KinesisFunSuite.runIfTestsEnabled$(this, message, body);
    }

    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean x$1) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = x$1;
    }

    public AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig() {
        return this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    }

    public final void org$scalatest$concurrent$PatienceConfiguration$_setter_$org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig_$eq(AbstractPatienceConfiguration.PatienceConfig x$1) {
        this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig = x$1;
    }

    public AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig() {
        if (this.PatienceConfig$module == null) {
            this.PatienceConfig$lzycompute$1();
        }
        return this.PatienceConfig$module;
    }

    public StreamingContext ssc() {
        return this.ssc;
    }

    public void ssc_$eq(StreamingContext x$1) {
        this.ssc = x$1;
    }

    public boolean stopSparkContext() {
        return this.stopSparkContext;
    }

    public void org$apache$spark$streaming$LocalStreamingContext$_setter_$stopSparkContext_$eq(boolean x$1) {
        this.stopSparkContext = x$1;
    }

    private String appName() {
        return this.appName;
    }

    private Duration batchDuration() {
        return this.batchDuration;
    }

    private String dummyEndpointUrl() {
        return this.dummyEndpointUrl;
    }

    private String dummyRegionName() {
        return this.dummyRegionName;
    }

    private String dummyAWSAccessKey() {
        return this.dummyAWSAccessKey;
    }

    private String dummyAWSSecretKey() {
        return this.dummyAWSSecretKey;
    }

    private KinesisTestUtils testUtils() {
        return this.testUtils;
    }

    private void testUtils_$eq(KinesisTestUtils x$1) {
        this.testUtils = x$1;
    }

    private SparkContext sc() {
        return this.sc;
    }

    private void sc_$eq(SparkContext x$1) {
        this.sc = x$1;
    }

    public void beforeAll() {
        this.runIfTestsEnabled("Prepare KinesisTestUtils", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.testUtils_$eq(new KPLBasedKinesisTestUtils(KPLBasedKinesisTestUtils$.MODULE$.$lessinit$greater$default$1()));
            this.testUtils().createStream();
        });
    }

    public void afterAll() {
        try {
            if (this.testUtils() != null) {
                this.testUtils().deleteStream();
                this.testUtils().deleteDynamoDBTable(this.appName());
            }
        }
        finally {
            super.afterAll();
        }
    }

    public void beforeEach() {
        BeforeAndAfterEach.beforeEach$((BeforeAndAfterEach)this);
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("KinesisStreamSuite");
        this.sc_$eq(new SparkContext(conf));
        this.ssc_$eq(new StreamingContext(this.sc(), this.batchDuration()));
    }

    public void afterEach() {
        try {
            if (this.testUtils() != null) {
                this.testUtils().deleteDynamoDBTable(this.appName());
            }
        }
        finally {
            LocalStreamingContext.afterEach$((LocalStreamingContext)this);
        }
    }

    private final void PatienceConfig$lzycompute$1() {
        KinesisStreamTests kinesisStreamTests = this;
        synchronized (kinesisStreamTests) {
            if (this.PatienceConfig$module == null) {
                this.PatienceConfig$module = new AbstractPatienceConfiguration$PatienceConfig$((AbstractPatienceConfiguration)this);
            }
        }
    }

    public static final /* synthetic */ int $anonfun$new$10(byte[] bytes) {
        return new StringOps(Predef$.MODULE$.augmentString(new String(bytes))).toInt();
    }

    public static final /* synthetic */ void $anonfun$new$11(KinesisStreamTests $this, HashSet collected$1, RDD rdd) {
        HashSet hashSet = collected$1;
        synchronized (hashSet) {
            collected$1.$plus$plus$eq((TraversableOnce)new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])rdd.collect())));
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(12).append("Collected = ").append(collected$1.mkString(", ")).toString());
        }
    }

    private static final int addFive$1(Record r) {
        return new StringOps(Predef$.MODULE$.augmentString(JavaUtils.bytesToString((ByteBuffer)r.getData()))).toInt() + 5;
    }

    public static final /* synthetic */ void $anonfun$new$16(KinesisStreamTests $this, HashSet collected$2, RDD rdd) {
        HashSet hashSet = collected$2;
        synchronized (hashSet) {
            collected$2.$plus$plus$eq((TraversableOnce)new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])rdd.collect())));
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(12).append("Collected = ").append(collected$2.mkString(", ")).toString());
        }
    }

    public static final /* synthetic */ int $anonfun$new$22(byte[] bytes) {
        return new StringOps(Predef$.MODULE$.augmentString(new String(bytes))).toInt();
    }

    public static final /* synthetic */ void $anonfun$new$23(KinesisStreamTests $this, HashSet collected$3, RDD rdd) {
        HashSet hashSet = collected$3;
        synchronized (hashSet) {
            collected$3.$plus$plus$eq((TraversableOnce)new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])rdd.collect())));
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(12).append("Collected = ").append(collected$3.mkString(", ")).toString());
        }
    }

    public static final /* synthetic */ boolean $anonfun$new$26(Shard shard) {
        return shard.getSequenceNumberRange().getEndingSequenceNumber() == null;
    }

    public static final /* synthetic */ boolean $anonfun$new$28(Shard shard) {
        return shard.getSequenceNumberRange().getEndingSequenceNumber() == null;
    }

    public static final /* synthetic */ int $anonfun$new$32(byte[] bytes) {
        return new StringOps(Predef$.MODULE$.augmentString(new String(bytes))).toInt();
    }

    public static final /* synthetic */ void $anonfun$new$31(HashMap collectedData$1, RDD rdd, Time time) {
        KinesisBackedBlockRDD kRdd = (KinesisBackedBlockRDD)rdd;
        Seq data = new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])rdd.map((Function1 & Serializable & scala.Serializable)bytes -> BoxesRunTime.boxToInteger((int)KinesisStreamTests.$anonfun$new$32(bytes)), ClassTag$.MODULE$.Int()).collect())).toSeq();
        HashMap hashMap = collectedData$1;
        synchronized (hashMap) {
            collectedData$1.update((Object)time, (Object)new Tuple2((Object)kRdd.arrayOfseqNumberRanges(), (Object)data));
        }
    }

    public static final /* synthetic */ boolean $anonfun$new$33(Tuple2 x$11) {
        return ((TraversableOnce)((Tuple2)x$11._2())._2()).nonEmpty();
    }

    private static final int numBatchesWithData$1(HashMap collectedData$1) {
        int n;
        HashMap hashMap = collectedData$1;
        synchronized (hashMap) {
            n = collectedData$1.count((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)KinesisStreamTests.$anonfun$new$33(x$11)));
        }
        return n;
    }

    private static final boolean isCheckpointPresent$1(String checkpointDir$1) {
        return Checkpoint$.MODULE$.getCheckpointFiles(checkpointDir$1, Checkpoint$.MODULE$.getCheckpointFiles$default$2()).nonEmpty();
    }

    public static final /* synthetic */ int $anonfun$new$39(byte[] bytes) {
        return new StringOps(Predef$.MODULE$.augmentString(new String(bytes))).toInt();
    }

    public KinesisStreamTests(boolean aggregateTestData) {
        this.aggregateTestData = aggregateTestData;
        KinesisFunSuite.$init$(this);
        LocalStreamingContext.$init$((LocalStreamingContext)this);
        ScaledTimeSpans.$init$((ScaledTimeSpans)this);
        AbstractPatienceConfiguration.$init$((AbstractPatienceConfiguration)this);
        PatienceConfiguration.$init$((PatienceConfiguration)this);
        Eventually.$init$((Eventually)this);
        BeforeAndAfter.$init$((BeforeAndAfter)this);
        this.appName = new StringBuilder(19).append("KinesisStreamSuite-").append(package$.MODULE$.abs(Random$.MODULE$.nextLong())).toString();
        this.batchDuration = Seconds$.MODULE$.apply(1L);
        this.dummyEndpointUrl = KinesisTestUtils$.MODULE$.defaultEndpointUrl();
        this.dummyRegionName = KinesisTestUtils$.MODULE$.getRegionNameByEndpoint(this.dummyEndpointUrl());
        this.dummyAWSAccessKey = "dummyAccessKey";
        this.dummyAWSSecretKey = "dummySecretKey";
        this.testUtils = null;
        this.sc = null;
        this.test("RDD generation", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            KinesisInputDStream kinesisStream;
            KinesisInputDStream $org_scalatest_assert_macro_left = kinesisStream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(this.appName()).streamName("dummyStream").endpointUrl(this.dummyEndpointUrl()).regionName(this.dummyRegionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(2L)).storageLevel(StorageLevel$.MODULE$.MEMORY_AND_DISK_2()).kinesisCredentials((SparkAWSCredentials)new BasicCredentials(this.dummyAWSAccessKey(), this.dummyAWSSecretKey())).build();
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.isInstanceOfMacroBool((Object)$org_scalatest_assert_macro_left, "isInstanceOf", "org.apache.spark.streaming.kinesis.KinesisInputDStream", $org_scalatest_assert_macro_left instanceof KinesisInputDStream, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 108));
            Time time = new Time(1000L);
            SequenceNumberRanges seqNumRanges1 = SequenceNumberRanges$.MODULE$.apply(new SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67));
            StreamBlockId blockId1 = new StreamBlockId(kinesisStream.id(), 123L);
            ReceivedBlockInfo blockInfo1 = new ReceivedBlockInfo(0, (Option)None$.MODULE$, (Option)new Some((Object)seqNumRanges1), (ReceivedBlockStoreResult)new BlockManagerBasedStoreResult(blockId1, (Option)None$.MODULE$));
            SequenceNumberRanges seqNumRanges2 = SequenceNumberRanges$.MODULE$.apply(new SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89));
            StreamBlockId blockId2 = new StreamBlockId(kinesisStream.id(), 345L);
            ReceivedBlockInfo blockInfo2 = new ReceivedBlockInfo(0, (Option)None$.MODULE$, (Option)new Some((Object)seqNumRanges2), (ReceivedBlockStoreResult)new BlockManagerBasedStoreResult(blockId2, (Option)None$.MODULE$));
            Seq blockInfos = (Seq)new .colon.colon((Object)blockInfo1, (List)new .colon.colon((Object)blockInfo2, (List)Nil$.MODULE$));
            RDD nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos);
            Matchers.AnyShouldWrapper $org_scalatest_type_matcher_macro_left = Matchers$.MODULE$.convertToAnyShouldWrapper((Object)nonEmptyRDD, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 128), Prettifier$.MODULE$.default());
            TypeMatcherHelper$.MODULE$.assertAType($org_scalatest_type_matcher_macro_left.leftSideValue(), Matchers$.MODULE$.a(ClassTag$.MODULE$.apply(KinesisBackedBlockRDD.class)), $org_scalatest_type_matcher_macro_left.prettifier(), $org_scalatest_type_matcher_macro_left.pos());
            KinesisBackedBlockRDD kinesisRDD = (KinesisBackedBlockRDD)nonEmptyRDD;
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(kinesisRDD.regionName());
            String $org_scalatest_assert_macro_right = this.dummyRegionName();
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 130));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(kinesisRDD.endpointUrl());
            String $org_scalatest_assert_macro_right2 = this.dummyEndpointUrl();
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)$org_scalatest_assert_macro_right2, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 131));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)kinesisRDD.kinesisReadConfigs().retryTimeoutMs()));
            long $org_scalatest_assert_macro_right3 = this.batchDuration().milliseconds();
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 132));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left5 = this.convertToEqualizer(kinesisRDD.kinesisCreds());
            BasicCredentials $org_scalatest_assert_macro_right4 = new BasicCredentials(this.dummyAWSAccessKey(), this.dummyAWSSecretKey());
            Bool $org_scalatest_assert_macro_expr5 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left5, "===", (Object)$org_scalatest_assert_macro_right4, $org_scalatest_assert_macro_left5.$eq$eq$eq((Object)$org_scalatest_assert_macro_right4, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr5, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 133));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left6 = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])nonEmptyRDD.partitions())).size()));
            int $org_scalatest_assert_macro_right5 = blockInfos.size();
            Bool $org_scalatest_assert_macro_expr6 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left6, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right5), $org_scalatest_assert_macro_left6.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right5), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr6, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 136));
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])nonEmptyRDD.partitions())).foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                Matchers.AnyShouldWrapper $org_scalatest_type_matcher_macro_left = Matchers$.MODULE$.convertToAnyShouldWrapper(x$1, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 137), Prettifier$.MODULE$.default());
                return TypeMatcherHelper$.MODULE$.assertAType($org_scalatest_type_matcher_macro_left.leftSideValue(), Matchers$.MODULE$.a(ClassTag$.MODULE$.apply(KinesisBackedBlockRDDPartition.class)), $org_scalatest_type_matcher_macro_left.prettifier(), $org_scalatest_type_matcher_macro_left.pos());
            });
            Seq partitions = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])nonEmptyRDD.partitions())).map((Function1 & Serializable & scala.Serializable)x$2 -> (KinesisBackedBlockRDDPartition)x$2, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(KinesisBackedBlockRDDPartition.class))))).toSeq();
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left7 = this.convertToEqualizer(partitions.map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.seqNumberRanges(), Seq$.MODULE$.canBuildFrom()));
            Seq $org_scalatest_assert_macro_right6 = (Seq)new .colon.colon((Object)seqNumRanges1, (List)new .colon.colon((Object)seqNumRanges2, (List)Nil$.MODULE$));
            Bool $org_scalatest_assert_macro_expr7 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left7, "===", (Object)$org_scalatest_assert_macro_right6, $org_scalatest_assert_macro_left7.$eq$eq$eq((Object)$org_scalatest_assert_macro_right6, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr7, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 140));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left8 = this.convertToEqualizer(partitions.map((Function1 & Serializable & scala.Serializable)x$4 -> x$4.blockId(), Seq$.MODULE$.canBuildFrom()));
            Seq $org_scalatest_assert_macro_right7 = (Seq)new .colon.colon((Object)blockId1, (List)new .colon.colon((Object)blockId2, (List)Nil$.MODULE$));
            Bool $org_scalatest_assert_macro_expr8 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left8, "===", (Object)$org_scalatest_assert_macro_right7, $org_scalatest_assert_macro_left8.$eq$eq$eq((Object)$org_scalatest_assert_macro_right7, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr8, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 141));
            Bool $org_scalatest_assert_macro_expr9 = Bool$.MODULE$.simpleMacroBool(partitions.forall((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)x$5.isBlockIdValid())), "partitions.forall(((x$5: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDPartition) => x$5.isBlockIdValid))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr9, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 142));
            RDD emptyRDD = kinesisStream.createBlockRDD(time, (Seq)Nil$.MODULE$);
            Matchers.AnyShouldWrapper $org_scalatest_type_matcher_macro_left2 = Matchers$.MODULE$.convertToAnyShouldWrapper((Object)emptyRDD, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 148), Prettifier$.MODULE$.default());
            TypeMatcherHelper$.MODULE$.assertAType($org_scalatest_type_matcher_macro_left2.leftSideValue(), Matchers$.MODULE$.a(ClassTag$.MODULE$.apply(KinesisBackedBlockRDD.class)), $org_scalatest_type_matcher_macro_left2.prettifier(), $org_scalatest_type_matcher_macro_left2.pos());
            Matchers$.MODULE$.convertToAnyShouldWrapper((Object)emptyRDD.partitions(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 149), Prettifier$.MODULE$.default()).shouldBe(Matchers$.MODULE$.empty(), Emptiness$.MODULE$.emptinessOfArray());
            blockInfos.foreach((Function1 & Serializable & scala.Serializable)x$6 -> {
                x$6.setBlockIdInvalid();
                return BoxedUnit.UNIT;
            });
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])kinesisStream.createBlockRDD(time, blockInfos).partitions())).foreach((Function1 & Serializable & scala.Serializable)partition -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToBoolean((boolean)((KinesisBackedBlockRDDPartition)partition).isBlockIdValid()));
                boolean $org_scalatest_assert_macro_right = false;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToBoolean((boolean)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToBoolean((boolean)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 154));
            });
        }, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
        this.testIfEnabled("basic operation", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            KinesisInputDStream stream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(this.appName()).streamName(this.testUtils().streamName()).endpointUrl(this.testUtils().endpointUrl()).regionName(this.testUtils().regionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(10L)).storageLevel(StorageLevel$.MODULE$.MEMORY_ONLY()).build();
            HashSet collected = new HashSet();
            stream.map((Function1 & Serializable & scala.Serializable)bytes -> BoxesRunTime.boxToInteger((int)KinesisStreamTests.$anonfun$new$10(bytes)), ClassTag$.MODULE$.Int()).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                KinesisStreamTests.$anonfun$new$11(this, collected, rdd);
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            Range.Inclusive testData = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds())), (Function0 & Serializable & scala.Serializable)() -> {
                Assertion assertion;
                this.testUtils().pushData((Seq)testData, $this.aggregateTestData);
                HashSet hashSet = collected;
                synchronized (hashSet) {
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(collected);
                    scala.collection.immutable.Set $org_scalatest_assert_macro_right = testData.toSet();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"\nData received does not match data sent", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 191));
                }
                return assertion;
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 188));
            this.ssc().stop(false);
        });
        this.testIfEnabled("custom message handling", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            KinesisInputDStream stream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(this.appName()).streamName(this.testUtils().streamName()).endpointUrl(this.testUtils().endpointUrl()).regionName(this.testUtils().regionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(10L)).storageLevel(StorageLevel$.MODULE$.MEMORY_ONLY()).buildWithMessageHandler((Function1 & Serializable & scala.Serializable)r -> BoxesRunTime.boxToInteger((int)KinesisStreamTests.addFive$1(r)), ClassTag$.MODULE$.Int());
            Matchers.AnyShouldWrapper $org_scalatest_type_matcher_macro_left = Matchers$.MODULE$.convertToAnyShouldWrapper((Object)stream, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 210), Prettifier$.MODULE$.default());
            TypeMatcherHelper$.MODULE$.assertAType($org_scalatest_type_matcher_macro_left.leftSideValue(), Matchers$.MODULE$.a(ClassTag$.MODULE$.apply(ReceiverInputDStream.class)), $org_scalatest_type_matcher_macro_left.prettifier(), $org_scalatest_type_matcher_macro_left.pos());
            HashSet collected = new HashSet();
            stream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                KinesisStreamTests.$anonfun$new$16(this, collected, rdd);
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            Range.Inclusive testData = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds())), (Function0 & Serializable & scala.Serializable)() -> {
                Assertion assertion;
                this.testUtils().pushData((Seq)testData, $this.aggregateTestData);
                IndexedSeq modData = (IndexedSeq)testData.map((Function1)(JFunction1.mcII.sp & Serializable & scala.Serializable)x$7 -> x$7 + 5, IndexedSeq$.MODULE$.canBuildFrom());
                HashSet hashSet = collected;
                synchronized (hashSet) {
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(collected);
                    scala.collection.immutable.Set $org_scalatest_assert_macro_right = modData.toSet();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"\nData received does not match data sent", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 226));
                }
                return assertion;
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 222));
            this.ssc().stop(false);
        });
        this.test("Kinesis read with custom configurations", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            Assertion assertion;
            try {
                this.ssc().sc().conf().set(KinesisReadConfigurations$.MODULE$.RETRY_WAIT_TIME_KEY(), "2000ms");
                this.ssc().sc().conf().set(KinesisReadConfigurations$.MODULE$.RETRY_MAX_ATTEMPTS_KEY(), "5");
                KinesisInputDStream kinesisStream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(this.appName()).streamName("dummyStream").endpointUrl(this.dummyEndpointUrl()).regionName(this.dummyRegionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(10L)).storageLevel(StorageLevel$.MODULE$.MEMORY_ONLY()).build();
                Time time = new Time(1000L);
                SequenceNumberRanges seqNumRanges1 = SequenceNumberRanges$.MODULE$.apply(new SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67));
                StreamBlockId blockId1 = new StreamBlockId(kinesisStream.id(), 123L);
                ReceivedBlockInfo blockInfo1 = new ReceivedBlockInfo(0, (Option)None$.MODULE$, (Option)new Some((Object)seqNumRanges1), (ReceivedBlockStoreResult)new BlockManagerBasedStoreResult(blockId1, (Option)None$.MODULE$));
                SequenceNumberRanges seqNumRanges2 = SequenceNumberRanges$.MODULE$.apply(new SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89));
                StreamBlockId blockId2 = new StreamBlockId(kinesisStream.id(), 345L);
                ReceivedBlockInfo blockInfo2 = new ReceivedBlockInfo(0, (Option)None$.MODULE$, (Option)new Some((Object)seqNumRanges2), (ReceivedBlockStoreResult)new BlockManagerBasedStoreResult(blockId2, (Option)None$.MODULE$));
                Seq blockInfos = (Seq)new .colon.colon((Object)blockInfo1, (List)new .colon.colon((Object)blockInfo2, (List)Nil$.MODULE$));
                KinesisBackedBlockRDD kinesisRDD = (KinesisBackedBlockRDD)kinesisStream.createBlockRDD(time, blockInfos);
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)kinesisRDD.kinesisReadConfigs().retryWaitTimeMs()));
                int $org_scalatest_assert_macro_right = 2000;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 268));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)kinesisRDD.kinesisReadConfigs().maxRetries()));
                int $org_scalatest_assert_macro_right2 = 5;
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 269));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)kinesisRDD.kinesisReadConfigs().retryTimeoutMs()));
                long $org_scalatest_assert_macro_right3 = this.batchDuration().milliseconds();
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 270));
            }
            finally {
                this.ssc().sc().conf().remove(KinesisReadConfigurations$.MODULE$.RETRY_WAIT_TIME_KEY());
                this.ssc().sc().conf().remove(KinesisReadConfigurations$.MODULE$.RETRY_MAX_ATTEMPTS_KEY());
                this.ssc().stop(false);
            }
            return assertion;
        }, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 232));
        this.testIfEnabled("split and merge shards in a stream", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String localAppName = new StringBuilder(19).append("KinesisStreamSuite-").append(package$.MODULE$.abs(Random$.MODULE$.nextLong())).toString();
            KPLBasedKinesisTestUtils localTestUtils = new KPLBasedKinesisTestUtils(1);
            localTestUtils.createStream();
            try {
                Seq mergedCloseShards;
                Seq splitCloseShards;
                KinesisInputDStream stream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(localAppName).streamName(localTestUtils.streamName()).endpointUrl(localTestUtils.endpointUrl()).regionName(localTestUtils.regionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(10L)).storageLevel(StorageLevel$.MODULE$.MEMORY_ONLY()).build();
                HashSet collected = new HashSet();
                stream.map((Function1 & Serializable & scala.Serializable)bytes -> BoxesRunTime.boxToInteger((int)KinesisStreamTests.$anonfun$new$22(bytes)), ClassTag$.MODULE$.Int()).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                    KinesisStreamTests.$anonfun$new$23(this, collected, rdd);
                    return BoxedUnit.UNIT;
                });
                this.ssc().start();
                Range.Inclusive testData1 = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
                Range.Inclusive testData2 = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 20);
                Range.Inclusive testData3 = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(21), 30);
                this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).minute())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds())), (Function0 & Serializable & scala.Serializable)() -> {
                    Assertion assertion;
                    localTestUtils.pushData((Seq)testData1, $this.aggregateTestData);
                    HashSet hashSet = collected;
                    synchronized (hashSet) {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(collected);
                        scala.collection.immutable.Set $org_scalatest_assert_macro_right = testData1.toSet();
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                        assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"\nData received does not match data sent", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 311));
                    }
                    return assertion;
                }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 308));
                Shard shardToSplit = (Shard)localTestUtils.getShards().head();
                localTestUtils.splitShard(shardToSplit.getShardId());
                Tuple2 tuple2 = localTestUtils.getShards().partition((Function1 & Serializable & scala.Serializable)shard -> BoxesRunTime.boxToBoolean((boolean)KinesisStreamTests.$anonfun$new$26(shard)));
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Seq splitOpenShards = (Seq)tuple2._1();
                Seq splitCloseShards2 = (Seq)tuple2._2();
                Tuple2 tuple22 = new Tuple2((Object)splitOpenShards, (Object)splitCloseShards2);
                Tuple2 tuple23 = tuple22;
                Seq splitOpenShards2 = (Seq)tuple23._1();
                Seq $org_scalatest_assert_macro_left = splitCloseShards = (Seq)tuple23._2();
                int $org_scalatest_assert_macro_right = 1;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 322));
                Seq $org_scalatest_assert_macro_left2 = splitOpenShards2;
                int $org_scalatest_assert_macro_right2 = 2;
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left2, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 323));
                this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).minute())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds())), (Function0 & Serializable & scala.Serializable)() -> {
                    Assertion assertion;
                    localTestUtils.pushData((Seq)testData2, $this.aggregateTestData);
                    HashSet hashSet = collected;
                    synchronized (hashSet) {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(collected);
                        scala.collection.immutable.Set $org_scalatest_assert_macro_right = ((TraversableOnce)testData1.$plus$plus((GenTraversableOnce)testData2, IndexedSeq$.MODULE$.canBuildFrom())).toSet();
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                        assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"\nData received does not match data sent after splitting a shard", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 328));
                    }
                    return assertion;
                }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 325));
                Seq seq = splitOpenShards2;
                Some some = Seq$.MODULE$.unapplySeq(seq);
                if (some.isEmpty() || some.get() == null || ((SeqLike)some.get()).lengthCompare(2) != 0) {
                    throw new MatchError((Object)seq);
                }
                Shard shardToMerge = (Shard)((SeqLike)some.get()).apply(0);
                Shard adjShard = (Shard)((SeqLike)some.get()).apply(1);
                Tuple2 tuple24 = new Tuple2((Object)shardToMerge, (Object)adjShard);
                Tuple2 tuple25 = tuple24;
                Shard shardToMerge2 = (Shard)tuple25._1();
                Shard adjShard2 = (Shard)tuple25._2();
                localTestUtils.mergeShard(shardToMerge2.getShardId(), adjShard2.getShardId());
                Tuple2 tuple26 = localTestUtils.getShards().partition((Function1 & Serializable & scala.Serializable)shard -> BoxesRunTime.boxToBoolean((boolean)KinesisStreamTests.$anonfun$new$28(shard)));
                if (tuple26 == null) {
                    throw new MatchError((Object)tuple26);
                }
                Seq mergedOpenShards = (Seq)tuple26._1();
                Seq mergedCloseShards2 = (Seq)tuple26._2();
                Tuple2 tuple27 = new Tuple2((Object)mergedOpenShards, (Object)mergedCloseShards2);
                Tuple2 tuple28 = tuple27;
                Seq mergedOpenShards2 = (Seq)tuple28._1();
                Seq $org_scalatest_assert_macro_left3 = mergedCloseShards = (Seq)tuple28._2();
                int $org_scalatest_assert_macro_right3 = 3;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left3, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left3.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 340));
                Seq $org_scalatest_assert_macro_left4 = mergedOpenShards2;
                int $org_scalatest_assert_macro_right4 = 1;
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left4, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left4.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 341));
                this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).minute())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds())), (Function0 & Serializable & scala.Serializable)() -> {
                    Assertion assertion;
                    localTestUtils.pushData((Seq)testData3, $this.aggregateTestData);
                    HashSet hashSet = collected;
                    synchronized (hashSet) {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(collected);
                        scala.collection.immutable.Set $org_scalatest_assert_macro_right = ((TraversableOnce)((TraversableLike)testData1.$plus$plus((GenTraversableOnce)testData2, IndexedSeq$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)testData3, IndexedSeq$.MODULE$.canBuildFrom())).toSet();
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                        assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"\nData received does not match data sent after merging shards", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 346));
                    }
                    return assertion;
                }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 343));
            }
            finally {
                this.ssc().stop(false);
                localTestUtils.deleteStream();
                localTestUtils.deleteDynamoDBTable(localAppName);
            }
        });
        this.testIfEnabled("failure recovery", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
            String checkpointDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()).getAbsolutePath();
            this.ssc_$eq(new StreamingContext(this.sc(), Milliseconds$.MODULE$.apply(1000L)));
            this.ssc().checkpoint(checkpointDir);
            HashMap collectedData = new HashMap();
            KinesisInputDStream kinesisStream = KinesisInputDStream$.MODULE$.builder().streamingContext(this.ssc()).checkpointAppName(this.appName()).streamName(this.testUtils().streamName()).endpointUrl(this.testUtils().endpointUrl()).regionName(this.testUtils().regionName()).initialPosition((KinesisInitialPosition)new KinesisInitialPositions.Latest()).checkpointInterval(Seconds$.MODULE$.apply(10L)).storageLevel(StorageLevel$.MODULE$.MEMORY_ONLY()).build();
            kinesisStream.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                KinesisStreamTests.$anonfun$new$31(collectedData, rdd, time);
                return BoxedUnit.UNIT;
            });
            this.ssc().remember(Minutes$.MODULE$.apply(60L));
            this.ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes())), this.interval(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second())), (Function0 & Serializable & scala.Serializable)() -> {
                Bool bool;
                this.testUtils().pushData((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5), $this.aggregateTestData);
                Bool $org_scalatest_assert_macro_left = Bool$.MODULE$.simpleMacroBool(KinesisStreamTests.isCheckpointPresent$1(checkpointDir), "isCheckpointPresent", Prettifier$.MODULE$.default());
                if ($org_scalatest_assert_macro_left.value()) {
                    int $org_scalatest_assert_macro_left2 = KinesisStreamTests.numBatchesWithData$1(collectedData);
                    int $org_scalatest_assert_macro_right = 10;
                    bool = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left2 > $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                } else {
                    bool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
                }
                Bool $org_scalatest_assert_macro_right = bool;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "&&", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$amp$amp((Function0 & Serializable & scala.Serializable)() -> $org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 398));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 396));
            this.ssc().stop(true);
            this.logInfo((Function0 & Serializable & scala.Serializable)() -> "Restarting from checkpoint");
            this.ssc_$eq(new StreamingContext(checkpointDir));
            this.ssc().start();
            InputDStream recoveredKinesisStream = (InputDStream)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.ssc().graph().getInputStreams())).head();
            HashMap hashMap = collectedData;
            synchronized (hashMap) {
                Set times = collectedData.keySet();
                times.foreach((Function1 & Serializable & scala.Serializable)time -> {
                    Tuple2 tuple2 = (Tuple2)collectedData.apply(time);
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    SequenceNumberRanges[] arrayOfSeqNumRanges = (SequenceNumberRanges[])tuple2._1();
                    Seq data = (Seq)tuple2._2();
                    Tuple2 tuple22 = new Tuple2((Object)arrayOfSeqNumRanges, (Object)data);
                    Tuple2 tuple23 = tuple22;
                    SequenceNumberRanges[] arrayOfSeqNumRanges2 = (SequenceNumberRanges[])tuple23._1();
                    Seq data2 = (Seq)tuple23._2();
                    RDD rdd = (RDD)recoveredKinesisStream.getOrCompute(time).get();
                    Matchers.AnyShouldWrapper $org_scalatest_type_matcher_macro_left = Matchers$.MODULE$.convertToAnyShouldWrapper((Object)rdd, new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 415), Prettifier$.MODULE$.default());
                    TypeMatcherHelper$.MODULE$.assertAType($org_scalatest_type_matcher_macro_left.leftSideValue(), Matchers$.MODULE$.a(ClassTag$.MODULE$.apply(KinesisBackedBlockRDD.class)), $org_scalatest_type_matcher_macro_left.prettifier(), $org_scalatest_type_matcher_macro_left.pos());
                    KinesisBackedBlockRDD kRdd = (KinesisBackedBlockRDD)rdd;
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])kRdd.arrayOfseqNumberRanges())).size()));
                    int $org_scalatest_assert_macro_right = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])arrayOfSeqNumRanges2)).size();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 419));
                    new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])arrayOfSeqNumRanges2)).zip((GenIterable)Predef$.MODULE$.wrapRefArray((Object[])kRdd.arrayOfseqNumberRanges()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                        Tuple2 tuple2 = x0$1;
                        if (tuple2 == null) {
                            throw new MatchError((Object)tuple2);
                        }
                        SequenceNumberRanges expected = (SequenceNumberRanges)tuple2._1();
                        SequenceNumberRanges found = (SequenceNumberRanges)tuple2._2();
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(expected.ranges().toSeq());
                        Seq $org_scalatest_assert_macro_right = found.ranges().toSeq();
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                        Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 421));
                        return assertion;
                    });
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])rdd.map((Function1 & Serializable & scala.Serializable)bytes -> BoxesRunTime.boxToInteger((int)KinesisStreamTests.$anonfun$new$39(bytes)), ClassTag$.MODULE$.Int()).collect())).toSeq());
                    Seq $org_scalatest_assert_macro_right2 = data2;
                    Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)$org_scalatest_assert_macro_right2, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KinesisStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 425));
                });
            }
            StreamingContext qual$1 = this.ssc();
            boolean x$1 = qual$1.stop$default$1();
            qual$1.stop(x$1);
        });
    }
}

