/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.SubjectUtils$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayOps;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class KafkaWriters$
implements Logging {
    /** Singleton instance; assigned by the private constructor of this class. */
    public static KafkaWriters$ MODULE$;

    /**
     * Logger injected through the {@code Logging} trait setter declared below.
     * The field is deliberately not {@code final}: it is assigned from a regular method,
     * which Java does not allow for final fields.
     */
    private WaspLogger logger;

    static {
        // Instantiating the class is sufficient: the constructor stores the instance in MODULE$.
        new KafkaWriters$();
    }

    /**
     * Returns the logger currently stored in this object.
     *
     * @return the logger set through the trait setter, or {@code null} if it was never set
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Setter generated for the {@code logger} member of the {@code Logging} trait.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        this.logger = x$1;
    }

    /**
     * Adds a runtime check on the topic name carried by each row.
     *
     * <p>When {@code topicFieldName} is empty, {@code df} is returned untouched. Otherwise the
     * column literally named {@code "topic"} is replaced by the result of a UDF that returns the
     * topic name unchanged when it is the name of one of {@code topics}, and throws otherwise.
     * Note that only the presence of {@code topicFieldName} is inspected here; the column that is
     * checked is always {@code "topic"}.
     *
     * @param topicFieldName name of the column holding the topic name, if any
     * @param topics         topic models whose names are accepted
     * @param df             the dataset to decorate
     * @return {@code df} itself, or {@code df} with the checked {@code "topic"} column
     */
    public Dataset<Row> addTopicNameCheckIfNeeded(Option<String> topicFieldName, Seq<TopicModel> topics, Dataset<Row> df) {
        Dataset dataset;
        if (topicFieldName.isEmpty()) {
            dataset = df;
        } else {
            // Names of all the topic models, collected into a set for the membership test below.
            Set acceptedTopicNames = ((TraversableOnce)topics.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.name(), Seq$.MODULE$.canBuildFrom())).toSet();
            // Identity function on accepted names; throws for any other name.
            Function1 & Serializable & scala.Serializable checkTopicName = (Function1 & Serializable & scala.Serializable)topicName -> {
                if (!acceptedTopicNames.apply(topicName)) {
                    throw new Exception(new StringBuilder(69).append("Topic name \"").append((String)topicName).append("\" is not in the topic models for the MultiTopicModel used").toString());
                }
                return topicName;
            };
            // Universe/mirror pairs used to materialise the two TypeTags passed to udf(...).
            JavaUniverse $u = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            JavaUniverse $u2 = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m2 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            // Type creator resolving to scala.Predef.String (first TypeTag handed to udf).
            public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
                }

                public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1() {
                }
            }
            // Type creator resolving to scala.Predef.String (second TypeTag handed to udf).
            public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
                }

                public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1() {
                }
            }
            UserDefinedFunction checkTopicNameUdf = functions$.MODULE$.udf((Function1)checkTopicName, ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$1()), ((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator2$1()));
            // Overwrite the "topic" column with its checked version.
            dataset = df.withColumn("topic", checkTopicNameUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("topic")})));
        }
        return dataset;
    }

    /**
     * Projects {@code df} onto the columns that are written to Kafka.
     *
     * <p>The selected columns are, in order: {@code "key"} (only when {@code keyExpression}
     * yields a column), {@code "headers"} (only when {@code headerExpression} yields a column),
     * {@code "topic"} (only when {@code topicFieldNameOpt} is defined) and {@code "value"}.
     *
     * <p>Preconditions enforced with {@code require}: when {@code topicFieldNameOpt} is defined
     * {@code topics} must contain more than one element; when it is empty {@code topics} must
     * contain exactly one element.
     *
     * @param df                the dataset to project
     * @param topicFieldNameOpt name of the column holding the destination topic name, if any
     * @param topics            topic models describing how key, headers and value are built
     * @param darwinConf        optional configuration forwarded to the key and value expressions
     * @return the projected dataset
     */
    public Dataset<Row> prepareDfToWrite(Dataset<Row> df, Option<String> topicFieldNameOpt, Seq<TopicModel> topics, Option<Config> darwinConf) {
        // Type creator resolving to scala.Predef.String; used for the input TypeTag of the UDF below.
        public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$2
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($m.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
            }

            public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$2() {
            }
        }
        Dataset dataset;
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // UDF that always throws; the key/header/value expressions receive it as their fallback
        // for topic names that match none of the topic models.
        UserDefinedFunction throwException = functions$.MODULE$.udf((Function1 & Serializable & scala.Serializable)s -> {
            throw new Exception(new StringBuilder(19).append("Unknown topic name ").append((String)s).toString());
        }, ((TypeTags)package$.MODULE$.universe()).TypeTag().Nothing(), ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaWriters$$typecreator1$2()));
        Option<String> option = topicFieldNameOpt;
        if (option instanceof Some) {
            // Multi-topic case: the destination topic is read from a column of the dataset.
            Some some = (Some)option;
            String topicFieldName = (String)some.value();
            Predef$.MODULE$.require(topics.size() > 1, (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Got topicFieldName = ").append(topicFieldName).append(" but only one topic to write (").append(topics).append(")").toString());
            Option<Column> keyCol = this.keyExpression(topics, topicFieldNameOpt, throwException, (Function1<String, Column>)(Function1 & Serializable & scala.Serializable)colName -> df.col(colName), darwinConf);
            Option<Column> headersCol = this.headerExpression(topics, topicFieldNameOpt, throwException);
            Column topicCol = functions$.MODULE$.col(topicFieldName);
            Column valueCol = this.valueExpression(topics, topicFieldNameOpt, df.schema(), (Function1<String, Column>)(Function1 & Serializable & scala.Serializable)colName -> df.col(colName), throwException, darwinConf);
            // Optional key ++ optional headers ++ [topic, value]
            Seq columns = ((TraversableOnce)((TraversableLike)Option$.MODULE$.option2Iterable(keyCol.map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.as("key"))).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(headersCol.map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.as("headers"))), Iterable$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{topicCol.as("topic"), valueCol.as("value")})), Iterable$.MODULE$.canBuildFrom())).toSeq();
            dataset = df.select(columns);
        } else if (None$.MODULE$.equals(option)) {
            // Single-topic case: no topic column is selected.
            Predef$.MODULE$.require(topics.size() == 1, (Function0 & Serializable & scala.Serializable)() -> "More than one topic to write specified but there's no column containing the topics' name.");
            Option<Column> keyCol = this.keyExpression(topics, topicFieldNameOpt, throwException, (Function1<String, Column>)(Function1 & Serializable & scala.Serializable)colName -> df.col(colName), darwinConf);
            Option<Column> headersCol = this.headerExpression(topics, topicFieldNameOpt, throwException);
            Column valueCol = this.valueExpression(topics, topicFieldNameOpt, df.schema(), (Function1<String, Column>)(Function1 & Serializable & scala.Serializable)colName -> df.col(colName), throwException, darwinConf);
            // Optional key ++ optional headers ++ [value]
            Seq columns = ((TraversableOnce)((TraversableLike)Option$.MODULE$.option2Iterable(keyCol.map((Function1 & Serializable & scala.Serializable)x$4 -> x$4.as("key"))).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(headersCol.map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.as("headers"))), Iterable$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{valueCol.as("value")})), Iterable$.MODULE$.canBuildFrom())).toSeq();
            dataset = df.select(columns);
        } else {
            throw new MatchError(option);
        }
        return dataset;
    }

    /**
     * Builds the expression producing the Kafka record key.
     *
     * <p>Returns an empty option when no topic model declares a key field name. With a single
     * destination topic (no {@code topicFieldName}) the key is the one computed for the first
     * topic model. With a topic column, the key is a {@code when(...).when(...)...otherwise(...)}
     * chain that picks the key expression of the topic model whose name equals the value of the
     * topic column, and falls back to {@code exceptionUdf} applied to that column.
     *
     * <p>NOTE(review): the per-topic key expression reads {@code keyFieldName().get()} for every
     * topic model, so this presumably relies on an upstream check that either all or none of the
     * topic models declare a key field - confirm against the caller.
     *
     * @param topics          topic models to build the key for
     * @param topicFieldName  name of the column holding the destination topic name, if any
     * @param exceptionUdf    UDF applied to the topic column when no topic model matches
     * @param columnExtractor resolves a column name to a {@link Column}
     * @param darwinConf      optional configuration forwarded to the Avro key conversion
     * @return the key expression, or an empty option when no topic model has a key field
     */
    private Option<Column> keyExpression(Seq<TopicModel> topics, Option<String> topicFieldName, UserDefinedFunction exceptionUdf, Function1<String, Column> columnExtractor, Option<Config> darwinConf) {
        boolean anyTopicHasKey = topics.exists((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)KafkaWriters$.$anonfun$keyExpression$1((TopicModel)x$6)));
        if (!anyTopicHasKey) {
            return Option$.MODULE$.empty();
        }
        if (!topicFieldName.isDefined()) {
            return new Some<Column>(this.valueOfKey$1((TopicModel)topics.head(), columnExtractor, darwinConf));
        }
        String topicColumnName = (String)topicFieldName.get();
        TopicModel head = (TopicModel)topics.head();
        Seq tail = (Seq)topics.tail();
        // The first topic model opens the chain; every remaining one appends a further branch.
        Column firstBranch = functions$.MODULE$.when(this.conditionOnTopicName(topicColumnName, head), (Object)this.valueOfKey$1(head, columnExtractor, darwinConf));
        Column allBranches = (Column)tail.foldLeft((Object)firstBranch, (Function2 & Serializable & scala.Serializable)(z, x) -> ((Column)z).when(this.conditionOnTopicName(topicColumnName, (TopicModel)x), (Object)this.valueOfKey$1((TopicModel)x, columnExtractor, darwinConf)));
        // Rows whose topic name matches no topic model evaluate the exception UDF.
        Column unknownTopic = exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(topicColumnName)}));
        return new Some<Column>(allBranches.otherwise((Object)unknownTopic));
    }

    /**
     * Builds the expression producing the Kafka record value.
     *
     * <p>With a single destination topic (no {@code topicFieldName}) the value is the one
     * computed for the first topic model. With a topic column, the value is a
     * {@code when(...).when(...)...otherwise(...)} chain that picks the value expression of the
     * topic model whose name equals the value of the topic column, and falls back to
     * {@code exceptionUdf} applied to that column.
     *
     * @param topics          topic models to build the value for
     * @param topicFieldName  name of the column holding the destination topic name, if any
     * @param dfSchema        schema of the dataset being written
     * @param columnExtractor resolves a column name to a {@link Column}
     * @param exceptionUdf    UDF applied to the topic column when no topic model matches
     * @param darwinConf      optional configuration forwarded to the Avro value conversion
     * @return the value expression
     */
    private Column valueExpression(Seq<TopicModel> topics, Option<String> topicFieldName, StructType dfSchema, Function1<String, Column> columnExtractor, UserDefinedFunction exceptionUdf, Option<Config> darwinConf) {
        if (!topicFieldName.isDefined()) {
            return this.valueOfValue$1((TopicModel)topics.head(), dfSchema, columnExtractor, darwinConf);
        }
        String topicColumnName = (String)topicFieldName.get();
        TopicModel head = (TopicModel)topics.head();
        Seq tail = (Seq)topics.tail();
        // The first topic model opens the chain; every remaining one appends a further branch.
        Column firstBranch = functions$.MODULE$.when(this.conditionOnTopicName(topicColumnName, head), (Object)this.valueOfValue$1(head, dfSchema, columnExtractor, darwinConf));
        // The accumulator arrives as a plain Object, so it is cast back to Column before chaining.
        Column allBranches = (Column)tail.foldLeft((Object)firstBranch, (Function2 & Serializable & scala.Serializable)(z, x) -> ((Column)z).when(this.conditionOnTopicName(topicColumnName, (TopicModel)x), (Object)this.valueOfValue$1((TopicModel)x, dfSchema, columnExtractor, darwinConf)));
        // Rows whose topic name matches no topic model evaluate the exception UDF.
        Column unknownTopic = exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(topicColumnName)}));
        return allBranches.otherwise((Object)unknownTopic);
    }

    /**
     * Builds the expression producing the Kafka record headers.
     *
     * <p>Returns an empty option when no topic model declares a headers field name. With a single
     * destination topic (no {@code topicFieldName}) the headers are those computed for the first
     * topic model. With a topic column, the result is a
     * {@code when(...).when(...)...otherwise(...)} chain that picks the headers expression of the
     * topic model whose name equals the value of the topic column, and falls back to
     * {@code exceptionUdf} applied to that column.
     *
     * @param topics         topic models to build the headers for
     * @param topicFieldName name of the column holding the destination topic name, if any
     * @param exceptionUdf   UDF applied to the topic column when no topic model matches
     * @return the headers expression, or an empty option when no topic model has a headers field
     */
    private Option<Column> headerExpression(Seq<TopicModel> topics, Option<String> topicFieldName, UserDefinedFunction exceptionUdf) {
        boolean anyTopicHasHeaders = topics.exists((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)KafkaWriters$.$anonfun$headerExpression$3((TopicModel)x$7)));
        if (!anyTopicHasHeaders) {
            return Option$.MODULE$.empty();
        }
        if (!topicFieldName.isDefined()) {
            return new Some<Column>(KafkaWriters$.valueOfHeader$1((TopicModel)topics.head()));
        }
        String topicColumnName = (String)topicFieldName.get();
        TopicModel head = (TopicModel)topics.head();
        Seq tail = (Seq)topics.tail();
        // The first topic model opens the chain; every remaining one appends a further branch.
        Column firstBranch = functions$.MODULE$.when(this.conditionOnTopicName(topicColumnName, head), (Object)KafkaWriters$.valueOfHeader$1(head));
        Column allBranches = (Column)tail.foldLeft((Object)firstBranch, (Function2 & Serializable & scala.Serializable)(z, x) -> ((Column)z).when(this.conditionOnTopicName(topicColumnName, (TopicModel)x), (Object)KafkaWriters$.valueOfHeader$1((TopicModel)x)));
        // Rows whose topic name matches no topic model evaluate the exception UDF.
        Column unknownTopic = exceptionUdf.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(topicColumnName)}));
        return new Some<Column>(allBranches.otherwise((Object)unknownTopic));
    }

    /**
     * Builds the boolean expression "the topic column equals the name of the given topic model".
     *
     * @param topicFieldName name of the column holding the destination topic name
     * @param head           topic model whose name is compared against the column
     * @return the equality expression
     */
    private Column conditionOnTopicName(String topicFieldName, TopicModel head) {
        Column topicColumn = functions$.MODULE$.col(topicFieldName);
        Object expectedName = (Object)head.name();
        return topicColumn.equalTo(expectedName);
    }

    /**
     * Converts the key column of an Avro topic into the column written as Kafka key.
     *
     * <p>Three cases are handled:
     * <ul>
     *   <li>the topic model defines a key schema: the key is serialized through an
     *       {@code AvroSerializerExpression} using record name {@code <topic name>key} and
     *       namespace {@code "wasp"};</li>
     *   <li>no key schema, but the column type can be cast to binary: the column is cast;</li>
     *   <li>otherwise an {@code Exception} is thrown.</li>
     * </ul>
     * In the first two cases the result is aliased {@code "key"}.
     *
     * @param keyColumn  column holding the key
     * @param topicModel topic model the key belongs to
     * @param darwinConf configuration for the schema manager; {@code get()} is called on it when
     *                   {@code topicModel.useAvroSchemaManager()} is true, so it must be defined then
     * @return the key column ready to be written
     */
    private Column convertKeyForAvro(Column keyColumn, TopicModel topicModel, Option<Config> darwinConf) {
        Column column;
        if (topicModel.keySchema().isDefined()) {
            Function2 & Serializable & scala.Serializable intersect;
            Column exprToConvertToAvro = keyColumn;
            DataType keySchema = exprToConvertToAvro.expr().dataType();
            String avroRecordName = new StringBuilder(3).append(topicModel.name()).append("key").toString();
            String avroRecordNamespace = "wasp";
            if (topicModel.useAvroSchemaManager()) {
                // Schema-manager flavour: parse the key schema and attach the subject to it.
                // NOTE(review): the `true` flag presumably marks the schema as a key schema - confirm in SubjectUtils.
                Schema avroSchema = new Schema.Parser().parse((String)topicModel.keySchema().get());
                Schema updatedSchema = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, avroSchema, true);
                Config config = (Config)darwinConf.get();
                intersect = (Function2 & Serializable & scala.Serializable)(child, sparkSchema) -> AvroSerializerExpression$.MODULE$.apply(config, updatedSchema, avroRecordName, avroRecordNamespace, child, sparkSchema);
            } else {
                // Plain flavour: the key schema string is handed over as-is.
                Some some = new Some(topicModel.keySchema().get());
                intersect = (Function2 & Serializable & scala.Serializable)(child, sparkSchema) -> AvroSerializerExpression$.MODULE$.apply((Option)some, avroRecordName, avroRecordNamespace, child, sparkSchema);
            }
            Function2 & Serializable & scala.Serializable rowToAvroExprFactory = intersect;
            AvroSerializerExpression rowToAvroExpr = (AvroSerializerExpression)rowToAvroExprFactory.apply((Object)exprToConvertToAvro.expr(), (Object)keySchema);
            column = new Column((Expression)rowToAvroExpr).as("key");
        } else if (Cast$.MODULE$.canCast(keyColumn.expr().dataType(), (DataType)BinaryType$.MODULE$)) {
            column = keyColumn.cast((DataType)BinaryType$.MODULE$).as("key");
        } else {
            throw new Exception(new StringBuilder(160).append("Cannot serialize key for Kafka topic because column ").append(keyColumn.toString()).append(" cannot ").append("be cast to Binary. If you want to serialize it in Avro format, set TopicModel.keySchema accordingly.").toString());
        }
        return column;
    }

    /**
     * Casts the key column to binary and aliases it {@code "key"}.
     *
     * @param keyColumn column holding the key
     * @return the key column cast to binary
     * @throws Exception when the column type cannot be cast to binary
     */
    private Column convertKeyToBinary(Column keyColumn) {
        DataType keyType = keyColumn.expr().dataType();
        boolean castable = Cast$.MODULE$.canCast(keyType, (DataType)BinaryType$.MODULE$);
        if (castable) {
            return keyColumn.cast((DataType)BinaryType$.MODULE$).as("key");
        }
        String message = "Cannot serialize key for Kafka topic because column " + keyColumn.toString() + " cannot " + "be cast to Binary.";
        throw new Exception(message);
    }

    /*
     * WARNING - void declaration
     */
    private Column convertValueForJson(Seq<String> columnsInValues) {
        void var2_2;
        Column columnExpr = functions$.MODULE$.to_json(functions$.MODULE$.struct((Seq)columnsInValues.map((Function1 & Serializable & scala.Serializable)colName -> functions$.MODULE$.col(colName), Seq$.MODULE$.canBuildFrom()))).cast((DataType)BinaryType$.MODULE$).as("value");
        this.logger().debug((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Generated select expression: ").append(columnExpr.expr().toString()).toString());
        return var2_2;
    }

    /**
     * Builds the value column for a plaintext topic.
     *
     * <p>Three preconditions are enforced with {@code require}, in this order: exactly one value
     * field name is given; that name matches a field of {@code schema}; the type of that field
     * can be cast to string. The resulting expression casts the column to string, then to binary,
     * and aliases it {@code "value"}.
     *
     * @param columnsInValues names of the value columns; must contain exactly one element
     * @param schema          schema in which the value column is looked up
     * @return the generated value expression
     */
    private Column convertValueForPlainText(Seq<String> columnsInValues, StructType schema) {
        Predef$.MODULE$.require(columnsInValues.size() == 1, (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(136).append("Exactly one value field name must be defined for plaintext topic data type but zero or more than one ").append("were specified; value field names: ").append(columnsInValues.mkString("\"", "\", \"", "\"")).toString());
        String valueFieldName = (String)columnsInValues.head();
        // Look the field up by exact name match.
        Option maybeValueColumn = schema.find((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)KafkaWriters$.$anonfun$convertValueForPlainText$2(valueFieldName, x$8)));
        Predef$.MODULE$.require(maybeValueColumn.isDefined(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(80).append("The specified value field name \"").append(valueFieldName).append("\" does not match any column; columns in schema: ").append(((TraversableOnce)schema.map((Function1 & Serializable & scala.Serializable)x$9 -> x$9.name(), Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")).toString());
        StructField valueColumn = (StructField)maybeValueColumn.get();
        DataType valueColumnDataType = valueColumn.dataType();
        // The check is "castable to string", not "is string", despite the wording of the message.
        Predef$.MODULE$.require(Cast$.MODULE$.canCast(valueColumnDataType, (DataType)StringType$.MODULE$), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(107).append("The specified value field name \"").append(valueFieldName).append("\" matches a column with a type that is not string; ").append("incompatible type ").append(valueColumnDataType).append(" found").toString());
        Column columnExpr = functions$.MODULE$.col(valueFieldName).cast((DataType)StringType$.MODULE$).cast((DataType)BinaryType$.MODULE$).as("value");
        this.logger().debug((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Generated select expression: ").append(columnExpr.expr().toString()).toString());
        return columnExpr;
    }

    /**
     * Builds the value column for a binary topic.
     *
     * <p>Three preconditions are enforced with {@code require}, in this order: exactly one value
     * field name is given; that name matches a field of {@code schema}; the type of that field
     * equals {@code BinaryType}. The resulting expression is the SQL expression
     * {@code <field name> AS value}.
     *
     * @param columnsInValues names of the value columns; must contain exactly one element
     * @param schema          schema in which the value column is looked up
     * @return the generated value expression
     */
    public Column convertValueForBinary(Seq<String> columnsInValues, StructType schema) {
        DataType valueColumnDataType;
        Predef$.MODULE$.require(columnsInValues.size() == 1, (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(133).append("Exactly one value field name must be defined for binary topic data type but zero or more than one were ").append("specified; value field names: ").append(columnsInValues.mkString("\"", "\", \"", "\"")).toString());
        String valueFieldName = (String)columnsInValues.head();
        // Look the field up by exact name match.
        Option maybeValueColumn = schema.find((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)KafkaWriters$.$anonfun$convertValueForBinary$2(valueFieldName, x$10)));
        Predef$.MODULE$.require(maybeValueColumn.isDefined(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(80).append("The specified value field name \"").append(valueFieldName).append("\" does not match any column; columns in schema: ").append(((TraversableOnce)schema.map((Function1 & Serializable & scala.Serializable)x$11 -> x$11.name(), Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")).toString());
        StructField valueColumn = (StructField)maybeValueColumn.get();
        DataType dataType = valueColumnDataType = valueColumn.dataType();
        BinaryType$ binaryType$ = BinaryType$.MODULE$;
        // Null-safe equality: the condition holds exactly when the column type is BinaryType.
        Predef$.MODULE$.require(!(dataType != null ? !dataType.equals(binaryType$) : binaryType$ != null), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(107).append("The specified value field name \"").append(valueFieldName).append("\" matches a column with a type that is not binary; ").append("incompatible type ").append(valueColumnDataType).append(" found").toString());
        // NOTE(review): the field name is concatenated unquoted into a SQL expression string;
        // names that need backtick quoting presumably will not parse - confirm.
        String expression = new StringBuilder(9).append(valueFieldName).append(" AS value").toString();
        this.logger().debug((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Generated select expressions: ").append(expression).toString());
        return functions$.MODULE$.expr(expression);
    }

    /**
     * Builds the value column for an Avro topic.
     *
     * <p>The value columns are resolved through {@code columnExtractor}, packed into a struct and
     * serialized by an {@code AvroSerializerExpression} using the topic name as record name and
     * {@code "wasp"} as namespace. The result is returned without an alias.
     *
     * @param columnsInValues names of the columns that make up the value
     * @param topicModel      topic model providing the name, the JSON schema and the schema-manager flag
     * @param schema          not read by this method
     * @param columnExtractor resolves a column name to a {@link Column}
     * @param darwinConf      configuration for the schema manager; {@code get()} is called on it when
     *                        {@code topicModel.useAvroSchemaManager()} is true, so it must be defined then
     * @return the Avro-serialized value expression
     */
    private Column convertValueForAvro(Seq<String> columnsInValues, TopicModel topicModel, StructType schema, Function1<String, Column> columnExtractor, Option<Config> darwinConf) {
        Function2 & Serializable & scala.Serializable intersect;
        Expression exprToConvertToAvro = functions$.MODULE$.struct((Seq)columnsInValues.map(columnExtractor, Seq$.MODULE$.canBuildFrom())).expr();
        // The Spark schema handed to the serializer is the type of the struct just built.
        StructType valueSchema = (StructType)exprToConvertToAvro.dataType();
        String avroRecordName = topicModel.name();
        String avroRecordNamespace = "wasp";
        if (topicModel.useAvroSchemaManager()) {
            // Schema-manager flavour: parse the topic schema and attach the subject to it.
            // NOTE(review): the `false` flag presumably marks the schema as a value schema - confirm in SubjectUtils.
            Schema avroSchema = new Schema.Parser().parse(topicModel.getJsonSchema());
            Schema updatedSchema = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, avroSchema, false);
            Config config = (Config)darwinConf.get();
            intersect = (Function2 & Serializable & scala.Serializable)(child, sparkSchema) -> AvroSerializerExpression$.MODULE$.apply(config, updatedSchema, avroRecordName, avroRecordNamespace, child, sparkSchema);
        } else {
            // Plain flavour: the JSON schema string is handed over as-is.
            Some some = new Some((Object)topicModel.getJsonSchema());
            intersect = (Function2 & Serializable & scala.Serializable)(child, sparkSchema) -> AvroSerializerExpression$.MODULE$.apply((Option)some, avroRecordName, avroRecordNamespace, child, sparkSchema);
        }
        Function2 & Serializable & scala.Serializable rowToAvroExprFactory = intersect;
        AvroSerializerExpression rowToAvroExpr = (AvroSerializerExpression)rowToAvroExprFactory.apply((Object)exprToConvertToAvro, (Object)valueSchema);
        return new Column((Expression)rowToAvroExpr);
    }

    public Dataset<Row> convertDataframe(Dataset<Row> stream, Option<String> topicFieldName, Seq<TopicModel> topics, DatastoreModel mainTopicModel, Option<Config> darwinConf) {
        TopicModelUtils$.MODULE$.isTopicWritable(mainTopicModel, (Seq<TopicModel>)topics, stream).fold((Function1 & Serializable & scala.Serializable)s -> {
            throw new IllegalArgumentException((String)s);
        }, (Function1 & Serializable & scala.Serializable)x$12 -> {
            KafkaWriters$.$anonfun$convertDataframe$2(x$12);
            return BoxedUnit.UNIT;
        });
        .colon.colon topicsToWrite = topics.isEmpty() ? new .colon.colon((Object)((TopicModel)mainTopicModel), (List)Nil$.MODULE$) : topics;
        this.logger().info(() -> KafkaWriters$.$anonfun$convertDataframe$3((Seq)topicsToWrite));
        if (mainTopicModel instanceof MultiTopicModel) {
            this.logger().info(() -> KafkaWriters$.$anonfun$convertDataframe$5(mainTopicModel, (Seq)topics));
        }
        this.logger().debug((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(14).append("Input schema:\n").append(stream.schema().treeString()).toString());
        return this.prepareDfToWrite(stream, topicFieldName, (Seq<TopicModel>)topicsToWrite, darwinConf);
    }

    /**
     * Computes the key expression for a single topic model.
     *
     * <p>Avro topics go through {@code convertKeyForAvro}; json, binary and plaintext topics go
     * through {@code convertKeyToBinary}; any other data type is rejected.
     *
     * @param topicModel        topic model whose key is built; its key field name must be defined
     * @param columnExtractor$1 resolves a column name to a column
     * @param darwinConf$1      optional configuration forwarded to the Avro conversion
     * @return the key expression for {@code topicModel}
     * @throws UnsupportedOperationException when the topic data type is not one of the four above
     */
    private final Column valueOfKey$1(TopicModel topicModel, Function1 columnExtractor$1, Option darwinConf$1) {
        String keyField = (String)topicModel.keyFieldName().get();
        String dataType = topicModel.topicDataType();
        if ("avro".equals(dataType)) {
            Column avroSource = (Column)columnExtractor$1.apply((Object)keyField);
            return this.convertKeyForAvro(avroSource, topicModel, (Option<Config>)darwinConf$1);
        }
        boolean binaryCompatible = "json".equals(dataType) || "binary".equals(dataType) || "plaintext".equals(dataType);
        if (!binaryCompatible) {
            throw new UnsupportedOperationException(new StringBuilder(24).append("Unknown topic data type ").append(dataType).toString());
        }
        Column rawKey = (Column)columnExtractor$1.apply((Object)keyField);
        return this.convertKeyToBinary(rawKey);
    }

    /** Predicate: true when the topic model declares a key field name. */
    public static final /* synthetic */ boolean $anonfun$keyExpression$1(TopicModel x$6) {
        Option declaredKeyField = x$6.keyFieldName();
        return declaredKeyField.isDefined();
    }

    /**
     * Computes the value expression for a single topic model.
     *
     * <p>The value columns are the ones listed by {@code topicModel.valueFieldsNames()}; when that
     * is empty they come from {@code TopicModelUtils.getAllValueFieldsFromSchema}, falling back to
     * all the field names of {@code dfSchema$1}. The conversion is then chosen from the topic data
     * type: avro, json, plaintext or binary.
     *
     * @param topicModel        topic model whose value is built
     * @param dfSchema$1        schema of the dataset being written
     * @param columnExtractor$2 resolves a column name to a column
     * @param darwinConf$2      optional configuration forwarded to the Avro conversion
     * @return the value expression for {@code topicModel}
     * @throws UnsupportedOperationException when the topic data type is not one of the four above
     */
    private final Column valueOfValue$1(TopicModel topicModel, StructType dfSchema$1, Function1 columnExtractor$2, Option darwinConf$2) {
        Option declaredFields = topicModel.valueFieldsNames();
        Seq columnsInValues;
        if (declaredFields instanceof Some) {
            columnsInValues = (Seq)((Some)declaredFields).value();
        } else if (None$.MODULE$.equals(declaredFields)) {
            columnsInValues = (Seq)TopicModelUtils$.MODULE$.getAllValueFieldsFromSchema(topicModel).getOrElse((Function0 & Serializable & scala.Serializable)() -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])dfSchema$1.fieldNames())).toList());
        } else {
            throw new MatchError((Object)declaredFields);
        }
        String dataType = topicModel.topicDataType();
        if ("avro".equals(dataType)) {
            return this.convertValueForAvro((Seq<String>)columnsInValues, topicModel, dfSchema$1, (Function1<String, Column>)columnExtractor$2, (Option<Config>)darwinConf$2);
        }
        if ("json".equals(dataType)) {
            return this.convertValueForJson((Seq<String>)columnsInValues);
        }
        if ("plaintext".equals(dataType)) {
            return this.convertValueForPlainText((Seq<String>)columnsInValues, dfSchema$1);
        }
        if ("binary".equals(dataType)) {
            return this.convertValueForBinary((Seq<String>)columnsInValues, dfSchema$1);
        }
        throw new UnsupportedOperationException(new StringBuilder(24).append("Unknown topic data type ").append(dataType).toString());
    }

    /**
     * Computes the headers expression for a single topic model: the column named by its headers
     * field, or a {@code null} literal when the topic model declares no headers field.
     */
    private static final Column valueOfHeader$1(TopicModel head) {
        Option headersColumn = head.headersFieldName().map((Function1 & Serializable & scala.Serializable)colName -> functions$.MODULE$.col(colName));
        Object resolved = headersColumn.getOrElse((Function0 & Serializable & scala.Serializable)() -> functions$.MODULE$.lit(null));
        return (Column)resolved;
    }

    /** Predicate: true when the topic model declares a headers field name. */
    public static final /* synthetic */ boolean $anonfun$headerExpression$3(TopicModel x$7) {
        Option declaredHeadersField = x$7.headersFieldName();
        return declaredHeadersField.isDefined();
    }

    /** Predicate: true when the name of the schema field equals the requested value field name (null-safe). */
    public static final /* synthetic */ boolean $anonfun$convertValueForPlainText$2(String valueFieldName$1, StructField x$8) {
        String fieldName = x$8.name();
        if (fieldName == null) {
            return valueFieldName$1 == null;
        }
        return fieldName.equals(valueFieldName$1);
    }

    /** Predicate: true when the name of the schema field equals the requested value field name (null-safe). */
    public static final /* synthetic */ boolean $anonfun$convertValueForBinary$2(String valueFieldName$2, StructField x$10) {
        String fieldName = x$10.name();
        if (fieldName == null) {
            return valueFieldName$2 == null;
        }
        return fieldName.equals(valueFieldName$2);
    }

    /** No-op branch used by {@code convertDataframe} when the writability check yields no failure message. */
    public static final /* synthetic */ void $anonfun$convertDataframe$2(BoxedUnit x$12) {
    }

    /** Builds the log message listing the names of the topic models being written, separated by spaces. */
    public static final /* synthetic */ String $anonfun$convertDataframe$3(Seq topicsToWrite$1) {
        TraversableOnce topicNames = (TraversableOnce)topicsToWrite$1.map((Function1 & Serializable & scala.Serializable)x$13 -> x$13.name(), Seq$.MODULE$.canBuildFrom());
        String joinedNames = topicNames.mkString(" ");
        return "Writing with topic models: " + joinedNames;
    }

    /** Builds the log message stating that the main model is a MultiTopicModel, followed by its topics. */
    public static final /* synthetic */ String $anonfun$convertDataframe$5(DatastoreModel mainTopicModel$1, Seq topics$2) {
        String mainModelName = mainTopicModel$1.name();
        return "Topic model \"" + mainModelName + "\" is a MultiTopicModel for topics: " + topics$2;
    }

    /**
     * Private constructor of the singleton: publishes the instance in {@code MODULE$} and then
     * runs the initializer of the {@code Logging} trait.
     */
    private KafkaWriters$() {
        MODULE$ = this;
        // NOTE(review): presumably this is what assigns the logger through the trait setter - confirm in Logging.
        Logging.$init$((Logging)this);
    }
}

