/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka010;

import java.io.File;
import java.io.Serializable;
import org.apache.spark.TestUtils$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.execution.streaming.MemoryStreamBase;
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream;
import org.apache.spark.sql.kafka010.KafkaSinkStreamingSuiteBase$;
import org.apache.spark.sql.kafka010.KafkaSinkSuiteBase;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.math.Ordering;
import scala.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005Ef!B\b\u0011\u0003\u0003Y\u0002\"\u0002\u0011\u0001\t\u0003\t\u0003bB\u0012\u0001\u0005\u0004%\t\u0002\n\u0005\u0007[\u0001\u0001\u000b\u0011B\u0013\t\u000b9\u0002a\u0011C\u0018\t\u000b\u0015\u0003a\u0011\u0003$\t\u000ba\u0003a\u0011C-\t\u000b\u0001\u0004A\u0011C1\t\u0013\u0005=\u0001!%A\u0005\u0012\u0005E\u0001\"CA\u0014\u0001E\u0005I\u0011CA\u0015\u0011%\ti\u0003AI\u0001\n#\ty\u0003C\u0004\u00024\u0001!I!!\u000e\t\u000f\u00055\u0003\u0001\"\u0003\u0002P!9\u00111\u0013\u0001\u0005\n\u0005U\u0005bBAS\u0001\u0011%\u0011q\u0015\u0002\u001c\u0017\u000647.Y*j].\u001cFO]3b[&twmU;ji\u0016\u0014\u0015m]3\u000b\u0005E\u0011\u0012\u0001C6bM.\f\u0007'\r\u0019\u000b\u0005M!\u0012aA:rY*\u0011QCF\u0001\u0006gB\f'o\u001b\u0006\u0003/a\ta!\u00199bG\",'\"A\r\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001a\u0002CA\u000f\u001f\u001b\u0005\u0001\u0012BA\u0010\u0011\u0005IY\u0015MZ6b'&t7nU;ji\u0016\u0014\u0015m]3\u0002\rqJg.\u001b;?)\u0005\u0011\u0003CA\u000f\u0001\u0003A\u0019HO]3b[&tw\rV5nK>,H/F\u0001&!\t13&D\u0001(\u0015\tA\u0013&\u0001\u0003uS6,'B\u0001\u0016\u0019\u0003%\u00198-\u00197bi\u0016\u001cH/\u0003\u0002-O\t!1\u000b]1o\u0003E\u0019HO]3b[&tw\rV5nK>,H\u000fI\u0001\u0013GJ,\u0017\r^3NK6|'/_*ue\u0016\fW\u000eF\u00011!\r\td\u0007O\u0007\u0002e)\u00111\u0007N\u0001\ngR\u0014X-Y7j]\u001eT!!\u000e\n\u0002\u0013\u0015DXmY;uS>t\u0017BA\u001c3\u0005AiU-\\8ssN#(/Z1n\u0005\u0006\u001cX\r\u0005\u0002:\u0005:\u0011!\b\u0011\t\u0003wyj\u0011\u0001\u0010\u0006\u0003{i\ta\u0001\u0010:p_Rt$\"A \u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0005s\u0014A\u0002)sK\u0012,g-\u0003\u0002D\t\n11\u000b\u001e:j]\u001eT!!\u0011 \u0002\u0019Y,'/\u001b4z%\u0016\u001cX\u000f\u001c;\u0015\u0005\u001d\u000bFC\u0001%M!\tI%*D\u0001?\u0013\tYeH\u0001\u0003V]&$\bBB'\u0006\t\u0003\u0007a*\u0001\u0005wKJLg-\u001f$o!\rIu\nS\u0005\u0003!z\u0012\u0001\u0002\u00102z]\u0006lWM\u0010\u0005\u0006%\u0016\u0001\raU\u0001\u0007oJLG/\u001a:\u0011\u0005Q3V\"A+\u000b\u0005M\u0012\u0012BA,V\u00059\u0019FO]3b[&tw-U;fef\fa\u0002Z3gCVdG\u000f\u0016:jO\u001e,'/F\u0001[!\rI5,X\u0005\u00039z\u0012aa\u00149uS>t\u0007C\u0001+_\u0013\tyVKA\u0004Ue&<w-\u001a:\u0002#\r\u0014X-\u0019;f\u0017\u000647.Y,sSR,'\u000f\u0006\u0004cQfd\u0018Q\u0001\u000b\u0003'\u000eDQ\u0001Z\u0004A\u0002\u0015\fab^5uQN+G.Z2u\u000bb\u0004(\u000fE\u0002JMbJ!a\u001a \u0003\u0015q\u0012X\r]3bi\u0016$g\bC\u0003j\u000f\u0001\u0007!.A\u0003j]B,H\u000f\u0005\u0002lm:\u0011A\u000e\u001e\b\u0003[Nt!A\u001c:\u000f\u0005=\fhBA\u001eq\u0013\u0005I\u0012BA\f\u0019\u0013\t)b#\u0003\u0002\u0014)%\u0011QOE\u0001\ba\u0006\u001c7.Y4f\u0013\t9\bPA\u0005ECR\fgI]1nK*\u0011QO\u0005\u0005\bu\u001e\u0001\n\u00111\u0001|\u0003%9\u0018\u000e\u001e5U_BL7\rE\u0002J7bBq!`\u0004\u0011\u0002\u0003\u0007a0\u0001\bxSRDw*\u001e;qkRlu\u000eZ3\u0011\u0007%[v\u0010E\u0002U\u0003\u0003I1!a\u0001V\u0005)yU\u000f\u001e9vi6{G-\u001a\u0005\n\u0003\u000f9\u0001\u0013!a\u0001\u0003\u0013\t1b^5uQ>\u0003H/[8ogB)\u0011(a\u00039q%\u0019\u0011Q\u0002#\u0003\u00075\u000b\u0007/A\u000ede\u0016\fG/Z&bM.\fwK]5uKJ$C-\u001a4bk2$HEM\u000b\u0003\u0003'Q3a_A\u000bW\t\t9\u0002\u0005\u0003\u0002\u001a\u0005\rRBAA\u000e\u0015\u0011\ti\"a\b\u0002\u0013Ut7\r[3dW\u0016$'bAA\u0011}\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005\u0015\u00121\u0004\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017aG2sK\u0006$XmS1gW\u0006<&/\u001b;fe\u0012\"WMZ1vYR$3'\u0006\u0002\u0002,)\u001aa0!\u0006\u00027\r\u0014X-\u0019;f\u0017\u000647.Y,sSR,'\u000f\n3fM\u0006,H\u000e\u001e\u00135+\t\t\tD\u000b\u0003\u0002\n\u0005U\u0011A\u0005:v]\u0006sGMV3sS\u001aLh+\u00197vKN$r\u0001SA\u001c\u0003s\tY\u0004C\u0003j\u0017\u0001\u0007\u0001\u0007C\u0003S\u0017\u0001\u00071\u000bC\u0004\u0002>-\u0001\r!a\u0010\u0002\rI,\u0017\rZ3s!\u0019\t\t%a\u0011\u0002H5\t!#C\u0002\u0002FI\u0011q\u0001R1uCN,G\u000fE\u0002J\u0003\u0013J1!a\u0013?\u0005\rIe\u000e^\u0001\u0016eVt\u0017I\u001c3WKJLg-_#yG\u0016\u0004H/[8o+\u0011\t\t&a\u001b\u0015\r\u0005M\u0013QRAH)\u0011\t)&a\"\u0015\u0007!\u000b9\u0006C\u0005\u0002Z1\t\t\u0011q\u0001\u0002\\\u0005QQM^5eK:\u001cW\rJ\u0019\u0011\r\u0005u\u00131MA4\u001b\t\tyFC\u0002\u0002by\nqA]3gY\u0016\u001cG/\u0003\u0003\u0002f\u0005}#\u0001C\"mCN\u001cH+Y4\u0011\t\u0005%\u00141\u000e\u0007\u0001\t\u001d\ti\u0007\u0004b\u0001\u0003_\u0012\u0011\u0001V\t\u0005\u0003c\n9\bE\u0002J\u0003gJ1!!\u001e?\u0005\u001dqu\u000e\u001e5j]\u001e\u0004B!!\u001f\u0002\u0002:!\u00111PA@\u001d\rY\u0014QP\u0005\u0002\u007f%\u0011QOP\u0005\u0005\u0003\u0007\u000b)IA\u0005Fq\u000e,\u0007\u000f^5p]*\u0011QO\u0010\u0005\t\u0003\u0013cA\u00111\u0001\u0002\f\u0006AqO]5uKJ4e\u000eE\u0002J\u001fNCQ!\u001b\u0007A\u0002ABa!!%\r\u0001\u0004A\u0014AD3ya\u0016\u001cG/\u0012:s_Jl5oZ\u0001\u0012CN\u001cXM\u001d;Xe>twmU2iK6\fGc\u0002%\u0002\u0018\u0006e\u00151\u0015\u0005\u0006S6\u0001\r\u0001\r\u0005\b\u00037k\u0001\u0019AAO\u0003)\u0019X\r\\3di\u0016C\bO\u001d\t\u0006\u0003s\ny\nO\u0005\u0005\u0003C\u000b)IA\u0002TKFDa!!%\u000e\u0001\u0004A\u0014!E1tg\u0016\u0014Ho\u0016:p]\u001e|\u0005\u000f^5p]R9\u0001*!+\u0002,\u0006=\u0006\"B5\u000f\u0001\u0004\u0001\u0004bBAW\u001d\u0001\u0007\u0011\u0011B\u0001\b_B$\u0018n\u001c8t\u0011\u0019\t\tJ\u0004a\u0001q\u0001")
public abstract class KafkaSinkStreamingSuiteBase
extends KafkaSinkSuiteBase {
    private final Span streamingTimeout = SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds();

    public Span streamingTimeout() {
        return this.streamingTimeout;
    }

    public abstract MemoryStreamBase<String> createMemoryStream();

    public abstract void verifyResult(StreamingQuery var1, Function0<BoxedUnit> var2);

    public abstract Option<Trigger> defaultTrigger();

    public StreamingQuery createKafkaWriter(Dataset<Row> input, Option<String> withTopic, Option<OutputMode> withOutputMode, Map<String, String> withOptions, Seq<String> withSelectExpr) {
        ObjectRef stream = ObjectRef.create(null);
        this.withTempDir((Function1<File, BoxedUnit>)(Function1 & Serializable)checkpointDir -> {
            KafkaSinkStreamingSuiteBase.$anonfun$createKafkaWriter$1(this, input, withSelectExpr, stream, withTopic, withOutputMode, withOptions, checkpointDir);
            return BoxedUnit.UNIT;
        });
        return ((DataStreamWriter)stream.elem).start();
    }

    public Option<String> createKafkaWriter$default$2() {
        return None$.MODULE$;
    }

    public Option<OutputMode> createKafkaWriter$default$3() {
        return None$.MODULE$;
    }

    public Map<String, String> createKafkaWriter$default$4() {
        return (Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$);
    }

    private void runAndVerifyValues(MemoryStreamBase<String> input, StreamingQuery writer, Dataset<Object> reader) {
        try {
            input.addData((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"1", "2", "3", "4", "5"}));
            this.verifyResult(writer, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.checkDatasetUnorderly((Function0 & Serializable)() -> reader, (Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}), (Ordering)Ordering.Int$.MODULE$));
            input.addData((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"6", "7", "8", "9", "10"}));
            this.verifyResult(writer, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.checkDatasetUnorderly((Function0 & Serializable)() -> reader, (Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), (Ordering)Ordering.Int$.MODULE$));
        }
        finally {
            writer.stop();
        }
    }

    private <T extends Exception> void runAndVerifyException(MemoryStreamBase<String> input, String expectErrorMsg, Function0<StreamingQuery> writerFn, ClassTag<T> evidence$1) {
        Exception exception;
        ObjectRef writer = ObjectRef.create(null);
        try {
            exception = (Exception)this.intercept((Function0 & Serializable)() -> {
                writer$1.elem = (StreamingQuery)writerFn.apply();
                input.addData((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"1", "2", "3", "4", "5"}));
                MemoryStreamBase memoryStreamBase = input;
                if (!(memoryStreamBase instanceof MemoryStream)) {
                    if (memoryStreamBase instanceof ContinuousMemoryStream) {
                        this.eventually(this.timeout(this.streamingTimeout()), (Function0 & Serializable)() -> {
                            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(((StreamingQuery)writer$1.elem).exception().isDefined(), "writer.exception.isDefined", Prettifier$.MODULE$.default());
                            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 252));
                        }, this.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 251));
                        throw (Throwable)((StreamingQuery)writer$1.elem).exception().get();
                    }
                    throw new MatchError((Object)memoryStreamBase);
                }
                ((StreamingQuery)writer$1.elem).processAllAvailable();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return boxedUnit;
            }, evidence$1, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 245));
        }
        finally {
            if ((StreamingQuery)writer.elem != null) {
                ((StreamingQuery)writer.elem).stop();
            }
        }
        Exception ex = exception;
        TestUtils$.MODULE$.assertExceptionMsg((Throwable)ex, expectErrorMsg, true, evidence$1);
    }

    private void assertWrongSchema(MemoryStreamBase<String> input, Seq<String> selectExpr, String expectErrorMsg) {
        this.runAndVerifyException(input, expectErrorMsg, (Function0<StreamingQuery>)(Function0 & Serializable)() -> this.createKafkaWriter((Dataset<Row>)input.toDF(), this.createKafkaWriter$default$2(), this.createKafkaWriter$default$3(), this.createKafkaWriter$default$4(), selectExpr), ClassTag$.MODULE$.apply(Exception.class));
    }

    private void assertWrongOption(MemoryStreamBase<String> input, Map<String, String> options, String expectErrorMsg) {
        this.runAndVerifyException(input, expectErrorMsg, (Function0<StreamingQuery>)(Function0 & Serializable)() -> {
            Dataset x$1 = input.toDF();
            Map x$2 = options;
            Option<String> x$3 = this.createKafkaWriter$default$2();
            Option<OutputMode> x$4 = this.createKafkaWriter$default$3();
            return this.createKafkaWriter((Dataset<Row>)x$1, x$3, x$4, (Map<String, String>)x$2, (Seq<String>)Nil$.MODULE$);
        }, ClassTag$.MODULE$.apply(Exception.class));
    }

    public static final /* synthetic */ void $anonfun$createKafkaWriter$1(KafkaSinkStreamingSuiteBase $this, Dataset input$2, Seq withSelectExpr$1, ObjectRef stream$1, Option withTopic$1, Option withOutputMode$1, Map withOptions$1, File checkpointDir) {
        Dataset df = input$2.toDF();
        if (withSelectExpr$1.length() > 0) {
            df = df.selectExpr(withSelectExpr$1);
        }
        stream$1.elem = df.writeStream().format("kafka").option("checkpointLocation", checkpointDir.getCanonicalPath()).option("kafka.bootstrap.servers", $this.testUtils().brokerAddress()).option("kafka.max.block.ms", "5000").queryName("kafkaStream");
        withTopic$1.foreach((Function1 & Serializable)x$5 -> ((DataStreamWriter)stream$1.elem).option("topic", x$5));
        withOutputMode$1.foreach((Function1 & Serializable)x$6 -> ((DataStreamWriter)stream$1.elem).outputMode(x$6));
        withOptions$1.foreach((Function1 & Serializable)opt -> ((DataStreamWriter)stream$1.elem).option((String)opt._1(), (String)opt._2()));
        $this.defaultTrigger().foreach((Function1 & Serializable)x$7 -> ((DataStreamWriter)stream$1.elem).trigger(x$7));
    }

    public KafkaSinkStreamingSuiteBase() {
        this.test("streaming - write to kafka with topic field", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            String topic = this.newTopic();
            this.testUtils().createTopic(topic);
            StreamingQuery writer = this.createKafkaWriter((Dataset<Row>)input.toDF(), (Option<String>)None$.MODULE$, (Option<OutputMode>)new Some((Object)OutputMode.Append()), this.createKafkaWriter$default$4(), (Seq<String>)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{new StringBuilder(11).append("'").append(topic).append("' as topic").toString(), "value"}));
            JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(KafkaSinkStreamingSuiteBase.class.getClassLoader());
            public final class Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator5$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Tuple2"), (List)new .colon.colon((Object)$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Option"), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)));
                }

                public Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator5$1(KafkaSinkStreamingSuiteBase $outer) {
                }
            }
            Dataset reader = this.createKafkaReader(topic, this.createKafkaReader$default$2()).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key as STRING) key", "CAST(value as STRING) value"})).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key as INT) key", "CAST(value as INT) value"})).as(this.testImplicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator5$1(null)))).map((Function1 & Serializable)x$2 -> BoxesRunTime.boxToInteger((int)x$2._2$mcI$sp()), this.testImplicits().newIntEncoder());
            this.runAndVerifyValues(input, writer, (Dataset<Object>)reader);
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 89));
        this.test("streaming - write w/o topic field, with topic option", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            String topic = this.newTopic();
            this.testUtils().createTopic(topic);
            StreamingQuery writer = this.createKafkaWriter((Dataset<Row>)input.toDF(), (Option<String>)new Some((Object)topic), (Option<OutputMode>)new Some((Object)OutputMode.Append()), this.createKafkaWriter$default$4(), (Seq<String>)Nil$.MODULE$);
            JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(KafkaSinkStreamingSuiteBase.class.getClassLoader());
            public final class Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator10$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Tuple2"), (List)new .colon.colon((Object)$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Option"), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)));
                }

                public Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator10$1(KafkaSinkStreamingSuiteBase $outer) {
                }
            }
            Dataset reader = this.createKafkaReader(topic, this.createKafkaReader$default$2()).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key as STRING) key", "CAST(value as STRING) value"})).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key as INT) key", "CAST(value as INT) value"})).as(this.testImplicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator10$1(null)))).map((Function1 & Serializable)x$3 -> BoxesRunTime.boxToInteger((int)x$3._2$mcI$sp()), this.testImplicits().newIntEncoder());
            this.runAndVerifyValues(input, writer, (Dataset<Object>)reader);
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 109));
        this.test("streaming - topic field and topic option", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            String topic = this.newTopic();
            this.testUtils().createTopic(topic);
            StreamingQuery writer = this.createKafkaWriter((Dataset<Row>)input.toDF(), (Option<String>)new Some((Object)topic), (Option<OutputMode>)new Some((Object)OutputMode.Append()), this.createKafkaWriter$default$4(), (Seq<String>)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"'foo' as topic", "value"}));
            JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(KafkaSinkStreamingSuiteBase.class.getClassLoader());
            public final class Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator15$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Tuple2"), (List)new .colon.colon((Object)$u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Option"), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)), (List)new .colon.colon((Object)$m.staticClass("scala.Int").asType().toTypeConstructor(), (List)Nil$.MODULE$)));
                }

                public Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator15$1(KafkaSinkStreamingSuiteBase $outer) {
                }
            }
            Dataset reader = this.createKafkaReader(topic, this.createKafkaReader$default$2()).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST(key AS INT)", "CAST(value AS INT)"})).as(this.testImplicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_spark_sql_kafka010_KafkaSinkStreamingSuiteBase$$typecreator15$1(null)))).map((Function1 & Serializable)x$4 -> BoxesRunTime.boxToInteger((int)x$4._2$mcI$sp()), this.testImplicits().newIntEncoder());
            this.runAndVerifyValues(input, writer, (Dataset<Object>)reader);
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 128));
        this.test("streaming - write data with bad schema", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            String topic = this.newTopic();
            this.testUtils().createTopic(topic);
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"value as key", "value"}))), "topic option required when no 'topic' attribute is present");
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{new StringBuilder(11).append("'").append(topic).append("' as topic").toString(), "value as key"}))), "required attribute 'value' not found");
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 155));
        this.test("streaming - write data with valid schema but wrong types", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            String topic = this.newTopic();
            this.testUtils().createTopic(topic);
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"CAST('1' as INT) as topic", "value"}))), "topic must be a(n) string");
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{new StringBuilder(11).append("'").append(topic).append("' as topic").toString(), "CAST(value as INT) as value"}))), "value must be a(n) string or binary");
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{new StringBuilder(11).append("'").append(topic).append("' as topic").toString(), "CAST(value as INT) as key", "value"}))), "key must be a(n) string or binary");
            this.assertWrongSchema(input, (Seq<String>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{new StringBuilder(11).append("'").append(topic).append("' as topic").toString(), "value", "value as partition"}))), "partition must be a(n) int");
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 166));
        this.test("streaming - write to non-existing topic", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            this.runAndVerifyException(input, "job aborted", (Function0<StreamingQuery>)(Function0 & Serializable)() -> this.createKafkaWriter((Dataset<Row>)input.toDF(), (Option<String>)new Some((Object)this.newTopic()), this.createKafkaWriter$default$3(), this.createKafkaWriter$default$4(), (Seq<String>)Nil$.MODULE$), ClassTag$.MODULE$.apply(StreamingQueryException.class));
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 181));
        this.test("streaming - exception on config serializer", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            MemoryStreamBase<String> input = this.createMemoryStream();
            this.assertWrongOption(input, (Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"kafka.key.serializer"), (Object)"foo")}))), "kafka option 'key.serializer' is not supported");
            this.assertWrongOption(input, (Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"kafka.value.serializer"), (Object)"foo")}))), "kafka option 'value.serializer' is not supported");
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 189));
    }
}

