/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.SubjectUtils$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.StringBuilder;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

public final class KafkaWriters$
implements Logging {
    // Singleton reference in the Scala-object style (accessed elsewhere as KafkaWriters$.MODULE$).
    // No assignment is visible in this section.
    // NOTE(review): presumably the (not shown) constructor assigns it — confirm.
    public static final KafkaWriters$ MODULE$;
    // Logger returned by logger() and written by the Logging setter method below.
    private final WaspLogger logger;

    static {
        // Instantiates the object once when the class is initialised; the reference is discarded here.
        new KafkaWriters$();
    }

    /**
     * Returns the logger held by this object.
     *
     * @return the value of the {@code logger} field
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Stores the given logger in the {@code logger} field.
     *
     * NOTE(review): the mangled name looks like the setter generated for the {@code Logging}
     * trait's {@code logger} member; presumably it is invoked once while the trait is being
     * initialised — confirm against the Logging trait.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    /**
     * Adds a per-row validation of the topic name when rows select their own topic.
     *
     * When {@code topicFieldName} is empty, {@code df} is returned untouched. Otherwise the
     * column named {@code "topic"} is replaced by the result of a UDF that returns the topic
     * name unchanged when it is one of the names of {@code topics}, and throws an
     * {@code Exception} otherwise.
     *
     * Only the emptiness of {@code topicFieldName} is inspected; the column that is read and
     * rewritten is always the literal {@code "topic"}. {@code prepareDfToWrite} in this class
     * aliases the topic column as {@code "topic"}.
     * NOTE(review): presumably this method is meant to run on that output — confirm with callers.
     *
     * @param topicFieldName name of the per-row topic field, if any (only checked for emptiness)
     * @param topics         topic models whose names are the accepted topic names
     * @param df             the dataset to decorate
     * @return {@code df} itself, or {@code df} with the {@code "topic"} column wrapped in the check
     */
    public Dataset<Row> addTopicNameCheckIfNeeded(Option<String> topicFieldName, Seq<TopicModel> topics, Dataset<Row> df) {
        Dataset dataset;
        if (topicFieldName.isEmpty()) {
            // No per-row topic selection: nothing to validate.
            dataset = df;
        } else {
            // Collect the names of all topic models into a set for membership tests.
            Set acceptedTopicNames = ((TraversableOnce)topics.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(TopicModel x$1) {
                    return x$1.name();
                }
            }, Seq$.MODULE$.canBuildFrom())).toSet();
            // Identity function on accepted names; throws for any other name.
            Serializable checkTopicName = new Serializable(acceptedTopicNames){
                public static final long serialVersionUID = 0L;
                private final Set acceptedTopicNames$1;

                public final String apply(String topicName) {
                    if (this.acceptedTopicNames$1.apply((Object)topicName)) {
                        return topicName;
                    }
                    throw new Exception(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Topic name \"", "\" is not in the topic models for the MultiTopicModel used"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicName})));
                }
                {
                    this.acceptedTopicNames$1 = acceptedTopicNames$1;
                }
            };
            // Reflection plumbing: the two type creators below both describe scala.Predef.String and
            // are used to build the type tags passed to functions.udf for this String => String function.
            JavaUniverse $u = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            JavaUniverse $u2 = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m2 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
                }

                public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1() {
                }
            }
            public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
                }

                public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1() {
                }
            }
            UserDefinedFunction checkTopicNameUdf = functions$.MODULE$.udf((Function1)checkTopicName, ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1()), ((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1()));
            // Overwrite the "topic" column with its validated value.
            dataset = df.withColumn("topic", checkTopicNameUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("topic")})));
        }
        return dataset;
    }

    /**
     * Projects {@code df} onto the columns used for writing: an optional {@code "key"}, an
     * optional {@code "headers"}, a {@code "topic"} column (only when a topic field name is
     * given) and a {@code "value"} column.
     *
     * Two cases are handled, depending on {@code topicFieldNameOpt}:
     * <ul>
     *   <li>defined: requires more than one topic model; the column named by the option is
     *       selected as {@code "topic"} next to key/headers/value;</li>
     *   <li>empty: requires exactly one topic model; only key/headers/value are selected.</li>
     * </ul>
     * The key, headers and value expressions come from {@code keyExpression},
     * {@code headerExpression} and {@code valueExpression}; each receives a UDF that always
     * throws {@code "Unknown topic name ..."}.
     *
     * @param df                the dataset to project
     * @param topicFieldNameOpt name of the column holding the per-row topic name, if any
     * @param topics            the topic models to write to
     * @param darwinConf        optional configuration forwarded to the key and value expressions
     * @return the projected dataset
     * @throws IllegalArgumentException via {@code Predef.require} when the number of topics does
     *         not fit the chosen case
     * @throws MatchError if {@code topicFieldNameOpt} is neither a {@code Some} nor {@code None}
     */
    public Dataset<Row> prepareDfToWrite(Dataset<Row> df, Option<String> topicFieldNameOpt, Seq<TopicModel> topics, Option<Config> darwinConf) {
        Option<String> option;
        block4: {
            Dataset dataset;
            block3: {
                UserDefinedFunction throwException;
                block2: {
                    JavaUniverse $u = package$.MODULE$.universe();
                    JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
                    // Type creator describing scala.Predef.String, used for the UDF's argument type tag.
                    public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator3$1
                    extends TypeCreator {
                        public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                            Universe $u = $m$untyped.universe();
                            Mirror<U> $m = $m$untyped;
                            return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
                        }

                        public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator3$1() {
                        }
                    }
                    // UDF that never returns: it throws for whatever topic name it is given.
                    throwException = functions$.MODULE$.udf((Function1)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        public final Nothing$ apply(String s) {
                            throw new Exception(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unknown topic name ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{s})));
                        }
                    }, ((TypeTags)package$.MODULE$.universe()).TypeTag().Nothing(), ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator3$1()));
                    // Case 1: a topic field name is given (per-row topic selection).
                    option = topicFieldNameOpt;
                    if (!(option instanceof Some)) break block2;
                    Some some = (Some)option;
                    String topicFieldName = (String)some.x();
                    Predef$.MODULE$.require(topics.size() > 1, (Function0)new Serializable(topics, topicFieldName){
                        public static final long serialVersionUID = 0L;
                        private final Seq topics$2;
                        private final String topicFieldName$1;

                        public final String apply() {
                            return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Got topicFieldName = ", " but only one topic to write (", ")"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topicFieldName$1, this.topics$2}));
                        }
                        {
                            this.topics$2 = topics$2;
                            this.topicFieldName$1 = topicFieldName$1;
                        }
                    });
                    // Columns are resolved against df itself through df.col(name).
                    Option<Column> keyCol = this.keyExpression(topics, topicFieldNameOpt, throwException, (Function1<String, Column>)new Serializable(df){
                        public static final long serialVersionUID = 0L;
                        private final Dataset df$1;

                        public final Column apply(String colName) {
                            return this.df$1.col(colName);
                        }
                        {
                            this.df$1 = df$1;
                        }
                    }, darwinConf);
                    Option<Column> headersCol = this.headerExpression(topics, topicFieldNameOpt, throwException);
                    Column topicCol = functions$.MODULE$.col(topicFieldName);
                    Column valueCol = this.valueExpression(topics, topicFieldNameOpt, df.schema(), (Function1<String, Column>)new Serializable(df){
                        public static final long serialVersionUID = 0L;
                        private final Dataset df$1;

                        public final Column apply(String colName) {
                            return this.df$1.col(colName);
                        }
                        {
                            this.df$1 = df$1;
                        }
                    }, throwException, darwinConf);
                    // Output column order: [key] ++ [headers] ++ topic, value (bracketed ones only when defined).
                    Seq columns = ((TraversableOnce)((TraversableLike)Option$.MODULE$.option2Iterable(keyCol.map((Function1)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        public final Column apply(Column x$2) {
                            return x$2.as("key");
                        }
                    })).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(headersCol.map((Function1)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        public final Column apply(Column x$3) {
                            return x$3.as("headers");
                        }
                    })), Iterable$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{topicCol.as("topic"), valueCol.as("value")})), Iterable$.MODULE$.canBuildFrom())).toSeq();
                    dataset = df.select(columns);
                    break block3;
                }
                // Case 2: no topic field name, so there must be exactly one target topic.
                if (!None$.MODULE$.equals(option)) break block4;
                Predef$.MODULE$.require(topics.size() == 1, (Function0)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "More than one topic to write specified but there's no column containing the topics' name.";
                    }
                });
                Option<Column> keyCol = this.keyExpression(topics, topicFieldNameOpt, throwException, (Function1<String, Column>)new Serializable(df){
                    public static final long serialVersionUID = 0L;
                    private final Dataset df$1;

                    public final Column apply(String colName) {
                        return this.df$1.col(colName);
                    }
                    {
                        this.df$1 = df$1;
                    }
                }, darwinConf);
                Option<Column> headersCol = this.headerExpression(topics, topicFieldNameOpt, throwException);
                Column valueCol = this.valueExpression(topics, topicFieldNameOpt, df.schema(), (Function1<String, Column>)new Serializable(df){
                    public static final long serialVersionUID = 0L;
                    private final Dataset df$1;

                    public final Column apply(String colName) {
                        return this.df$1.col(colName);
                    }
                    {
                        this.df$1 = df$1;
                    }
                }, throwException, darwinConf);
                // Output column order: [key] ++ [headers] ++ value; no topic column in this case.
                Seq columns = ((TraversableOnce)((TraversableLike)Option$.MODULE$.option2Iterable(keyCol.map((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final Column apply(Column x$4) {
                        return x$4.as("key");
                    }
                })).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(headersCol.map((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final Column apply(Column x$5) {
                        return x$5.as("headers");
                    }
                })), Iterable$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{valueCol.as("value")})), Iterable$.MODULE$.canBuildFrom())).toSeq();
                dataset = df.select(columns);
            }
            return dataset;
        }
        // Neither Some nor None: mirrors a non-exhaustive pattern match failure.
        throw new MatchError(option);
    }

    /**
     * Builds the optional expression for the record key.
     *
     * Returns {@code None} when no topic model defines {@code keyFieldName}. Otherwise:
     * <ul>
     *   <li>with a topic field name: a {@code when(...).when(...)...otherwise(...)} chain with
     *       one branch per topic model (condition from {@code conditionOnTopicName}, value from
     *       the {@code valueOfKey$1} helper), falling back to {@code exceptionUdf} applied to
     *       the topic column;</li>
     *   <li>without a topic field name: the {@code valueOfKey$1} result for the first topic.</li>
     * </ul>
     * The {@code valueOfKey$1} helper is not part of this section, so its exact output is not
     * described here.
     *
     * @param topics          topic models to consider; {@code head()} is read, so it must be non-empty
     *                        whenever at least one model defines a key field
     * @param topicFieldName  name of the column holding the per-row topic name, if any
     * @param exceptionUdf    UDF used as the {@code otherwise} branch of the chain
     * @param columnExtractor resolves a column name to a {@code Column}
     * @param darwinConf      optional configuration forwarded to {@code valueOfKey$1}
     * @return the key expression, or {@code None} when no topic declares a key field
     */
    private Option<Column> keyExpression(Seq<TopicModel> topics, Option<String> topicFieldName, UserDefinedFunction exceptionUdf, Function1<String, Column> columnExtractor, Option<Config> darwinConf) {
        None$ none$;
        if (topics.exists((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(TopicModel x$6) {
                return x$6.keyFieldName().isDefined();
            }
        })) {
            if (topicFieldName.isDefined()) {
                TopicModel head = (TopicModel)topics.head();
                Seq tail = (Seq)topics.tail();
                // Seed the chain with the first topic, then fold the remaining topics in as extra when-branches.
                none$ = new Some((Object)((Column)tail.foldLeft((Object)functions$.MODULE$.when(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)topicFieldName.get(), head), (Object)this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(head, columnExtractor, darwinConf)), (Function2)new Serializable(topicFieldName, columnExtractor, darwinConf){
                    public static final long serialVersionUID = 0L;
                    private final Option topicFieldName$2;
                    private final Function1 columnExtractor$1;
                    private final Option darwinConf$1;

                    public final Column apply(Column z, TopicModel x) {
                        return z.when(KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)this.topicFieldName$2.get(), x), (Object)KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(x, this.columnExtractor$1, this.darwinConf$1));
                    }
                    {
                        this.topicFieldName$2 = topicFieldName$2;
                        this.columnExtractor$1 = columnExtractor$1;
                        this.darwinConf$1 = darwinConf$1;
                    }
                })).otherwise((Object)exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col((String)topicFieldName.get())}))));
            } else {
                // Single-topic case: only the first topic model is consulted.
                none$ = new Some((Object)this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1((TopicModel)topics.head(), columnExtractor, darwinConf));
            }
        } else {
            none$ = None$.MODULE$;
        }
        return none$;
    }

    /**
     * Builds the expression for the record value.
     *
     * With a topic field name, the result is a {@code when(...).when(...)...otherwise(...)}
     * chain with one branch per topic model (condition from {@code conditionOnTopicName},
     * value from the {@code valueOfValue$1} helper), falling back to {@code exceptionUdf}
     * applied to the topic column. Without one, the result is the {@code valueOfValue$1}
     * output for the first topic model.
     * The {@code valueOfValue$1} helper is not part of this section, so its exact output is not
     * described here.
     *
     * @param topics          topic models to consider; {@code head()} is read, so it must be non-empty
     * @param topicFieldName  name of the column holding the per-row topic name, if any
     * @param dfSchema        schema forwarded to {@code valueOfValue$1}
     * @param columnExtractor resolves a column name to a {@code Column}
     * @param exceptionUdf    UDF used as the {@code otherwise} branch of the chain
     * @param darwinConf      optional configuration forwarded to {@code valueOfValue$1}
     * @return the value expression
     */
    private Column valueExpression(Seq<TopicModel> topics, Option<String> topicFieldName, StructType dfSchema, Function1<String, Column> columnExtractor, UserDefinedFunction exceptionUdf, Option<Config> darwinConf) {
        Column column;
        if (topicFieldName.isDefined()) {
            TopicModel head = (TopicModel)topics.head();
            Seq tail = (Seq)topics.tail();
            // Seed the chain with the first topic, then fold the remaining topics in as extra when-branches.
            column = ((Column)tail.foldLeft((Object)functions$.MODULE$.when(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)topicFieldName.get(), head), (Object)this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1(head, dfSchema, columnExtractor, darwinConf)), (Function2)new Serializable(topicFieldName, dfSchema, columnExtractor, darwinConf){
                public static final long serialVersionUID = 0L;
                private final Option topicFieldName$4;
                private final StructType dfSchema$1;
                private final Function1 columnExtractor$2;
                private final Option darwinConf$2;

                public final Column apply(Column z, TopicModel x) {
                    return z.when(KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)this.topicFieldName$4.get(), x), (Object)KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1(x, this.dfSchema$1, this.columnExtractor$2, this.darwinConf$2));
                }
                {
                    this.topicFieldName$4 = topicFieldName$4;
                    this.dfSchema$1 = dfSchema$1;
                    this.columnExtractor$2 = columnExtractor$2;
                    this.darwinConf$2 = darwinConf$2;
                }
            })).otherwise((Object)exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col((String)topicFieldName.get())})));
        } else {
            // Single-topic case: only the first topic model is consulted.
            column = this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1((TopicModel)topics.head(), dfSchema, columnExtractor, darwinConf);
        }
        return column;
    }

    /**
     * Builds the optional expression for the record headers.
     *
     * Returns {@code None} when no topic model defines {@code headersFieldName}. Otherwise:
     * <ul>
     *   <li>with a topic field name: a {@code when(...).when(...)...otherwise(...)} chain with
     *       one branch per topic model (condition from {@code conditionOnTopicName}, value from
     *       the {@code valueOfHeader$1} helper), falling back to {@code exceptionUdf} applied
     *       to the topic column;</li>
     *   <li>without a topic field name: the {@code valueOfHeader$1} result for the first topic.</li>
     * </ul>
     * The {@code valueOfHeader$1} helper is not part of this section, so its exact output is
     * not described here.
     *
     * @param topics         topic models to consider; {@code head()} is read, so it must be non-empty
     *                       whenever at least one model defines a headers field
     * @param topicFieldName name of the column holding the per-row topic name, if any
     * @param exceptionUdf   UDF used as the {@code otherwise} branch of the chain
     * @return the headers expression, or {@code None} when no topic declares a headers field
     */
    private Option<Column> headerExpression(Seq<TopicModel> topics, Option<String> topicFieldName, UserDefinedFunction exceptionUdf) {
        None$ none$;
        if (topics.exists((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(TopicModel x$7) {
                return x$7.headersFieldName().isDefined();
            }
        })) {
            if (topicFieldName.isDefined()) {
                TopicModel head = (TopicModel)topics.head();
                Seq tail = (Seq)topics.tail();
                // Seed the chain with the first topic, then fold the remaining topics in as extra when-branches.
                none$ = new Some((Object)((Column)tail.foldLeft((Object)functions$.MODULE$.when(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)topicFieldName.get(), head), (Object)this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(head)), (Function2)new Serializable(topicFieldName){
                    public static final long serialVersionUID = 0L;
                    private final Option topicFieldName$3;

                    public final Column apply(Column z, TopicModel x) {
                        return z.when(KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String)this.topicFieldName$3.get(), x), (Object)KafkaWriters$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(x));
                    }
                    {
                        this.topicFieldName$3 = topicFieldName$3;
                    }
                })).otherwise((Object)exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col((String)topicFieldName.get())}))));
            } else {
                // Single-topic case: only the first topic model is consulted.
                none$ = new Some((Object)this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1((TopicModel)topics.head()));
            }
        } else {
            none$ = None$.MODULE$;
        }
        return none$;
    }

    /**
     * Builds the boolean condition "the topic column equals this topic model's name".
     *
     * @param topicFieldName name of the column that carries the per-row topic name
     * @param head           topic model whose {@code name()} is compared against the column
     * @return an equality expression between the named column and the topic model's name
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName(String topicFieldName, TopicModel head) {
        Column topicNameColumn = functions$.MODULE$.col(topicFieldName);
        Object expectedTopicName = (Object)head.name();
        return topicNameColumn.equalTo(expectedTopicName);
    }

    /**
     * Produces the {@code "key"} column for a topic, serialising it through an
     * {@code AvroSerializerExpression} when the topic model declares a key schema.
     *
     * <ul>
     *   <li>Key schema defined: the key column's expression is wrapped in an
     *       {@code AvroSerializerExpression}. The record name is the topic name followed by
     *       {@code "key"} and the namespace is {@code "wasp"}. When
     *       {@code useAvroSchemaManager()} is true the key schema is parsed, passed through
     *       {@code SubjectUtils.attachSubjectToSchema(topicModel, schema, true)} and used together
     *       with {@code darwinConf.get()}; otherwise the raw key schema string is passed as a
     *       {@code Some}.</li>
     *   <li>No key schema: the key column is cast to binary if {@code Cast.canCast} allows it.</li>
     * </ul>
     *
     * @param keyColumn  the column holding the key
     * @param topicModel the topic model that supplies name, key schema and schema-manager flag
     * @param darwinConf configuration read with {@code get()} on the schema-manager path, so it
     *                   must be defined whenever {@code useAvroSchemaManager()} is true
     * @return the converted column, aliased as {@code "key"}
     * @throws Exception when there is no key schema and the column cannot be cast to binary
     */
    private Column convertKeyForAvro(Column keyColumn, TopicModel topicModel, Option<Config> darwinConf) {
        block7: {
            Column column;
            block6: {
                block5: {
                    Object object;
                    // Without a key schema, fall through to the plain binary cast below.
                    if (!topicModel.keySchema().isDefined()) break block5;
                    Column exprToConvertToAvro = keyColumn;
                    DataType keySchema = exprToConvertToAvro.expr().dataType();
                    String avroRecordName = new StringBuilder().append((Object)topicModel.name()).append((Object)"key").toString();
                    String avroRecordNamespace = "wasp";
                    if (topicModel.useAvroSchemaManager()) {
                        // Schema-manager path: parse the declared schema and attach the subject before serialising.
                        Schema avroSchema = new Schema.Parser().parse((String)topicModel.keySchema().get());
                        Schema updatedSchema = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, avroSchema, true);
                        Config config = (Config)darwinConf.get();
                        object = new Serializable(avroRecordName, avroRecordNamespace, updatedSchema, config){
                            public static final long serialVersionUID = 0L;
                            private final String avroRecordName$1;
                            private final String avroRecordNamespace$1;
                            private final Schema updatedSchema$1;
                            private final Config eta$0$1$1;

                            public final AvroSerializerExpression apply(Expression child, DataType sparkSchema) {
                                return AvroSerializerExpression$.MODULE$.apply(this.eta$0$1$1, this.updatedSchema$1, this.avroRecordName$1, this.avroRecordNamespace$1, child, sparkSchema);
                            }
                            {
                                this.avroRecordName$1 = avroRecordName$1;
                                this.avroRecordNamespace$1 = avroRecordNamespace$1;
                                this.updatedSchema$1 = updatedSchema$1;
                                this.eta$0$1$1 = eta$0$1$1;
                            }
                        };
                    } else {
                        // Plain path: hand the schema string to the serializer expression as an Option.
                        Some some = new Some(topicModel.keySchema().get());
                        object = new Serializable(avroRecordName, avroRecordNamespace, some){
                            public static final long serialVersionUID = 0L;
                            private final String avroRecordName$1;
                            private final String avroRecordNamespace$1;
                            private final Some eta$0$2$1;

                            public final AvroSerializerExpression apply(Expression child, DataType sparkSchema) {
                                return AvroSerializerExpression$.MODULE$.apply((Option)this.eta$0$2$1, this.avroRecordName$1, this.avroRecordNamespace$1, child, sparkSchema);
                            }
                            {
                                this.avroRecordName$1 = avroRecordName$1;
                                this.avroRecordNamespace$1 = avroRecordNamespace$1;
                                this.eta$0$2$1 = eta$0$2$1;
                            }
                        };
                    }
                    // Apply whichever factory was chosen to the key expression and its Spark data type.
                    Serializable rowToAvroExprFactory = object;
                    AvroSerializerExpression rowToAvroExpr = (AvroSerializerExpression)rowToAvroExprFactory.apply((Object)exprToConvertToAvro.expr(), (Object)keySchema);
                    column = new Column((Expression)rowToAvroExpr).as("key");
                    break block6;
                }
                // No key schema: only a binary-castable key can be written.
                if (!Cast$.MODULE$.canCast(keyColumn.expr().dataType(), (DataType)BinaryType$.MODULE$)) break block7;
                column = keyColumn.cast((DataType)BinaryType$.MODULE$).as("key");
            }
            return column;
        }
        throw new Exception(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Cannot serialize key for Kafka topic because column ", " cannot "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{keyColumn.toString()}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"be cast to Binary. If you want to serialize it in Avro format, set TopicModel.keySchema accordingly."})).s((Seq)Nil$.MODULE$)).toString());
    }

    /**
     * Casts the key column to binary and aliases it as {@code "key"}.
     *
     * @param keyColumn the column holding the key
     * @return {@code keyColumn} cast to binary, aliased as {@code "key"}
     * @throws Exception when {@code Cast.canCast} reports that the column's data type cannot be
     *                   cast to binary
     */
    private Column convertKeyToBinary(Column keyColumn) {
        DataType sourceType = keyColumn.expr().dataType();
        DataType binaryType = (DataType)BinaryType$.MODULE$;
        // Guard clause: reject keys whose type has no cast to binary.
        if (!Cast$.MODULE$.canCast(sourceType, binaryType)) {
            String messageHead = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Cannot serialize key for Kafka topic because column ", " cannot "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{keyColumn.toString()}));
            String messageTail = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"be cast to Binary."})).s((Seq)Nil$.MODULE$);
            throw new Exception(new StringBuilder().append((Object)messageHead).append((Object)messageTail).toString());
        }
        Column binaryKey = keyColumn.cast(binaryType);
        return binaryKey.as("key");
    }

    /*
     * WARNING - void declaration
     */
    private Column convertValueForJson(Seq<String> columnsInValues) {
        void var2_2;
        Column columnExpr = functions$.MODULE$.to_json(functions$.MODULE$.struct((Seq)columnsInValues.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Column apply(String colName) {
                return functions$.MODULE$.col(colName);
            }
        }, Seq$.MODULE$.canBuildFrom()))).cast((DataType)BinaryType$.MODULE$).as("value");
        this.logger().debug((Function0)new Serializable(columnExpr){
            public static final long serialVersionUID = 0L;
            private final Column columnExpr$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Generated select expression: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.columnExpr$1.expr().toString()}));
            }
            {
                this.columnExpr$1 = columnExpr$1;
            }
        });
        return var2_2;
    }

    /**
     * Builds the {@code "value"} column for plaintext output from a single named column.
     *
     * Three preconditions are enforced with {@code Predef.require}, in this order:
     * exactly one value field name is given; that name matches a field of {@code schema};
     * the field's data type can be cast to string according to {@code Cast.canCast}.
     * The column is then cast to string, then to binary, and aliased as {@code "value"}.
     * The generated expression is logged at debug level.
     *
     * @param columnsInValues value field names; must contain exactly one element
     * @param schema          schema in which the value field is looked up by name
     * @return the string-then-binary cast column aliased as {@code "value"}
     * @throws IllegalArgumentException via {@code Predef.require} when a precondition fails
     */
    private Column convertValueForPlainText(Seq<String> columnsInValues, StructType schema) {
        Predef$.MODULE$.require(columnsInValues.size() == 1, (Function0)new Serializable(columnsInValues){
            public static final long serialVersionUID = 0L;
            private final Seq columnsInValues$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Exactly one value field name must be defined for plaintext topic data type but zero or more than one ").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"were specified; value field names: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.columnsInValues$1.mkString("\"", "\", \"", "\"")}))).toString();
            }
            {
                this.columnsInValues$1 = columnsInValues$1;
            }
        });
        String valueFieldName = (String)columnsInValues.head();
        // Look the field up by exact (null-safe) name equality.
        Option maybeValueColumn = schema.find((Function1)new Serializable(valueFieldName){
            public static final long serialVersionUID = 0L;
            private final String valueFieldName$1;

            public final boolean apply(StructField x$8) {
                String string = x$8.name();
                String string2 = this.valueFieldName$1;
                return !(string != null ? !string.equals(string2) : string2 != null);
            }
            {
                this.valueFieldName$1 = valueFieldName$1;
            }
        });
        Predef$.MODULE$.require(maybeValueColumn.isDefined(), (Function0)new Serializable(schema, valueFieldName){
            public static final long serialVersionUID = 0L;
            private final StructType schema$1;
            private final String valueFieldName$1;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The specified value field name \"", "\" does not match any column; columns in schema: "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueFieldName$1}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((TraversableOnce)this.schema$1.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(StructField x$9) {
                        return x$9.name();
                    }
                }, Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")}))).toString();
            }
            {
                this.schema$1 = schema$1;
                this.valueFieldName$1 = valueFieldName$1;
            }
        });
        StructField valueColumn = (StructField)maybeValueColumn.get();
        DataType valueColumnDataType = valueColumn.dataType();
        // The check is "castable to string", although the failure message says "not string".
        Predef$.MODULE$.require(Cast$.MODULE$.canCast(valueColumnDataType, (DataType)StringType$.MODULE$), (Function0)new Serializable(valueFieldName, valueColumnDataType){
            public static final long serialVersionUID = 0L;
            private final String valueFieldName$1;
            private final DataType valueColumnDataType$1;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The specified value field name \"", "\" matches a column with a type that is not string; "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueFieldName$1}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"incompatible type ", " found"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueColumnDataType$1}))).toString();
            }
            {
                this.valueFieldName$1 = valueFieldName$1;
                this.valueColumnDataType$1 = valueColumnDataType$1;
            }
        });
        // Cast through string first, then to binary.
        Column columnExpr = functions$.MODULE$.col(valueFieldName).cast((DataType)StringType$.MODULE$).cast((DataType)BinaryType$.MODULE$).as("value");
        this.logger().debug((Function0)new Serializable(columnExpr){
            public static final long serialVersionUID = 0L;
            private final Column columnExpr$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Generated select expression: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.columnExpr$2.expr().toString()}));
            }
            {
                this.columnExpr$2 = columnExpr$2;
            }
        });
        return columnExpr;
    }

    /**
     * Builds the Kafka {@code value} column for a topic whose data type is binary.
     *
     * <p>Exactly one value field must be supplied, it must name a column of {@code schema},
     * and that column must be of binary type. The resulting column is the named input
     * column aliased to {@code value}.
     *
     * <p>The column is built with {@code functions.col(name).as("value")} rather than by
     * interpolating the name into a SQL string handed to {@code functions.expr}: a name
     * that passes the schema check but is not a plain SQL identifier (for example one
     * containing {@code -} or a space) would otherwise fail to parse, or be parsed as a
     * different expression. This mirrors how the plaintext converter in this class builds
     * its column.
     *
     * @param columnsInValues the value field names configured for the topic; must have size 1
     * @param schema the schema of the dataframe being written
     * @return the selected column aliased as {@code value}
     * @throws IllegalArgumentException (raised by {@code Predef.require}) when the number of
     *         field names is not one, the name matches no column, or the column is not binary
     */
    public Column convertValueForBinary(Seq<String> columnsInValues, StructType schema) {
        // Precondition 1: exactly one value field name.
        Predef$.MODULE$.require(columnsInValues.size() == 1, (Function0)new Serializable(columnsInValues){
            public static final long serialVersionUID = 0L;
            private final Seq columnsInValues$2;

            public final String apply() {
                return new StringBuilder().append((Object)"Exactly one value field name must be defined for binary topic data type but zero or more than one were ").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"specified; value field names: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.columnsInValues$2.mkString("\"", "\", \"", "\"")}))).toString();
            }
            {
                this.columnsInValues$2 = columnsInValues$2;
            }
        });
        String valueFieldName = (String)columnsInValues.head();
        // Look the field up by exact (null-safe) name equality among the top-level columns.
        Option maybeValueColumn = schema.find((Function1)new Serializable(valueFieldName){
            public static final long serialVersionUID = 0L;
            private final String valueFieldName$2;

            public final boolean apply(StructField x$10) {
                String candidate = x$10.name();
                String wanted = this.valueFieldName$2;
                return candidate == null ? wanted == null : candidate.equals(wanted);
            }
            {
                this.valueFieldName$2 = valueFieldName$2;
            }
        });
        // Precondition 2: the field exists in the schema.
        Predef$.MODULE$.require(maybeValueColumn.isDefined(), (Function0)new Serializable(schema, valueFieldName){
            public static final long serialVersionUID = 0L;
            private final StructType schema$2;
            private final String valueFieldName$2;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The specified value field name \"", "\" does not match any column; columns in schema: "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueFieldName$2}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((TraversableOnce)this.schema$2.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(StructField x$11) {
                        return x$11.name();
                    }
                }, Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")}))).toString();
            }
            {
                this.schema$2 = schema$2;
                this.valueFieldName$2 = valueFieldName$2;
            }
        });
        StructField valueColumn = (StructField)maybeValueColumn.get();
        DataType valueColumnDataType = valueColumn.dataType();
        // Precondition 3: the column is already binary; no cast is applied for this data type.
        boolean isBinary = valueColumnDataType != null && valueColumnDataType.equals(BinaryType$.MODULE$);
        Predef$.MODULE$.require(isBinary, (Function0)new Serializable(valueFieldName, valueColumnDataType){
            public static final long serialVersionUID = 0L;
            private final String valueFieldName$2;
            private final DataType valueColumnDataType$2;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The specified value field name \"", "\" matches a column with a type that is not binary; "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueFieldName$2}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"incompatible type ", " found"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueColumnDataType$2}))).toString();
            }
            {
                this.valueFieldName$2 = valueFieldName$2;
                this.valueColumnDataType$2 = valueColumnDataType$2;
            }
        });
        // Reference the column through the Column API so the name is never re-parsed as SQL.
        Column columnExpr = functions$.MODULE$.col(valueFieldName).as("value");
        this.logger().debug((Function0)new Serializable(columnExpr){
            public static final long serialVersionUID = 0L;
            private final Column columnExpr$3;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Generated select expressions: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.columnExpr$3.expr().toString()}));
            }
            {
                this.columnExpr$3 = columnExpr$3;
            }
        });
        return columnExpr;
    }

    /**
     * Builds the Kafka value column for a topic whose data type is avro.
     *
     * <p>The value fields are resolved through {@code columnExtractor}, packed into a single
     * struct expression, and wrapped in an {@link AvroSerializerExpression}. The avro record
     * name is the topic name and the namespace is the fixed string {@code "wasp"}.
     *
     * <p>When the topic uses the avro schema manager, the topic's JSON schema is parsed, the
     * subject is attached to it via {@code SubjectUtils}, and the serializer is created with
     * the configuration held by {@code darwinConf} ({@code darwinConf.get()} is called
     * unconditionally on this path, so an empty option fails here). Otherwise the serializer
     * is created from the JSON schema string alone.
     *
     * @param columnsInValues names of the fields that make up the value
     * @param topicModel the topic being written
     * @param schema the dataframe schema; not read by this method
     * @param columnExtractor maps a field name to the column to serialize
     * @param darwinConf schema-manager configuration, required only when the topic uses it
     * @return a column wrapping the avro serializer expression
     */
    private Column convertValueForAvro(Seq<String> columnsInValues, TopicModel topicModel, StructType schema, Function1<String, Column> columnExtractor, Option<Config> darwinConf) {
        Seq valueColumns = (Seq)columnsInValues.map(columnExtractor, Seq$.MODULE$.canBuildFrom());
        Expression structExpr = functions$.MODULE$.struct(valueColumns).expr();
        StructType structSchema = (StructType)structExpr.dataType();
        String recordName = topicModel.name();
        String recordNamespace = "wasp";

        AvroSerializerExpression serializer;
        if (!topicModel.useAvroSchemaManager()) {
            // Plain avro: the serializer only needs the schema as a JSON string.
            Option jsonSchema = (Option)new Some((Object)topicModel.getJsonSchema());
            serializer = AvroSerializerExpression$.MODULE$.apply(jsonSchema, recordName, recordNamespace, structExpr, (DataType)structSchema);
        } else {
            // Schema-manager avro: parse the schema, attach the subject, then read the config.
            Schema parsedSchema = new Schema.Parser().parse(topicModel.getJsonSchema());
            Schema schemaWithSubject = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, parsedSchema, false);
            Config managerConfig = (Config)darwinConf.get();
            serializer = AvroSerializerExpression$.MODULE$.apply(managerConfig, schemaWithSubject, recordName, recordNamespace, structExpr, (DataType)structSchema);
        }
        return new Column((Expression)serializer);
    }

    /**
     * Validates that the stream can be written to the given topic model(s), logs what is
     * about to be written, and delegates to {@code prepareDfToWrite}.
     *
     * @param stream the dataframe to write
     * @param topicFieldName optional topic field name, passed through to {@code prepareDfToWrite}
     * @param topics the topic models to write to; when empty, {@code mainTopicModel} is used
     *        as the only topic (it is cast to {@code TopicModel} in that case)
     * @param mainTopicModel the main datastore model; a {@code MultiTopicModel} gets an extra log line
     * @param darwinConf optional configuration, passed through to {@code prepareDfToWrite}
     * @return the result of {@code prepareDfToWrite}
     * @throws IllegalArgumentException when {@code TopicModelUtils.isTopicWritable} yields an
     *         error message
     */
    public Dataset<Row> convertDataframe(Dataset<Row> stream, Option<String> topicFieldName, Seq<TopicModel> topics, DatastoreModel mainTopicModel, Option<Config> darwinConf) {
        // Fold the writability check: the String case is thrown as IllegalArgumentException,
        // the unit case is a no-op.
        TopicModelUtils$.MODULE$.isTopicWritable(mainTopicModel, (Seq<TopicModel>)topics, stream).fold((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply(String s) {
                throw new IllegalArgumentException(s);
            }
        }, (Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(BoxedUnit x$12) {
            }
        });
        // With no explicit topics, fall back to a single-element list holding the main model.
        List topicsToWrite = topics.isEmpty() ? List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicModel[]{(TopicModel)mainTopicModel})) : topics;
        // Info log: space-separated names of the topic models that will be written.
        this.logger().info((Function0)new Serializable((Seq)topicsToWrite){
            public static final long serialVersionUID = 0L;
            private final Seq topicsToWrite$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Writing with topic models: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((TraversableOnce)this.topicsToWrite$1.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(TopicModel x$13) {
                        return x$13.name();
                    }
                }, Seq$.MODULE$.canBuildFrom())).mkString(" ")}));
            }
            {
                this.topicsToWrite$1 = topicsToWrite$1;
            }
        });
        if (mainTopicModel instanceof MultiTopicModel) {
            // Extra info log for multi-topic models; prints the original `topics` argument,
            // not `topicsToWrite`.
            this.logger().info((Function0)new Serializable((Seq)topics, mainTopicModel){
                public static final long serialVersionUID = 0L;
                private final Seq topics$1;
                private final DatastoreModel mainTopicModel$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Topic model \"", "\" is a MultiTopicModel for topics: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.mainTopicModel$1.name(), this.topics$1}));
                }
                {
                    this.topics$1 = topics$1;
                    this.mainTopicModel$1 = mainTopicModel$1;
                }
            });
        }
        // Debug log: tree rendering of the input schema.
        this.logger().debug((Function0)new Serializable(stream){
            public static final long serialVersionUID = 0L;
            private final Dataset stream$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Input schema:\\n", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.stream$1.schema().treeString()}));
            }
            {
                this.stream$1 = stream$1;
            }
        });
        return this.prepareDfToWrite(stream, topicFieldName, (Seq<TopicModel>)topicsToWrite, darwinConf);
    }

    /**
     * Builds the Kafka key column for a topic, dispatching on the topic data type.
     *
     * <p>The key field name is read with {@code keyFieldName().get()}, so the topic must
     * define one. {@code "avro"} keys go through {@code convertKeyForAvro}; {@code "json"},
     * {@code "binary"} and {@code "plaintext"} keys go through {@code convertKeyToBinary}.
     *
     * @param topicModel the topic being written
     * @param columnExtractor$1 maps the key field name to its column
     * @param darwinConf$1 optional configuration forwarded to the avro key conversion
     * @return the converted key column
     * @throws UnsupportedOperationException for any other topic data type
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(TopicModel topicModel, Function1 columnExtractor$1, Option darwinConf$1) {
        String keyField = (String)topicModel.keyFieldName().get();
        String dataType = topicModel.topicDataType();

        if ("avro".equals(dataType)) {
            return this.convertKeyForAvro((Column)columnExtractor$1.apply((Object)keyField), topicModel, (Option<Config>)darwinConf$1);
        }

        boolean binaryCompatible = "json".equals(dataType) || "binary".equals(dataType) || "plaintext".equals(dataType);
        if (binaryCompatible) {
            return this.convertKeyToBinary((Column)columnExtractor$1.apply((Object)keyField));
        }

        throw new UnsupportedOperationException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unknown topic data type ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataType})));
    }

    /**
     * Builds the Kafka value column for a topic, dispatching on the topic data type.
     *
     * <p>The value field names come from the topic model when it declares them. Otherwise
     * they come from {@code TopicModelUtils.getAllValueFieldsFromSchema}, falling back to
     * every field name of the dataframe schema when that yields nothing.
     *
     * @param topicModel the topic being written
     * @param dfSchema$1 the schema of the dataframe being written
     * @param columnExtractor$2 maps a field name to its column (used by the avro conversion)
     * @param darwinConf$2 optional configuration forwarded to the avro conversion
     * @return the converted value column
     * @throws UnsupportedOperationException when the topic data type is not one of
     *         {@code "avro"}, {@code "json"}, {@code "plaintext"}, {@code "binary"}
     * @throws MatchError when {@code valueFieldsNames()} is neither {@code Some} nor {@code None}
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1(TopicModel topicModel, StructType dfSchema$1, Function1 columnExtractor$2, Option darwinConf$2) {
        Option declaredFields = topicModel.valueFieldsNames();

        // Resolve the value field names before the data type is inspected.
        Seq columnsInValues;
        if (declaredFields instanceof Some) {
            columnsInValues = (Seq)((Some)declaredFields).x();
        } else if (None$.MODULE$.equals(declaredFields)) {
            columnsInValues = (Seq)TopicModelUtils$.MODULE$.getAllValueFieldsFromSchema(topicModel).getOrElse((Function0)new Serializable(dfSchema$1){
                public static final long serialVersionUID = 0L;
                private final StructType dfSchema$1;

                public final List<String> apply() {
                    return Predef$.MODULE$.refArrayOps((Object[])this.dfSchema$1.fieldNames()).toList();
                }
                {
                    this.dfSchema$1 = dfSchema$1;
                }
            });
        } else {
            throw new MatchError((Object)declaredFields);
        }

        String dataType = topicModel.topicDataType();
        if ("avro".equals(dataType)) {
            return this.convertValueForAvro((Seq<String>)columnsInValues, topicModel, dfSchema$1, (Function1<String, Column>)columnExtractor$2, (Option<Config>)darwinConf$2);
        }
        if ("json".equals(dataType)) {
            return this.convertValueForJson((Seq<String>)columnsInValues);
        }
        if ("plaintext".equals(dataType)) {
            return this.convertValueForPlainText((Seq<String>)columnsInValues, dfSchema$1);
        }
        if ("binary".equals(dataType)) {
            return this.convertValueForBinary((Seq<String>)columnsInValues, dfSchema$1);
        }
        throw new UnsupportedOperationException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unknown topic data type ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataType})));
    }

    /**
     * Builds the Kafka headers column for a topic.
     *
     * @param head the topic model whose optional headers field name is consulted
     * @return the column named by {@code headersFieldName()} when one is defined, otherwise
     *         a {@code lit(null)} column
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(TopicModel head) {
        Option headersField = head.headersFieldName();
        // Turn the configured field name, if any, into a column reference.
        Option headersColumn = headersField.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Column apply(String colName) {
                return functions$.MODULE$.col(colName);
            }
        });
        // No headers field configured: fall back to a null literal column.
        Object resolved = headersColumn.getOrElse((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Column apply() {
                return functions$.MODULE$.lit(null);
            }
        });
        return (Column)resolved;
    }

    /**
     * Private constructor of the singleton: stores the new instance in {@code MODULE$} and
     * then runs the {@code Logging} initializer on it.
     */
    private KafkaWriters$() {
        // Publish the instance first; the rest of the class reaches it through MODULE$.
        MODULE$ = this;
        // Decompiler rendering of the Logging trait initializer call for this instance.
        Logging.class.$init$((Logging)this);
    }
}

