package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.utils.NonEmptyList;
import it.agilelab.bigdata.utils.NonEmptyList$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.SubjectUtils$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayOps;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaWriters.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaWriters$.class */
public final class KafkaWriters$ implements Logging {
    public static KafkaWriters$ MODULE$;
    private final UserDefinedFunction unknownTopicExpression;
    private final WaspLogger logger;

    static {
        new KafkaWriters$();
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public Dataset<Row> addTopicNameCheckIfNeeded(Option<String> option, Seq<TopicModel> seq, Dataset<Row> dataset) {
        if (option.isEmpty()) {
            return dataset;
        }
        Set set = ((TraversableOnce) seq.map(topicModel -> {
            return topicModel.name();
        }, Seq$.MODULE$.canBuildFrom())).toSet();
        Function1 function1 = str -> {
            if (set.apply(str)) {
                return str;
            }
            throw new Exception(new StringBuilder(69).append("Topic name \"").append(str).append("\" is not in the topic models for the MultiTopicModel used").toString());
        };
        return dataset.withColumn("topic", functions$.MODULE$.udf(function1, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("topic")})));
    }

    public Dataset<Row> prepareDfToWrite(Dataset<Row> dataset, Option<String> option, NonEmptyList<TopicModel> nonEmptyList, Option<Config> option2) {
        if (option instanceof Some) {
            String str = (String) ((Some) option).value();
            Predef$.MODULE$.require(nonEmptyList.size() > 1, () -> {
                return new StringBuilder(52).append("Got topicFieldName = ").append(str).append(" but only one topic to write (").append(nonEmptyList).append(")").toString();
            });
            return dataset.select(((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(keyExpression(nonEmptyList, option, str2 -> {
                return dataset.col(str2);
            }, option2).map(column -> {
                return column.as("key");
            })).$plus$plus(Option$.MODULE$.option2Iterable(headerExpression(nonEmptyList, option).map(column2 -> {
                return column2.as("headers");
            })), Iterable$.MODULE$.canBuildFrom())).$plus$plus(new $colon.colon(functions$.MODULE$.col(str).as("topic"), new $colon.colon(valueExpression(nonEmptyList, option, dataset.schema(), str3 -> {
                return dataset.col(str3);
            }, option2).as("value"), Nil$.MODULE$)), Iterable$.MODULE$.canBuildFrom())).toSeq());
        }
        if (!None$.MODULE$.equals(option)) {
            throw new MatchError(option);
        }
        Predef$.MODULE$.require(nonEmptyList.size() == 1, () -> {
            return "More than one topic to write specified but there's no column containing the topics' name.";
        });
        return dataset.select(((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(keyExpression(nonEmptyList, option, str4 -> {
            return dataset.col(str4);
        }, option2).map(column3 -> {
            return column3.as("key");
        })).$plus$plus(Option$.MODULE$.option2Iterable(headerExpression(nonEmptyList, option).map(column4 -> {
            return column4.as("headers");
        })), Iterable$.MODULE$.canBuildFrom())).$plus$plus(new $colon.colon(valueExpression(nonEmptyList, option, dataset.schema(), str5 -> {
            return dataset.col(str5);
        }, option2).as("value"), Nil$.MODULE$), Iterable$.MODULE$.canBuildFrom())).toSeq());
    }

    private Option<Column> keyExpression(NonEmptyList<TopicModel> nonEmptyList, Option<String> option, Function1<String, Column> function1, Option<Config> option2) {
        return nonEmptyList.exists(topicModel -> {
            return BoxesRunTime.boxToBoolean($anonfun$keyExpression$3(topicModel));
        }) ? new Some(computeFieldExpression(nonEmptyList, option, topicModel2 -> {
            return valueOfKey$1(topicModel2, function1, option2);
        })) : None$.MODULE$;
    }

    private Column valueExpression(NonEmptyList<TopicModel> nonEmptyList, Option<String> option, StructType structType, Function1<String, Column> function1, Option<Config> option2) {
        return computeFieldExpression(nonEmptyList, option, topicModel -> {
            return this.valueOfValue$1(topicModel, structType, function1, option2);
        });
    }

    private Option<Column> headerExpression(NonEmptyList<TopicModel> nonEmptyList, Option<String> option) {
        return nonEmptyList.exists(topicModel -> {
            return BoxesRunTime.boxToBoolean($anonfun$headerExpression$3(topicModel));
        }) ? new Some(computeFieldExpression(nonEmptyList, option, topicModel2 -> {
            return valueOfHeader$1(topicModel2);
        })) : None$.MODULE$;
    }

    private UserDefinedFunction unknownTopicExpression() {
        return this.unknownTopicExpression;
    }

    private Column computeFieldExpression(NonEmptyList<TopicModel> nonEmptyList, Option<String> option, Function1<TopicModel, Column> function1) {
        if (nonEmptyList == null) {
            throw new MatchError(nonEmptyList);
        }
        Tuple2 tuple2 = new Tuple2((TopicModel) nonEmptyList.head(), nonEmptyList.tail());
        TopicModel topicModel = (TopicModel) tuple2._1();
        List list = (List) tuple2._2();
        return (Column) option.map(str -> {
            return ((Column) list.foldLeft(functions$.MODULE$.when(MODULE$.conditionOnTopicName(str, topicModel), function1.apply(topicModel)), (column, topicModel2) -> {
                return column.when(MODULE$.conditionOnTopicName(str, topicModel2), function1.apply(topicModel2));
            })).otherwise(MODULE$.unknownTopicExpression().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(str)})));
        }).getOrElse(() -> {
            return (Column) function1.apply(topicModel);
        });
    }

    private Column conditionOnTopicName(String str, TopicModel topicModel) {
        return functions$.MODULE$.col(str).equalTo(topicModel.name());
    }

    private Column convertKeyForAvro(Column column, TopicModel topicModel, Option<Config> option) {
        Function2 function2;
        if (!topicModel.keySchema().isDefined()) {
            if (Cast$.MODULE$.canCast(column.expr().dataType(), BinaryType$.MODULE$)) {
                return column.cast(BinaryType$.MODULE$).as("key");
            }
            throw new Exception(new StringBuilder(160).append("Cannot serialize key for Kafka topic because column ").append(column.toString()).append(" cannot ").append("be cast to Binary. If you want to serialize it in Avro format, set TopicModel.keySchema accordingly.").toString());
        }
        DataType dataType = column.expr().dataType();
        String sb = new StringBuilder(3).append(topicModel.name()).append("key").toString();
        String str = "wasp";
        if (topicModel.useAvroSchemaManager()) {
            Schema attachSubjectToSchema = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, new Schema.Parser().parse((String) topicModel.keySchema().get()), true);
            Config config = (Config) option.get();
            function2 = (expression, dataType2) -> {
                return AvroSerializerExpression$.MODULE$.apply(config, attachSubjectToSchema, sb, str, expression, dataType2);
            };
        } else {
            Some some = new Some(topicModel.keySchema().get());
            function2 = (expression2, dataType3) -> {
                return AvroSerializerExpression$.MODULE$.apply(some, sb, str, expression2, dataType3);
            };
        }
        return new Column((AvroSerializerExpression) function2.apply(column.expr(), dataType)).as("key");
    }

    private Column convertKeyToBinary(Column column) {
        if (Cast$.MODULE$.canCast(column.expr().dataType(), BinaryType$.MODULE$)) {
            return column.cast(BinaryType$.MODULE$).as("key");
        }
        throw new Exception(new StringBuilder(78).append("Cannot serialize key for Kafka topic because column ").append(column.toString()).append(" cannot ").append("be cast to Binary.").toString());
    }

    private Column convertValueForJson(Seq<String> seq) {
        Column as = functions$.MODULE$.to_json(functions$.MODULE$.struct((Seq) seq.map(str -> {
            return functions$.MODULE$.col(str);
        }, Seq$.MODULE$.canBuildFrom()))).cast(BinaryType$.MODULE$).as("value");
        logger().debug(() -> {
            return new StringBuilder(29).append("Generated select expression: ").append(as.expr().toString()).toString();
        });
        return as;
    }

    private Column convertValueForPlainText(Seq<String> seq, StructType structType) {
        Predef$.MODULE$.require(seq.size() == 1, () -> {
            return new StringBuilder(136).append("Exactly one value field name must be defined for plaintext topic data type but zero or more than one ").append("were specified; value field names: ").append(seq.mkString("\"", "\", \"", "\"")).toString();
        });
        String str = (String) seq.head();
        Option find = structType.find(structField -> {
            return BoxesRunTime.boxToBoolean($anonfun$convertValueForPlainText$2(str, structField));
        });
        Predef$.MODULE$.require(find.isDefined(), () -> {
            return new StringBuilder(80).append("The specified value field name \"").append(str).append("\" does not match any column; columns in schema: ").append(((TraversableOnce) structType.map(structField2 -> {
                return structField2.name();
            }, Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")).toString();
        });
        DataType dataType = ((StructField) find.get()).dataType();
        Predef$.MODULE$.require(Cast$.MODULE$.canCast(dataType, StringType$.MODULE$), () -> {
            return new StringBuilder(107).append("The specified value field name \"").append(str).append("\" matches a column with a type that is not string; ").append("incompatible type ").append(dataType).append(" found").toString();
        });
        Column as = functions$.MODULE$.col(str).cast(StringType$.MODULE$).cast(BinaryType$.MODULE$).as("value");
        logger().debug(() -> {
            return new StringBuilder(29).append("Generated select expression: ").append(as.expr().toString()).toString();
        });
        return as;
    }

    public Column convertValueForBinary(Seq<String> seq, StructType structType) {
        Predef$.MODULE$.require(seq.size() == 1, () -> {
            return new StringBuilder(133).append("Exactly one value field name must be defined for binary topic data type but zero or more than one were ").append("specified; value field names: ").append(seq.mkString("\"", "\", \"", "\"")).toString();
        });
        String str = (String) seq.head();
        Option find = structType.find(structField -> {
            return BoxesRunTime.boxToBoolean($anonfun$convertValueForBinary$2(str, structField));
        });
        Predef$.MODULE$.require(find.isDefined(), () -> {
            return new StringBuilder(80).append("The specified value field name \"").append(str).append("\" does not match any column; columns in schema: ").append(((TraversableOnce) structType.map(structField2 -> {
                return structField2.name();
            }, Seq$.MODULE$.canBuildFrom())).mkString("[", "], [", "]")).toString();
        });
        DataType dataType = ((StructField) find.get()).dataType();
        Predef$ predef$ = Predef$.MODULE$;
        BinaryType$ binaryType$ = BinaryType$.MODULE$;
        predef$.require(dataType != null ? dataType.equals(binaryType$) : binaryType$ == null, () -> {
            return new StringBuilder(107).append("The specified value field name \"").append(str).append("\" matches a column with a type that is not binary; ").append("incompatible type ").append(dataType).append(" found").toString();
        });
        String sb = new StringBuilder(9).append(str).append(" AS value").toString();
        logger().debug(() -> {
            return new StringBuilder(30).append("Generated select expressions: ").append(sb).toString();
        });
        return functions$.MODULE$.expr(sb);
    }

    private Column convertValueForAvro(Seq<String> seq, TopicModel topicModel, StructType structType, Function1<String, Column> function1, Option<Config> option) {
        Function2 function2;
        Expression expr = functions$.MODULE$.struct((Seq) seq.map(function1, Seq$.MODULE$.canBuildFrom())).expr();
        StructType dataType = expr.dataType();
        String name = topicModel.name();
        String str = "wasp";
        if (topicModel.useAvroSchemaManager()) {
            Schema attachSubjectToSchema = SubjectUtils$.MODULE$.attachSubjectToSchema(topicModel, new Schema.Parser().parse(topicModel.getJsonSchema()), false);
            Config config = (Config) option.get();
            function2 = (expression, dataType2) -> {
                return AvroSerializerExpression$.MODULE$.apply(config, attachSubjectToSchema, name, str, expression, dataType2);
            };
        } else {
            Some some = new Some(topicModel.getJsonSchema());
            function2 = (expression2, dataType3) -> {
                return AvroSerializerExpression$.MODULE$.apply(some, name, str, expression2, dataType3);
            };
        }
        return new Column((AvroSerializerExpression) function2.apply(expr, dataType));
    }

    public Dataset<Row> convertDataframe(Dataset<Row> dataset, Option<String> option, Seq<TopicModel> seq, DatastoreModel datastoreModel, Option<Config> option2) {
        NonEmptyList<TopicModel> nonEmptyList;
        TopicModelUtils$.MODULE$.isTopicWritable(datastoreModel, seq, dataset).fold(str -> {
            throw new IllegalArgumentException(str);
        }, boxedUnit -> {
            $anonfun$convertDataframe$2(boxedUnit);
            return BoxedUnit.UNIT;
        });
        if (Nil$.MODULE$.equals(seq)) {
            nonEmptyList = NonEmptyList$.MODULE$.one((TopicModel) datastoreModel);
        } else {
            if (!(seq instanceof $colon.colon)) {
                throw new MatchError(seq);
            }
            $colon.colon colonVar = ($colon.colon) seq;
            nonEmptyList = new NonEmptyList<>((TopicModel) colonVar.head(), colonVar.tl$access$1());
        }
        NonEmptyList<TopicModel> nonEmptyList2 = nonEmptyList;
        logger().info(() -> {
            return new StringBuilder(27).append("Writing with topic models: ").append(nonEmptyList2.map(topicModel -> {
                return topicModel.name();
            }).mkString(" ")).toString();
        });
        if (datastoreModel instanceof MultiTopicModel) {
            logger().info(() -> {
                return new StringBuilder(48).append("Topic model \"").append(datastoreModel.name()).append("\" is a MultiTopicModel for topics: ").append(seq).toString();
            });
        }
        logger().debug(() -> {
            return new StringBuilder(14).append("Input schema:\n").append(dataset.schema().treeString()).toString();
        });
        return prepareDfToWrite(dataset, option, nonEmptyList2, option2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Column valueOfKey$1(TopicModel topicModel, Function1 function1, Option option) {
        return (Column) topicModel.keyFieldName().map(str -> {
            String str = topicModel.topicDataType();
            if ("avro".equals(str)) {
                return MODULE$.convertKeyForAvro((Column) function1.apply(str), topicModel, option);
            }
            if ("json".equals(str) ? true : "binary".equals(str) ? true : "plaintext".equals(str)) {
                return MODULE$.convertKeyToBinary((Column) function1.apply(str));
            }
            throw new UnsupportedOperationException(new StringBuilder(24).append("Unknown topic data type ").append(str).toString());
        }).getOrElse(() -> {
            return functions$.MODULE$.lit((Object) null).cast(BinaryType$.MODULE$);
        });
    }

    public static final /* synthetic */ boolean $anonfun$keyExpression$3(TopicModel topicModel) {
        return topicModel.keyFieldName().isDefined();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Column valueOfValue$1(TopicModel topicModel, StructType structType, Function1 function1, Option option) {
        Seq<String> seq;
        Some valueFieldsNames = topicModel.valueFieldsNames();
        if (valueFieldsNames instanceof Some) {
            seq = (Seq) valueFieldsNames.value();
        } else {
            if (!None$.MODULE$.equals(valueFieldsNames)) {
                throw new MatchError(valueFieldsNames);
            }
            seq = (Seq) TopicModelUtils$.MODULE$.getAllValueFieldsFromSchema(topicModel).getOrElse(() -> {
                return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(structType.fieldNames())).toList();
            });
        }
        Seq<String> seq2 = seq;
        String str = topicModel.topicDataType();
        if ("avro".equals(str)) {
            return convertValueForAvro(seq2, topicModel, structType, function1, option);
        }
        if ("json".equals(str)) {
            return convertValueForJson(seq2);
        }
        if ("plaintext".equals(str)) {
            return convertValueForPlainText(seq2, structType);
        }
        if ("binary".equals(str)) {
            return convertValueForBinary(seq2, structType);
        }
        throw new UnsupportedOperationException(new StringBuilder(24).append("Unknown topic data type ").append(str).toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Column valueOfHeader$1(TopicModel topicModel) {
        return (Column) topicModel.headersFieldName().map(str -> {
            return functions$.MODULE$.col(str);
        }).getOrElse(() -> {
            return functions$.MODULE$.lit((Object) null);
        });
    }

    public static final /* synthetic */ boolean $anonfun$headerExpression$3(TopicModel topicModel) {
        return topicModel.headersFieldName().isDefined();
    }

    public static final /* synthetic */ boolean $anonfun$convertValueForPlainText$2(String str, StructField structField) {
        String name = structField.name();
        return name != null ? name.equals(str) : str == null;
    }

    public static final /* synthetic */ boolean $anonfun$convertValueForBinary$2(String str, StructField structField) {
        String name = structField.name();
        return name != null ? name.equals(str) : str == null;
    }

    public static final /* synthetic */ void $anonfun$convertDataframe$2(BoxedUnit boxedUnit) {
    }

    private KafkaWriters$() {
        MODULE$ = this;
        Logging.$init$(this);
        functions$ functions_ = functions$.MODULE$;
        Function1 function1 = str -> {
            throw new Exception(new StringBuilder(19).append("Unknown topic name ").append(str).toString());
        };
        TypeTags.TypeTag Nothing = package$.MODULE$.universe().TypeTag().Nothing();
        TypeTags universe = package$.MODULE$.universe();
        this.unknownTopicExpression = functions_.udf(function1, Nothing, universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator1$2
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe2.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }));
    }
}
