/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka010;

import java.io.File;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.kafka010.KafkaBatchInputPartition;
import org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase;
import org.apache.spark.sql.kafka010.KafkaMicroBatchStream;
import org.apache.spark.sql.kafka010.KafkaMicroBatchStream$;
import org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite$;
import org.apache.spark.sql.kafka010.KafkaSourceOffset;
import org.apache.spark.sql.kafka010.KafkaSourceProvider;
import org.apache.spark.sql.kafka010.KafkaSourceProvider$;
import org.apache.spark.sql.kafka010.KafkaTestUtils;
import org.apache.spark.sql.kafka010.RecordBuilder;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamTest;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001Y1AAA\u0002\u0001\u001d!)1\u0003\u0001C\u0001)\ta2*\u00194lC6K7M]8CCR\u001c\u0007N\u0016\u001aT_V\u00148-Z*vSR,'B\u0001\u0003\u0006\u0003!Y\u0017MZ6baE\u0002$B\u0001\u0004\b\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0011%\tQa\u001d9be.T!AC\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005a\u0011aA8sO\u000e\u00011C\u0001\u0001\u0010!\t\u0001\u0012#D\u0001\u0004\u0013\t\u00112A\u0001\u0010LC\u001a\\\u0017-T5de>\u0014\u0015\r^2i'>,(oY3Tk&$XMQ1tK\u00061A(\u001b8jiz\"\u0012!\u0006\t\u0003!\u0001\u0001")
public class KafkaMicroBatchV2SourceSuite
extends KafkaMicroBatchSourceSuiteBase {
    /**
     * Reports whether a logical plan node is a streaming V2 relation whose
     * stream is a {@link KafkaMicroBatchStream}.
     *
     * @param x0$1 the plan node to inspect
     * @return {@code true} only when the node is a
     *         {@link StreamingDataSourceV2Relation} and its stream is a
     *         {@link KafkaMicroBatchStream}; {@code false} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$new$133(LogicalPlan x0$1) {
        // Any other kind of plan node can never match.
        if (!(x0$1 instanceof StreamingDataSourceV2Relation)) {
            return false;
        }
        StreamingDataSourceV2Relation relation = (StreamingDataSourceV2Relation)x0$1;
        return relation.stream() instanceof KafkaMicroBatchStream;
    }

    /**
     * Checks whether the query's logical plan contains at least one node
     * accepted by {@code $anonfun$new$133}.
     *
     * @param query the running stream execution whose plan is searched
     * @return {@code true} if some plan node satisfies the predicate
     */
    public static final /* synthetic */ boolean $anonfun$new$132(StreamExecution query) {
        LogicalPlan plan = query.logicalPlan();
        return plan.exists((Function1 & Serializable & scala.Serializable)node -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchV2SourceSuite.$anonfun$new$133(node)));
    }

    /**
     * Body of the temp-directory callback used by {@code test$1}: builds a Kafka
     * micro-batch stream for the given topic, plans input partitions for the
     * offset range 0 to 100 of {@code tp$1}, and asserts how many partitions
     * were planned.
     *
     * @param $this                    the enclosing suite (supplies the test utils and assertion helpers)
     * @param topic$10                 topic passed as the {@code subscribe} option
     * @param minPartitions$1          value for the {@code minPartitions} option; the option is
     *                                 added only when {@code Option.apply} of this value is non-empty
     * @param tp$1                     topic partition used as the key of the start and end offsets
     * @param numPartitionsGenerated$1 expected number of planned input partitions
     * @param dir                      directory whose absolute path is handed to
     *                                 {@code toMicroBatchStream}
     */
    public static final /* synthetic */ void $anonfun$new$135(KafkaMicroBatchV2SourceSuite $this, String topic$10, String minPartitions$1, TopicPartition tp$1, int numPartitionsGenerated$1, File dir) {
        KafkaSourceProvider provider = new KafkaSourceProvider();
        // Base options (broker address + subscribed topic), plus "minPartitions" when one was supplied.
        scala.collection.immutable.Map options = ((MapLike)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"kafka.bootstrap.servers"), (Object)$this.testUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"subscribe"), (Object)topic$10)}))).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(Option$.MODULE$.apply((Object)minPartitions$1).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"minPartitions"), p))));
        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)options).asJava());
        KafkaSourceProvider.KafkaTable table = provider.getTable(dsOptions);
        MicroBatchStream stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath());
        // Plan partitions between offset 0 and offset 100 of tp$1 and narrow each element to KafkaBatchInputPartition.
        KafkaBatchInputPartition[] inputPartitions = (KafkaBatchInputPartition[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])stream.planInputPartitions((Offset)new KafkaSourceOffset((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp$1), (Object)BoxesRunTime.boxToLong((long)0L))}))), (Offset)new KafkaSourceOffset((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp$1), (Object)BoxesRunTime.boxToLong((long)100L))})))))).map((Function1 & Serializable & scala.Serializable)x$44 -> (KafkaBatchInputPartition)x$44, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(KafkaBatchInputPartition.class)));
        // Render the array contents in the clue; appending the array itself would only
        // print its identity string, which is useless when the assertion fails.
        $this.withClue(new StringBuilder(39).append("minPartitions = ").append(minPartitions$1).append(" generated factories ").append(java.util.Arrays.toString((Object[])inputPartitions)).append("\n\t").toString(), (Function0 & Serializable & scala.Serializable)() -> {
            Object[] $org_scalatest_assert_macro_left = Predef$.MODULE$.refArrayOps((Object[])inputPartitions);
            int $org_scalatest_assert_macro_right = numPartitionsGenerated$1;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.lengthSizeMacroBool((Object)new ArrayOps.ofRef($org_scalatest_assert_macro_left), "size", (Object)BoxesRunTime.boxToInteger((int)new ArrayOps.ofRef($org_scalatest_assert_macro_left).size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1452));
        });
    }

    /**
     * Runs one minPartitions scenario: makes this suite's session the active
     * one, then executes {@code $anonfun$new$135} inside a temporary directory.
     *
     * @param minPartitions          value forwarded as the minPartitions setting (may be {@code null})
     * @param numPartitionsGenerated expected number of planned input partitions
     * @param reusesConsumers        not read anywhere in this method body
     * @param topic$10               topic forwarded to the check
     * @param tp$1                   topic partition forwarded to the check
     */
    private final void test$1(String minPartitions, int numPartitionsGenerated, boolean reusesConsumers, String topic$10, TopicPartition tp$1) {
        SparkSession$.MODULE$.setActiveSession(this.spark());
        Function1<File, BoxedUnit> checkInTempDir = (Function1<File, BoxedUnit>)(Function1 & Serializable & scala.Serializable)tempDir -> {
            KafkaMicroBatchV2SourceSuite.$anonfun$new$135(this, topic$10, minPartitions, tp$1, numPartitionsGenerated, tempDir);
            return BoxedUnit.UNIT;
        };
        this.withTempDir(checkInTempDir);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$45 the value to render
     * @return the base-10 text of {@code x$45}
     */
    public static final /* synthetic */ String $anonfun$new$144(int x$45) {
        return String.valueOf(x$45);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$46 the value to render
     * @return the base-10 text of {@code x$46}
     */
    public static final /* synthetic */ String $anonfun$new$145(int x$46) {
        return String.valueOf(x$46);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$47 the value to render
     * @return the base-10 text of {@code x$47}
     */
    public static final /* synthetic */ String $anonfun$new$146(int x$47) {
        return String.valueOf(x$47);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$48 the value to render
     * @return the base-10 text of {@code x$48}
     */
    public static final /* synthetic */ String $anonfun$new$147(int x$48) {
        return String.valueOf(x$48);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$49 the value to render
     * @return the base-10 text of {@code x$49}
     */
    public static final /* synthetic */ String $anonfun$new$148(int x$49) {
        return String.valueOf(x$49);
    }

    /**
     * Builds a producer record for {@code num}: the decimal text of {@code num}
     * is passed as the second {@link RecordBuilder} constructor argument, the
     * partition is {@code num - 31}, and the supplied headers are attached.
     *
     * @param topic$11  topic passed to the record builder
     * @param headers$1 header pairs attached to the record
     * @param num       number that determines both the record text and the partition
     * @return the record produced by the builder
     */
    public static final /* synthetic */ ProducerRecord $anonfun$new$149(String topic$11, Seq headers$1, int num) {
        String text = Integer.toString(num);
        int targetPartition = num - 31;
        return new RecordBuilder(topic$11, text).partition(targetPartition).headers((Seq<Tuple2<String, byte[]>>)headers$1).build();
    }

    /**
     * Parses the second element of the pair as an integer and adds one.
     *
     * @param kv pair whose second element is cast to {@code String} and parsed
     * @return the parsed integer plus one
     */
    public static final /* synthetic */ int $anonfun$new$151(Tuple2 kv) {
        String text = (String)kv._2();
        int parsed = new StringOps(Predef$.MODULE$.augmentString(text)).toInt();
        return parsed + 1;
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$50 the value to render
     * @return the base-10 text of {@code x$50}
     */
    public static final /* synthetic */ String $anonfun$new$153(int x$50) {
        return String.valueOf(x$50);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$51 the value to render
     * @return the base-10 text of {@code x$51}
     */
    public static final /* synthetic */ String $anonfun$new$154(int x$51) {
        return String.valueOf(x$51);
    }

    /**
     * Parses a string as an integer via Scala's {@code StringOps.toInt}.
     *
     * @param x$52 the text to parse
     * @return the parsed integer
     */
    public static final /* synthetic */ int $anonfun$new$155(String x$52) {
        StringOps text = new StringOps(Predef$.MODULE$.augmentString(x$52));
        return text.toInt();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Tells whether a source progress entry has metrics and reports an
     * {@code avgOffsetsBehindLatest} value strictly greater than zero.
     *
     * @param progress the source progress entry to examine
     * @return {@code false} when the metrics map is empty or the parsed average
     *         is not greater than {@code 0.0}; {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$new$158(SourceProgress progress) {
        if (progress.metrics().isEmpty()) {
            return false;
        }
        String rawAverage = (String)progress.metrics().get("avgOffsetsBehindLatest");
        double averageBehind = new StringOps(Predef$.MODULE$.augmentString(rawAverage)).toDouble();
        return averageBehind > 0.0;
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$54 the value to render
     * @return the base-10 text of {@code x$54}
     */
    public static final /* synthetic */ String $anonfun$new$160(int x$54) {
        return String.valueOf(x$54);
    }

    /**
     * Renders an integer as its decimal string form.
     *
     * @param x$55 the value to render
     * @return the base-10 text of {@code x$55}
     */
    public static final /* synthetic */ String $anonfun$new$161(int x$55) {
        return String.valueOf(x$55);
    }

    /**
     * Parses a string as an integer via Scala's {@code StringOps.toInt}.
     *
     * @param x$56 the text to parse
     * @return the parsed integer
     */
    public static final /* synthetic */ int $anonfun$new$162(String x$56) {
        StringOps text = new StringOps(Predef$.MODULE$.augmentString(x$56));
        return text.toInt();
    }

    public KafkaMicroBatchV2SourceSuite() {
        this.test("V2 Source is used by default", (Seq<Tag>)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0<Object>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = this.newTopic();
            KafkaTestUtils qual$1 = this.testUtils();
            String x$1 = topic;
            int x$2 = 5;
            boolean x$3 = qual$1.createTopic$default$3();
            qual$1.createTopic(x$1, x$2, x$3);
            Dataset kafka = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("kafka.metadata.max.age.ms", "1").option("subscribePattern", new StringBuilder(2).append(topic).append(".*").toString()).load();
            this.testStream(kafka, this.testStream$default$2(), (Seq<StreamTest.StreamAction>)Predef$.MODULE$.wrapRefArray((Object[])new StreamTest.StreamAction[]{this.makeSureGetOffsetCalled(), this.AssertOnQuery().apply((Function1 & Serializable & scala.Serializable)query -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchV2SourceSuite.$anonfun$new$132(query)), this.AssertOnQuery().apply$default$2())}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1404));
        this.testWithUninterruptibleThread("minPartitions is supported", this.testWithUninterruptibleThread$default$2(), (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = this.newTopic();
            TopicPartition tp = new TopicPartition(topic, 0);
            KafkaTestUtils qual$2 = this.testUtils();
            String x$4 = topic;
            int x$5 = 1;
            boolean x$6 = qual$2.createTopic$default$3();
            qual$2.createTopic(x$4, x$5, x$6);
            this.test$1(null, 1, true, topic, tp);
            this.test$1("1", 1, true, topic, tp);
            this.test$1("4", 4, false, topic, tp);
            this.intercept((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.test$1("a", 1, true, topic, tp), ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1463));
            this.intercept((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.test$1("1.0", 1, true, topic, tp), ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1464));
            this.intercept((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.test$1("0", 1, true, topic, tp), ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1465));
            this.intercept((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.test$1("-1", 1, true, topic, tp), ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1466));
        });
        this.test("default config of includeHeader doesn't break existing query from Spark 2.4", (Seq<Tag>)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0<Object>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62";
            this.testUtils().createTopic(topic, 5, true);
            this.testUtils().sendMessages(topic, (String[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{-20, -21, -22})).map((Function1 & Serializable & scala.Serializable)x$45 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$144(BoxesRunTime.unboxToInt((Object)x$45)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)0)));
            this.testUtils().sendMessages(topic, (String[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{-10, -11, -12})).map((Function1 & Serializable & scala.Serializable)x$46 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$145(BoxesRunTime.unboxToInt((Object)x$46)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)1)));
            this.testUtils().sendMessages(topic, (String[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{0, 1, 2})).map((Function1 & Serializable & scala.Serializable)x$47 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$146(BoxesRunTime.unboxToInt((Object)x$47)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)2)));
            this.testUtils().sendMessages(topic, (String[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{10, 11, 12})).map((Function1 & Serializable & scala.Serializable)x$48 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$147(BoxesRunTime.unboxToInt((Object)x$48)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)3)));
            this.testUtils().sendMessages(topic, (String[])new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{20, 21, 22})).map((Function1 & Serializable & scala.Serializable)x$49 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$148(BoxesRunTime.unboxToInt((Object)x$49)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)4)));
            Predef$.MODULE$.require(this.convertToEqualizer(BoxesRunTime.boxToInteger((int)this.testUtils().getLatestOffsets((scala.collection.immutable.Set<String>)((scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})))).size())).$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)5), Equality$.MODULE$.default()));
            Seq headers = (Seq)new .colon.colon((Object)new Tuple2((Object)"a", (Object)"b".getBytes(StandardCharsets.UTF_8)), (List)new .colon.colon((Object)new Tuple2((Object)"c", (Object)"d".getBytes(StandardCharsets.UTF_8)), (List)Nil$.MODULE$));
            ((IterableLike)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(31), 35).map((Function1 & Serializable & scala.Serializable)num -> KafkaMicroBatchV2SourceSuite.$anonfun$new$149(topic, headers, BoxesRunTime.unboxToInt((Object)num)), IndexedSeq$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)rec -> this.testUtils().sendMessage((ProducerRecord<String, String>)rec));
            Dataset kafka = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("kafka.metadata.max.age.ms", "1").option("subscribePattern", topic).option("startingOffsets", "earliest").load();
            JavaUniverse $u = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(KafkaMicroBatchV2SourceSuite.class.getClassLoader());
            public final class Org_apache_spark_sql_kafka010_KafkaMicroBatchV2SourceSuite$$typecreator5$2
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Tuple2"), (List)new .colon.colon((Object)$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$), (List)new .colon.colon((Object)$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$), (List)Nil$.MODULE$)));
                }

                public Org_apache_spark_sql_kafka010_KafkaMicroBatchV2SourceSuite$$typecreator5$2(KafkaMicroBatchV2SourceSuite $outer) {
                }
            }
            Dataset query = kafka.dropDuplicates().selectExpr((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).as(this.testImplicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_spark_sql_kafka010_KafkaMicroBatchV2SourceSuite$$typecreator5$2(null)))).map((Function1 & Serializable & scala.Serializable)kv -> BoxesRunTime.boxToInteger((int)KafkaMicroBatchV2SourceSuite.$anonfun$new$151(kv)), this.testImplicits().newIntEncoder());
            URI resourceUri = this.getClass().getResource("/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/").toURI();
            File checkpointDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()).getCanonicalFile();
            FileUtils.copyDirectory((File)new File(resourceUri), (File)checkpointDir);
            StreamTest.StreamAction[] streamActionArray = new StreamTest.StreamAction[3];
            String x$7 = checkpointDir.getAbsolutePath();
            Trigger x$8 = this.StartStream().apply$default$1();
            Clock x$9 = this.StartStream().apply$default$2();
            scala.collection.immutable.Map x$10 = this.StartStream().apply$default$3();
            streamActionArray[0] = new StreamTest.StartStream((StreamTest)this, x$8, x$9, x$10, x$7);
            streamActionArray[1] = this.makeSureGetOffsetCalled();
            streamActionArray[2] = this.CheckNewAnswer().apply((Object)BoxesRunTime.boxToInteger((int)32), (Seq)Predef$.MODULE$.wrapIntArray(new int[]{33, 34, 35, 36}), this.testImplicits().newIntEncoder());
            this.testStream(query, this.testStream$default$2(), (Seq<StreamTest.StreamAction>)Predef$.MODULE$.wrapRefArray((Object[])streamActionArray));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1469));
        this.test("test custom metrics - with rate limit", (Seq<Tag>)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0<Object>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = this.newTopic();
            Range.Inclusive data = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            KafkaTestUtils qual$3 = this.testUtils();
            String x$11 = topic;
            int x$12 = 2;
            boolean x$13 = qual$3.createTopic$default$3();
            qual$3.createTopic(x$11, x$12, x$13);
            this.testUtils().sendMessages(topic, (String[])((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map((Function1 & Serializable & scala.Serializable)x$50 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$153(BoxesRunTime.unboxToInt((Object)x$50)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)0)));
            this.testUtils().sendMessages(topic, (String[])((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).map((Function1 & Serializable & scala.Serializable)x$51 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$154(BoxesRunTime.unboxToInt((Object)x$51)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)1)));
            Dataset kafka = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("subscribe", topic).option("maxOffsetsPerTrigger", 1L).option(KafkaSourceProvider$.MODULE$.STARTING_OFFSETS_OPTION_KEY(), "earliest").load().selectExpr((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(value AS STRING)"})).as(this.testImplicits().newStringEncoder()).map((Function1 & Serializable & scala.Serializable)x$52 -> BoxesRunTime.boxToInteger((int)KafkaMicroBatchV2SourceSuite.$anonfun$new$155(x$52)), this.testImplicits().newIntEncoder());
            this.testStream(kafka, this.testStream$default$2(), (Seq<StreamTest.StreamAction>)Predef$.MODULE$.wrapRefArray((Object[])new StreamTest.StreamAction[]{new StreamTest.StartStream((StreamTest)this, this.StartStream().apply$default$1(), this.StartStream().apply$default$2(), this.StartStream().apply$default$3(), this.StartStream().apply$default$4()), this.makeSureGetOffsetCalled(), this.CheckAnswer().apply((Seq)data, this.testImplicits().newIntEncoder()), this.Execute().apply((Function1 & Serializable & scala.Serializable)query -> {
                Option progressWithDelay;
                Option $org_scalatest_assert_macro_left = progressWithDelay = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])query.recentProgress())).map((Function1 & Serializable & scala.Serializable)x$53 -> (SourceProgress)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$53.sources())).head(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(SourceProgress.class))))).reverse())).find((Function1 & Serializable & scala.Serializable)progress -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchV2SourceSuite.$anonfun$new$158(progress)));
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "nonEmpty", $org_scalatest_assert_macro_left.nonEmpty(), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1558));
                java.util.Map metrics = ((SourceProgress)progressWithDelay.get()).metrics();
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(metrics.keySet());
                java.util.Set $org_scalatest_assert_macro_right = (java.util.Set)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"minOffsetsBehindLatest", "maxOffsetsBehindLatest", "avgOffsetsBehindLatest"}))).asJava();
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1560));
                long $org_scalatest_assert_macro_left3 = new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("minOffsetsBehindLatest"))).toLong();
                int $org_scalatest_assert_macro_right2 = 0;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left3), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left3 > (long)$org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1564));
                long $org_scalatest_assert_macro_left4 = new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("maxOffsetsBehindLatest"))).toLong();
                int $org_scalatest_assert_macro_right3 = 0;
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left4), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left4 > (long)$org_scalatest_assert_macro_right3, Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1565));
                double $org_scalatest_assert_macro_left5 = new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("avgOffsetsBehindLatest"))).toDouble();
                int $org_scalatest_assert_macro_right4 = 0;
                Bool $org_scalatest_assert_macro_expr5 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToDouble((double)$org_scalatest_assert_macro_left5), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left5 > (double)$org_scalatest_assert_macro_right4, Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr5, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1566));
            })}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1527));
        this.test("test custom metrics - no rate limit", (Seq<Tag>)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0<Object>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = this.newTopic();
            Range.Inclusive data = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            KafkaTestUtils qual$4 = this.testUtils();
            String x$14 = topic;
            int x$15 = 2;
            boolean x$16 = qual$4.createTopic$default$3();
            qual$4.createTopic(x$14, x$15, x$16);
            this.testUtils().sendMessages(topic, (String[])((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map((Function1 & Serializable & scala.Serializable)x$54 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$160(BoxesRunTime.unboxToInt((Object)x$54)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)0)));
            this.testUtils().sendMessages(topic, (String[])((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).map((Function1 & Serializable & scala.Serializable)x$55 -> KafkaMicroBatchV2SourceSuite.$anonfun$new$161(BoxesRunTime.unboxToInt((Object)x$55)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), (Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)1)));
            Dataset kafka = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("subscribe", topic).option(KafkaSourceProvider$.MODULE$.STARTING_OFFSETS_OPTION_KEY(), "earliest").load().selectExpr((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(value AS STRING)"})).as(this.testImplicits().newStringEncoder()).map((Function1 & Serializable & scala.Serializable)x$56 -> BoxesRunTime.boxToInteger((int)KafkaMicroBatchV2SourceSuite.$anonfun$new$162(x$56)), this.testImplicits().newIntEncoder());
            this.testStream(kafka, this.testStream$default$2(), (Seq<StreamTest.StreamAction>)Predef$.MODULE$.wrapRefArray((Object[])new StreamTest.StreamAction[]{new StreamTest.StartStream((StreamTest)this, this.StartStream().apply$default$1(), this.StartStream().apply$default$2(), this.StartStream().apply$default$3(), this.StartStream().apply$default$4()), this.makeSureGetOffsetCalled(), this.CheckAnswer().apply((Seq)data, this.testImplicits().newIntEncoder()), this.Execute().apply((Function1 & Serializable & scala.Serializable)query -> {
                Option progress;
                Option $org_scalatest_assert_macro_left = progress = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])query.recentProgress())).map((Function1 & Serializable & scala.Serializable)x$57 -> (SourceProgress)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$57.sources())).head(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(SourceProgress.class))))).lastOption();
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "nonEmpty", $org_scalatest_assert_macro_left.nonEmpty(), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1597));
                java.util.Map metrics = ((SourceProgress)progress.get()).metrics();
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(metrics.keySet());
                java.util.Set $org_scalatest_assert_macro_right = (java.util.Set)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"minOffsetsBehindLatest", "maxOffsetsBehindLatest", "avgOffsetsBehindLatest"}))).asJava();
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1600));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("minOffsetsBehindLatest"))).toLong()));
                int $org_scalatest_assert_macro_right2 = 0;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1604));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("maxOffsetsBehindLatest"))).toLong()));
                int $org_scalatest_assert_macro_right3 = 0;
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1605));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left5 = this.convertToEqualizer(BoxesRunTime.boxToDouble((double)new StringOps(Predef$.MODULE$.augmentString((String)metrics.get("avgOffsetsBehindLatest"))).toDouble()));
                int $org_scalatest_assert_macro_right4 = 0;
                Bool $org_scalatest_assert_macro_expr5 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left5, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left5.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr5, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1606));
            })}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1571));
        // Registers the "test custom metrics - corner cases" test. It calls
        // KafkaMicroBatchStream.metrics three times: with an absent start offset, with a
        // populated start offset, and with a null latest-offset map.
        this.test(
            "test custom metrics - corner cases",
            (Seq<Tag>)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]),
            (Function0<Object>)(Function0 & Serializable & scala.Serializable)() -> {
                // Shared arguments of the scalactic Position attached to every assertion below.
                String scalaSource = "KafkaMicroBatchSourceSuite.scala";
                String pathnameHint = "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.";

                // Two freshly named topics, partition 0 of each.
                TopicPartition firstPartition = new TopicPartition(this.newTopic(), 0);
                TopicPartition secondPartition = new TopicPartition(this.newTopic(), 0);

                // Latest offsets: 3 for the first partition, 6 for the second.
                Tuple2[] latestEntries = new Tuple2[]{
                    new Tuple2((Object)firstPartition, (Object)BoxesRunTime.boxToLong(3L)),
                    new Tuple2((Object)secondPartition, (Object)BoxesRunTime.boxToLong(6L))
                };
                scala.collection.immutable.Map latestByPartition =
                    (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])latestEntries));

                // Case 1: no start offset available -> the returned metrics map must be empty.
                java.util.Map metricsWithoutStart = KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(null), latestByPartition);
                Bool emptyWithoutStart = Bool$.MODULE$.unaryMacroBool(
                    (Object)metricsWithoutStart, "isEmpty", metricsWithoutStart.isEmpty(), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert(
                    emptyWithoutStart, (Object)"", Prettifier$.MODULE$.default(), new Position(scalaSource, pathnameHint, 1617));

                // Case 2: start offsets 1 and 2 against latest offsets 3 and 6. The expected
                // metrics are min "2", max "4" and avg "3.0" (presumably the per-partition gaps
                // 3-1 and 6-2 and their mean -- confirm against KafkaMicroBatchStream.metrics).
                Tuple2[] startEntries = new Tuple2[]{
                    new Tuple2((Object)firstPartition, (Object)BoxesRunTime.boxToLong(1L)),
                    new Tuple2((Object)secondPartition, (Object)BoxesRunTime.boxToLong(2L))
                };
                KafkaSourceOffset startOffset = new KafkaSourceOffset(
                    (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])startEntries)));
                TripleEqualsSupport.Equalizer actualMetrics =
                    this.convertToEqualizer(KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(startOffset), latestByPartition));
                Tuple2[] expectedEntries = new Tuple2[]{
                    Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"minOffsetsBehindLatest"), (Object)"2"),
                    Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"maxOffsetsBehindLatest"), (Object)"4"),
                    Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"avgOffsetsBehindLatest"), (Object)"3.0")
                };
                java.util.Map expectedMetrics = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(
                    (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedEntries))).asJava();
                Bool metricsMatch = Bool$.MODULE$.binaryMacroBool(
                    (Object)actualMetrics,
                    "===",
                    (Object)expectedMetrics,
                    actualMetrics.$eq$eq$eq((Object)expectedMetrics, Equality$.MODULE$.default()),
                    Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert(
                    metricsMatch, (Object)"", Prettifier$.MODULE$.default(), new Position(scalaSource, pathnameHint, 1622));

                // Case 3: a null latest-offset map -> the returned metrics map must be empty.
                java.util.Map metricsWithoutLatest = KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(startOffset), null);
                Bool emptyWithoutLatest = Bool$.MODULE$.unaryMacroBool(
                    (Object)metricsWithoutLatest, "isEmpty", metricsWithoutLatest.isEmpty(), Prettifier$.MODULE$.default());
                // The result of the final assertion is the value of the test body.
                return Assertions$.MODULE$.assertionsHelper().macroAssert(
                    emptyWithoutLatest, (Object)"", Prettifier$.MODULE$.default(), new Position(scalaSource, pathnameHint, 1630));
            },
            new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1611));
    }
}

