package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.SubjectUtils$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;

/* compiled from: KafkaWriters.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaWriters$.class */
public final class KafkaWriters$ implements Logging {
    // Singleton instance of this object; the private constructor assigns it.
    // NOTE(review): the "= null" initializer on a final field that is assigned later looks like a
    // decompilation artifact — confirm before treating this file as compilable Java.
    public static final KafkaWriters$ MODULE$ = null;
    // Logger handed out by logger(); written by the Logging setter method of this class.
    private final WaspLogger logger;

    static {
        // Instantiating the class runs the private constructor, which assigns MODULE$.
        new KafkaWriters$();
    }

    /**
     * Returns the logger held by this singleton.
     *
     * @return the value of the {@code logger} field
     */
    public WaspLogger logger() {
        return logger;
    }

    /**
     * Stores the given logger in the {@code logger} field.
     *
     * <p>NOTE(review): the mangled name suggests this is the setter the Scala compiler generates for
     * the {@code logger} value of the {@code Logging} trait — confirm. The field is declared
     * {@code final}, so this assignment is presumably a decompilation artifact.
     *
     * @param waspLogger the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /**
     * Passes the {@code "topic"} column of {@code dataset} through a UDF when a topic field name is given.
     *
     * <p>When {@code option} is empty the dataset is returned untouched. Otherwise the topic models are
     * mapped to a set, that set is captured by a function which is wrapped as a String-to-String UDF,
     * and the column literally named {@code "topic"} is replaced by the UDF applied to itself.
     *
     * <p>NOTE(review): the mapping function ({@code KafkaWriters$$anonfun$1}) and the check function
     * ({@code KafkaWriters$$anonfun$2}) are defined outside this file; presumably they extract the
     * topic names and reject names outside that set — confirm against their sources.
     *
     * @param option  optional topic field name; only its presence is inspected here
     * @param seq     topic models from which the captured set is built
     * @param dataset dataset expected to already contain a column named {@code "topic"}
     * @return {@code dataset} itself when {@code option} is empty, otherwise a dataset whose
     *         {@code "topic"} column is the UDF result
     */
    public Dataset<Row> addTopicNameCheckIfNeeded(Option<String> option, Seq<TopicModel> seq, Dataset<Row> dataset) {
        // No topic field name: nothing to check.
        if (option.isEmpty()) {
            return dataset;
        }
        // Map every topic model and collect the results into a set captured by the check function.
        KafkaWriters$$anonfun$2 kafkaWriters$$anonfun$2 = new KafkaWriters$$anonfun$2(((TraversableOnce) seq.map(new KafkaWriters$$anonfun$1(), Seq$.MODULE$.canBuildFrom())).toSet());
        // Both TypeCreator instances below build a reference to the scala.Predef "String" type: they are
        // the two type tags passed to udf(...) alongside the function.
        return dataset.withColumn("topic", functions$.MODULE$.udf(kafkaWriters$$anonfun$2, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("topic")})));
    }

    /**
     * Projects {@code dataset} onto the columns to be written: an optional key column, an optional
     * headers column, a {@code topic} column (only when a topic field name is supplied) and a
     * {@code value} column.
     *
     * <p>With a topic field name ({@code Some}) more than one topic model is required; without one
     * ({@code None}) exactly one topic model is required. Both conditions are enforced through
     * {@code Predef.require}.
     *
     * @param dataset the rows to project
     * @param option  optional name of the column holding the destination topic
     * @param seq     topic models driving the key, headers and value expressions
     * @param option2 optional configuration forwarded to the key and value expression builders
     * @return the projected dataset
     * @throws MatchError if {@code option} is neither a {@code Some} nor {@code None}
     */
    public Dataset<Row> prepareDfToWrite(Dataset<Row> dataset, Option<String> option, Seq<TopicModel> seq, Option<Config> option2) {
        // UDF forwarded to the expression builders, which apply it in their otherwise(...) branch.
        UserDefinedFunction fallbackUdf = functions$.MODULE$.udf(new KafkaWriters$$anonfun$3(), package$.MODULE$.universe().TypeTag().Nothing(), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator3$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }));

        if (option instanceof Some) {
            // Multi-topic case: the topic is read from a column of the dataset.
            String topicFieldName = (String) ((Some) option).x();
            Predef$.MODULE$.require(seq.size() > 1, new KafkaWriters$$anonfun$prepareDfToWrite$1(seq, topicFieldName));
            Option<?> keyColumn = keyExpression(seq, option, fallbackUdf, new KafkaWriters$$anonfun$4(dataset), option2).map(new KafkaWriters$$anonfun$6());
            Option<?> headersColumn = headerExpression(seq, option, fallbackUdf).map(new KafkaWriters$$anonfun$7());
            Column topicColumn = functions$.MODULE$.col(topicFieldName).as("topic");
            Column valueColumn = valueExpression(seq, option, dataset.schema(), new KafkaWriters$$anonfun$5(dataset), fallbackUdf, option2).as("value");
            // Column order: key (if any), headers (if any), topic, value.
            return dataset.select(((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(keyColumn).$plus$plus(Option$.MODULE$.option2Iterable(headersColumn), Iterable$.MODULE$.canBuildFrom())).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{topicColumn, valueColumn})), Iterable$.MODULE$.canBuildFrom())).toSeq());
        }

        if (!None$.MODULE$.equals(option)) {
            throw new MatchError(option);
        }

        // Single-topic case: no topic column is selected.
        Predef$.MODULE$.require(seq.size() == 1, new KafkaWriters$$anonfun$prepareDfToWrite$2());
        Option<?> keyColumn = keyExpression(seq, option, fallbackUdf, new KafkaWriters$$anonfun$8(dataset), option2).map(new KafkaWriters$$anonfun$10());
        Option<?> headersColumn = headerExpression(seq, option, fallbackUdf).map(new KafkaWriters$$anonfun$11());
        Column valueColumn = valueExpression(seq, option, dataset.schema(), new KafkaWriters$$anonfun$9(dataset), fallbackUdf, option2).as("value");
        // Column order: key (if any), headers (if any), value.
        return dataset.select(((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(keyColumn).$plus$plus(Option$.MODULE$.option2Iterable(headersColumn), Iterable$.MODULE$.canBuildFrom())).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{valueColumn})), Iterable$.MODULE$.canBuildFrom())).toSeq());
    }

    /**
     * Builds the optional key column expression.
     *
     * <p>Returns {@code None} when no topic model satisfies the predicate
     * {@code KafkaWriters$$anonfun$keyExpression$1} (defined outside this file; presumably "has a key
     * field" — confirm). Without a topic field name the key of the first topic model is used. With a
     * topic field name, a {@code when(...)} chain is started for the first topic model, folded over
     * the remaining ones, and closed with {@code otherwise(udf(topic column))}.
     *
     * @param seq                 topic models
     * @param option              optional name of the column holding the topic
     * @param userDefinedFunction UDF applied to the topic column in the {@code otherwise} branch
     * @param function1           resolves a field name to a column
     * @param option2             optional configuration forwarded to the key conversion
     * @return {@code Some(column)} or {@code None}
     */
    private Option<Column> keyExpression(Seq<TopicModel> seq, Option<String> option, UserDefinedFunction userDefinedFunction, Function1<String, Column> function1, Option<Config> option2) {
        if (!seq.exists(new KafkaWriters$$anonfun$keyExpression$1())) {
            return None$.MODULE$;
        }
        // exists(...) returned true, so seq has at least one element and head() is safe here.
        TopicModel firstTopic = (TopicModel) seq.head();
        if (option.isEmpty()) {
            return new Some(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(firstTopic, function1, option2));
        }
        Seq remainingTopics = (Seq) seq.tail();
        Column firstCase = functions$.MODULE$.when(
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String) option.get(), firstTopic),
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(firstTopic, function1, option2));
        Column allCases = (Column) remainingTopics.foldLeft(firstCase, new KafkaWriters$$anonfun$keyExpression$2(option, function1, option2));
        Column fallback = userDefinedFunction.apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col((String) option.get())}));
        return new Some(allCases.otherwise(fallback));
    }

    /**
     * Builds the value column expression.
     *
     * <p>Without a topic field name the value expression of the first topic model is returned. With
     * one, a {@code when(...)} chain is started for the first topic model, folded over the remaining
     * ones, and closed with {@code otherwise(udf(topic column))}.
     *
     * @param seq                 topic models
     * @param option              optional name of the column holding the topic
     * @param structType          schema forwarded to the per-topic value conversion
     * @param function1           resolves a field name to a column
     * @param userDefinedFunction UDF applied to the topic column in the {@code otherwise} branch
     * @param option2             optional configuration forwarded to the value conversion
     * @return the value column expression
     */
    private Column valueExpression(Seq<TopicModel> seq, Option<String> option, StructType structType, Function1<String, Column> function1, UserDefinedFunction userDefinedFunction, Option<Config> option2) {
        if (option.isEmpty()) {
            return it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1((TopicModel) seq.head(), structType, function1, option2);
        }
        TopicModel firstTopic = (TopicModel) seq.head();
        Seq remainingTopics = (Seq) seq.tail();
        Column firstCase = functions$.MODULE$.when(
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String) option.get(), firstTopic),
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1(firstTopic, structType, function1, option2));
        Column allCases = (Column) remainingTopics.foldLeft(firstCase, new KafkaWriters$$anonfun$valueExpression$1(option, structType, function1, option2));
        Column fallback = userDefinedFunction.apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col((String) option.get())}));
        return allCases.otherwise(fallback);
    }

    /**
     * Builds the optional headers column expression.
     *
     * <p>Returns {@code None} when no topic model satisfies the predicate
     * {@code KafkaWriters$$anonfun$headerExpression$1} (defined outside this file; presumably "has a
     * headers field" — confirm). Without a topic field name the headers expression of the first topic
     * model is used. With one, a {@code when(...)} chain is started for the first topic model, folded
     * over the remaining ones, and closed with {@code otherwise(udf(topic column))}.
     *
     * @param seq                 topic models
     * @param option              optional name of the column holding the topic
     * @param userDefinedFunction UDF applied to the topic column in the {@code otherwise} branch
     * @return {@code Some(column)} or {@code None}
     */
    private Option<Column> headerExpression(Seq<TopicModel> seq, Option<String> option, UserDefinedFunction userDefinedFunction) {
        if (!seq.exists(new KafkaWriters$$anonfun$headerExpression$1())) {
            return None$.MODULE$;
        }
        // exists(...) returned true, so seq has at least one element and head() is safe here.
        TopicModel firstTopic = (TopicModel) seq.head();
        if (option.isEmpty()) {
            return new Some(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(firstTopic));
        }
        Seq remainingTopics = (Seq) seq.tail();
        Column firstCase = functions$.MODULE$.when(
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName((String) option.get(), firstTopic),
                it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(firstTopic));
        Column allCases = (Column) remainingTopics.foldLeft(firstCase, new KafkaWriters$$anonfun$headerExpression$2(option));
        Column fallback = userDefinedFunction.apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col((String) option.get())}));
        return new Some(allCases.otherwise(fallback));
    }

    /**
     * Builds the condition "column {@code str} equals the name of {@code topicModel}".
     *
     * @param str        name of the column holding the topic
     * @param topicModel topic model whose name is compared against the column
     * @return the equality condition as a column
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$conditionOnTopicName(String str, TopicModel topicModel) {
        Column topicColumn = functions$.MODULE$.col(str);
        return topicColumn.equalTo(topicModel.name());
    }

    /**
     * Converts the key column of an Avro topic into the column aliased {@code "key"}.
     *
     * <p>When the topic model defines a key schema, the column's expression is wrapped in an
     * {@link AvroSerializerExpression} produced by one of two factories, chosen by
     * {@code useAvroSchemaManager()}. Without a key schema the column is cast to binary when Spark
     * allows that cast; otherwise an exception is thrown.
     *
     * @param column     key column to convert
     * @param topicModel topic model providing name, key schema and the schema-manager flag
     * @param option     optional configuration; {@code get()} is called on it when the schema manager is used
     * @return the converted column, aliased {@code "key"}
     */
    private Column convertKeyForAvro(Column column, TopicModel topicModel, Option<Config> option) {
        if (!topicModel.keySchema().isDefined()) {
            // No key schema: fall back to a plain binary cast when Spark supports it.
            if (Cast$.MODULE$.canCast(column.expr().dataType(), BinaryType$.MODULE$)) {
                return column.cast(BinaryType$.MODULE$).as("key");
            }
            throw new Exception("Cannot serialize key for Kafka topic because column " + column.toString() + " cannot "
                    + "be cast to Binary. If you want to serialize it in Avro format, set TopicModel.keySchema accordingly.");
        }

        DataType keyType = column.expr().dataType();
        String nameWithKeySuffix = topicModel.name() + "key";
        Function2 serializerFactory;
        if (topicModel.useAvroSchemaManager()) {
            serializerFactory = new KafkaWriters$$anonfun$13(nameWithKeySuffix, "wasp", SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, new Schema.Parser().parse((String) topicModel.keySchema().get()), true), (Config) option.get());
        } else {
            serializerFactory = new KafkaWriters$$anonfun$14(nameWithKeySuffix, "wasp", new Some(topicModel.keySchema().get()));
        }
        AvroSerializerExpression serializer = (AvroSerializerExpression) serializerFactory.apply(column.expr(), keyType);
        return new Column(serializer).as("key");
    }

    /**
     * Casts the key column to binary and aliases it {@code "key"}.
     *
     * @param column key column to convert
     * @return the column cast to binary, aliased {@code "key"}
     */
    private Column convertKeyToBinary(Column column) {
        boolean castable = Cast$.MODULE$.canCast(column.expr().dataType(), BinaryType$.MODULE$);
        if (!castable) {
            // Spark cannot cast this column's type to binary.
            throw new Exception("Cannot serialize key for Kafka topic because column " + column.toString() + " cannot "
                    + "be cast to Binary.");
        }
        return column.cast(BinaryType$.MODULE$).as("key");
    }

    /**
     * Builds the JSON value column: the given fields are packed into a struct, rendered with
     * {@code to_json}, cast to binary and aliased {@code "value"}.
     *
     * @param seq names of the fields that make up the value
     * @return the JSON-encoded value column
     */
    private Column convertValueForJson(Seq<String> seq) {
        // KafkaWriters$$anonfun$15 lives outside this file; presumably it turns a field name into a column — confirm.
        Seq fieldColumns = (Seq) seq.map(new KafkaWriters$$anonfun$15(), Seq$.MODULE$.canBuildFrom());
        Column jsonText = functions$.MODULE$.to_json(functions$.MODULE$.struct(fieldColumns));
        Column valueColumn = jsonText.cast(BinaryType$.MODULE$).as("value");
        logger().debug(new KafkaWriters$$anonfun$convertValueForJson$1(valueColumn));
        return valueColumn;
    }

    /**
     * Builds the plain-text value column from exactly one field: the field is cast to string, then
     * to binary, and aliased {@code "value"}.
     *
     * <p>Three preconditions are enforced through {@code Predef.require}, in this order: exactly one
     * field name is given, a schema field matches the predicate built from that name, and the
     * field's type can be cast to string.
     *
     * @param seq        names of the value fields; must contain exactly one element
     * @param structType schema in which the field is looked up
     * @return the value column
     */
    private Column convertValueForPlainText(Seq<String> seq, StructType structType) {
        Predef$.MODULE$.require(seq.size() == 1, new KafkaWriters$$anonfun$convertValueForPlainText$1(seq));
        String fieldName = (String) seq.head();

        Option matchingField = structType.find(new KafkaWriters$$anonfun$16(fieldName));
        Predef$.MODULE$.require(matchingField.isDefined(), new KafkaWriters$$anonfun$convertValueForPlainText$2(structType, fieldName));

        DataType fieldType = ((StructField) matchingField.get()).dataType();
        boolean castableToString = Cast$.MODULE$.canCast(fieldType, StringType$.MODULE$);
        Predef$.MODULE$.require(castableToString, new KafkaWriters$$anonfun$convertValueForPlainText$3(fieldName, fieldType));

        Column asText = functions$.MODULE$.col(fieldName).cast(StringType$.MODULE$);
        Column valueColumn = asText.cast(BinaryType$.MODULE$).as("value");
        logger().debug(new KafkaWriters$$anonfun$convertValueForPlainText$4(valueColumn));
        return valueColumn;
    }

    /**
     * Builds the binary value column from exactly one field that is already of binary type; the
     * result is the SQL expression {@code "<field> AS value"}.
     *
     * <p>Three preconditions are enforced through {@code Predef.require}, in this order: exactly one
     * field name is given, a schema field matches the predicate built from that name, and the
     * field's type equals the binary type.
     *
     * @param seq        names of the value fields; must contain exactly one element
     * @param structType schema in which the field is looked up
     * @return the value column
     */
    public Column convertValueForBinary(Seq<String> seq, StructType structType) {
        Predef$.MODULE$.require(seq.size() == 1, new KafkaWriters$$anonfun$convertValueForBinary$1(seq));
        String fieldName = (String) seq.head();

        Option matchingField = structType.find(new KafkaWriters$$anonfun$17(fieldName));
        Predef$.MODULE$.require(matchingField.isDefined(), new KafkaWriters$$anonfun$convertValueForBinary$2(structType, fieldName));

        DataType fieldType = ((StructField) matchingField.get()).dataType();
        BinaryType$ binaryType = BinaryType$.MODULE$;
        // Null-tolerant equality between the field type and the binary type.
        boolean isBinary;
        if (fieldType == null) {
            isBinary = binaryType == null;
        } else {
            isBinary = fieldType.equals(binaryType);
        }
        Predef$.MODULE$.require(isBinary, new KafkaWriters$$anonfun$convertValueForBinary$3(fieldName, fieldType));

        String aliasExpression = fieldName + " AS value";
        logger().debug(new KafkaWriters$$anonfun$convertValueForBinary$4(aliasExpression));
        return functions$.MODULE$.expr(aliasExpression);
    }

    /**
     * Builds the Avro value column: the given fields are resolved to columns, packed into a struct,
     * and the struct expression is wrapped in an {@link AvroSerializerExpression}.
     *
     * <p>The serializer factory is chosen by {@code topicModel.useAvroSchemaManager()}. The
     * {@code structType} parameter is not read by this method.
     *
     * @param seq        names of the fields that make up the value
     * @param topicModel topic model providing name, JSON schema and the schema-manager flag
     * @param structType unused here
     * @param function1  resolves a field name to a column
     * @param option     optional configuration; {@code get()} is called on it when the schema manager is used
     * @return the Avro-serialized value column
     */
    private Column convertValueForAvro(Seq<String> seq, TopicModel topicModel, StructType structType, Function1<String, Column> function1, Option<Config> option) {
        Seq fieldColumns = (Seq) seq.map(function1, Seq$.MODULE$.canBuildFrom());
        Expression structExpression = functions$.MODULE$.struct(fieldColumns).expr();
        StructType structExpressionType = structExpression.dataType();
        String topicName = topicModel.name();

        Function2 serializerFactory;
        if (!topicModel.useAvroSchemaManager()) {
            serializerFactory = new KafkaWriters$$anonfun$19(topicName, "wasp", new Some(topicModel.getJsonSchema()));
        } else {
            serializerFactory = new KafkaWriters$$anonfun$18(topicName, "wasp", SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, new Schema.Parser().parse(topicModel.getJsonSchema()), false), (Config) option.get());
        }

        AvroSerializerExpression serializer = (AvroSerializerExpression) serializerFactory.apply(structExpression, structExpressionType);
        return new Column(serializer);
    }

    /**
     * Entry point: checks that the topic(s) can be written, settles the list of topic models, logs
     * it, and delegates to {@link #prepareDfToWrite}.
     *
     * <p>When {@code seq} is empty, {@code datastoreModel} itself (cast to {@link TopicModel}) is used
     * as the single topic model.
     *
     * <p>NOTE(review): the result of {@code isTopicWritable(...).fold(...)} is discarded; presumably
     * one of the two handlers throws when the topic is not writable — confirm against their sources.
     *
     * @param dataset        the rows to convert
     * @param option         optional name of the column holding the destination topic
     * @param seq            topic models; may be empty
     * @param datastoreModel the datastore model being written to
     * @param option2        optional configuration forwarded to {@code prepareDfToWrite}
     * @return the dataset projected onto the columns to write
     */
    public Dataset<Row> convertDataframe(Dataset<Row> dataset, Option<String> option, Seq<TopicModel> seq, DatastoreModel datastoreModel, Option<Config> option2) {
        TopicModelUtils$.MODULE$.isTopicWritable(datastoreModel, seq, dataset).fold(new KafkaWriters$$anonfun$convertDataframe$1(), new KafkaWriters$$anonfun$convertDataframe$2());

        Seq<TopicModel> topicsToWrite;
        if (seq.isEmpty()) {
            topicsToWrite = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicModel[]{(TopicModel) datastoreModel}));
        } else {
            topicsToWrite = seq;
        }

        logger().info(new KafkaWriters$$anonfun$convertDataframe$3(topicsToWrite));
        if (datastoreModel instanceof MultiTopicModel) {
            logger().info(new KafkaWriters$$anonfun$convertDataframe$4(seq, datastoreModel));
        }
        logger().debug(new KafkaWriters$$anonfun$convertDataframe$5(dataset));

        return prepareDfToWrite(dataset, option, topicsToWrite, option2);
    }

    /**
     * Builds the key column for one topic model, according to its data type.
     *
     * <p>{@code "avro"} keys go through {@code convertKeyForAvro}; {@code "json"}, {@code "binary"}
     * and {@code "plaintext"} keys are cast to binary through {@code convertKeyToBinary}. Any other
     * data type, including {@code null}, is rejected.
     *
     * <p>The key field name is read with {@code get()} before the data type is inspected, so a topic
     * model without a key field name fails on that call first.
     *
     * @param topicModel topic model providing the key field name and the data type
     * @param function1  resolves a field name to a column
     * @param option     optional configuration forwarded to the Avro conversion
     * @return the key column
     * @throws UnsupportedOperationException if the data type is none of the four supported values
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfKey$1(TopicModel topicModel, Function1 function1, Option option) {
        String keyFieldName = (String) topicModel.keyFieldName().get();
        String dataType = topicModel.topicDataType();

        if ("avro".equals(dataType)) {
            return convertKeyForAvro((Column) function1.apply(keyFieldName), topicModel, option);
        }

        // Constant-first equals is null-safe: a null data type matches nothing and falls through to
        // the exception below, which is what the nested null checks of the previous form intended.
        boolean castToBinary = "json".equals(dataType) || "binary".equals(dataType) || "plaintext".equals(dataType);
        if (castToBinary) {
            return convertKeyToBinary((Column) function1.apply(keyFieldName));
        }

        throw new UnsupportedOperationException("Unknown topic data type " + dataType);
    }

    /**
     * Builds the value column for one topic model, according to its data type.
     *
     * <p>The value field names come from {@code topicModel.valueFieldsNames()} when it is a
     * {@code Some}. When it is {@code None} they come from
     * {@code TopicModelUtils.getAllValueFieldsFromSchema(topicModel)}, with
     * {@code KafkaWriters$$anonfun$12(structType)} as the fallback supplier (defined outside this
     * file; presumably it yields the field names of the dataset schema — confirm).
     *
     * <p>Supported data types are {@code "avro"}, {@code "json"}, {@code "plaintext"} and
     * {@code "binary"}; anything else, including {@code null}, is rejected.
     *
     * @param topicModel topic model providing the value field names and the data type
     * @param structType schema of the dataset being written
     * @param function1  resolves a field name to a column (used by the Avro conversion)
     * @param option     optional configuration forwarded to the Avro conversion
     * @return the value column
     * @throws MatchError                    if the value field names are neither a {@code Some} nor {@code None}
     * @throws UnsupportedOperationException if the data type is none of the four supported values
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfValue$1(TopicModel topicModel, StructType structType, Function1 function1, Option option) {
        // Typed as Option (not Some) so that the None and MatchError branches below are meaningful.
        Option<?> declaredFieldNames = topicModel.valueFieldsNames();
        Seq<String> valueFieldNames;
        if (declaredFieldNames instanceof Some) {
            valueFieldNames = (Seq) ((Some<?>) declaredFieldNames).x();
        } else if (None$.MODULE$.equals(declaredFieldNames)) {
            valueFieldNames = (Seq) TopicModelUtils$.MODULE$.getAllValueFieldsFromSchema(topicModel).getOrElse(new KafkaWriters$$anonfun$12(structType));
        } else {
            throw new MatchError(declaredFieldNames);
        }

        // Constant-first equals keeps a null data type out of every branch, so it ends in the exception.
        String dataType = topicModel.topicDataType();
        if ("avro".equals(dataType)) {
            return convertValueForAvro(valueFieldNames, topicModel, structType, function1, option);
        }
        if ("json".equals(dataType)) {
            return convertValueForJson(valueFieldNames);
        }
        if ("plaintext".equals(dataType)) {
            return convertValueForPlainText(valueFieldNames, structType);
        }
        if ("binary".equals(dataType)) {
            return convertValueForBinary(valueFieldNames, structType);
        }
        throw new UnsupportedOperationException("Unknown topic data type " + dataType);
    }

    /**
     * Builds the headers column for one topic model: the optional headers field name is mapped
     * through one function and, when absent, a default is supplied by a second one.
     *
     * <p>Both functions are defined outside this file; presumably the first resolves the field name
     * to a column and the second yields a null/empty headers column — confirm.
     *
     * @param topicModel topic model providing the optional headers field name
     * @return the headers column
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1(TopicModel topicModel) {
        Option mappedHeaders = topicModel.headersFieldName().map(new KafkaWriters$$anonfun$it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1$1());
        Object headersColumn = mappedHeaders.getOrElse(new KafkaWriters$$anonfun$it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaWriters$$valueOfHeader$1$2());
        return (Column) headersColumn;
    }

    /**
     * Private constructor; the only call site in this file is the static initializer of the class.
     */
    private KafkaWriters$() {
        // Publish this instance as the singleton.
        MODULE$ = this;
        // Run the Logging trait's initializer on this instance.
        // NOTE(review): presumably this is what populates the logger field through the Logging setter
        // method of this class — confirm; the "Logging.class.$init$" spelling is decompiler output.
        Logging.class.$init$(this);
    }
}
