/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.pulsar;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.AnalysisException$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.expressions.package$;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.pulsar.CachedPulsarClient$;
import org.apache.spark.sql.pulsar.PulsarOptions$;
import org.apache.spark.sql.pulsar.PulsarWriteTask;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.CalendarIntervalType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.ObjectType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType$;
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.control.NonFatal$;

public final class PulsarSinks$
implements Logging {
    /** Singleton instance of this module class; assigned inside the private constructor. */
    public static PulsarSinks$ MODULE$;
    /** Logger slot exposed through the {@code org$apache$spark$internal$Logging$$log_} getter/setter pair; declared transient. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // Creates the single instance when the class is initialized; the constructor
    // itself stores the new object in MODULE$, so the result is not assigned here.
    static {
        new PulsarSinks$();
    }

    /** Forwards to the static {@code Logging.logName$} implementation for this instance. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to the static {@code Logging.log$} implementation for this instance. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards the lazily supplied message to {@code Logging.logInfo$}. */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /** Forwards the lazily supplied message to {@code Logging.logDebug$}. */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /** Forwards the lazily supplied message to {@code Logging.logTrace$}. */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /** Forwards the lazily supplied message to {@code Logging.logWarning$}. */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /** Forwards the lazily supplied message to {@code Logging.logError$}. */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /** Forwards the message and the associated throwable to {@code Logging.logInfo$}. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /** Forwards the message and the associated throwable to {@code Logging.logDebug$}. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /** Forwards the message and the associated throwable to {@code Logging.logTrace$}. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /** Forwards the message and the associated throwable to {@code Logging.logWarning$}. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /** Forwards the message and the associated throwable to {@code Logging.logError$}. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /** Returns whatever {@code Logging.isTraceEnabled$} reports for this instance. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards to the one-flag overload of {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /** Forwards to the two-flag overload of {@code Logging.initializeLogIfNecessary$} and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /** Returns the default value of the second parameter of {@code initializeLogIfNecessary}, as supplied by the trait. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards to the static {@code Logging.initializeForcefully$} implementation. */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /** Getter for the logger slot held by this instance. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger slot held by this instance. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Checks every data type in {@code valuesDT} against the types the Pulsar sink rejects.
     *
     * <p>Each element is handed to {@code $anonfun$checkForUnsupportedType$1}, which throws an
     * {@link AnalysisException} for a rejected type and recurses into struct fields.
     *
     * @param valuesDT data types of the value columns to validate
     */
    public void checkForUnsupportedType(Seq<DataType> valuesDT) {
        // foreach instead of map: the check is a pure side effect, so no result
        // collection is needed, and foreach visits every element even when the
        // sequence is lazily evaluated (map on a lazy Seq would only force the head).
        valuesDT.foreach((Function1 & Serializable & scala.Serializable)dt -> {
            PulsarSinks$.$anonfun$checkForUnsupportedType$1(dt);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Validates the output attributes of a query before it is written to Pulsar.
     *
     * <p>Checks, in order: the topic attribute (or the fallback {@code topic} option), the key
     * attribute type, the event-time attribute type, the presence of reserved message-id /
     * publish-time attribute names (warning only), and finally that at least one value column
     * exists and that no value column has an unsupported type.
     *
     * @param schema output attributes of the analyzed query
     * @param topic  topic option used when the schema has no topic attribute
     * @throws AnalysisException when any of the checks above fails
     */
    public void validateQuery(Seq<Attribute> schema, Option<String> topic) {
        // Topic: use the attribute named TopicAttributeName if present; otherwise fall back to
        // a string literal built from the topic option, or fail when the option is absent.
        DataType dataType = ((Expression)schema.find((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)PulsarSinks$.$anonfun$validateQuery$1(x$2))).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            Option option = topic;
            if (!(option instanceof Some)) {
                if (None$.MODULE$.equals(option)) {
                    throw new AnalysisException(new StringBuilder(91).append("topic option required when no ").append("'").append(PulsarOptions$.MODULE$.TopicAttributeName()).append("' attribute is present. Use the ").append(PulsarOptions$.MODULE$.TopicSingle()).append(" option for setting a topic.").toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
                }
                // Neither Some nor None: pattern match fell through.
                throw new MatchError((Object)option);
            }
            Some some = (Some)option;
            String topicValue = (String)some.value();
            Literal literal = new Literal((Object)UTF8String.fromString((String)topicValue), (DataType)StringType$.MODULE$);
            return literal;
        })).dataType();
        // The topic expression must be of string type.
        if (!StringType$.MODULE$.equals(dataType)) {
            throw new AnalysisException(new StringBuilder(21).append("Topic type must be a ").append(StringType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        // Key: optional attribute named KeyAttributeName; a null string literal stands in when
        // it is missing, so the type check below passes in that case.
        DataType dataType2 = ((Expression)schema.find((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)PulsarSinks$.$anonfun$validateQuery$3(x$3))).getOrElse((Function0 & Serializable & scala.Serializable)() -> new Literal(null, (DataType)StringType$.MODULE$))).dataType();
        boolean bl = StringType$.MODULE$.equals(dataType2) ? true : BinaryType$.MODULE$.equals(dataType2);
        if (!bl) {
            throw new AnalysisException(new StringBuilder(30).append(PulsarOptions$.MODULE$.KeyAttributeName()).append(" attribute type ").append("must be a ").append(StringType$.MODULE$.catalogString()).append(" or ").append(BinaryType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        // Event time: optional attribute named EventTimeName; a null long literal stands in when
        // it is missing. Only long and timestamp types are accepted.
        DataType dataType3 = ((Expression)schema.find((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)PulsarSinks$.$anonfun$validateQuery$5(x$4))).getOrElse((Function0 & Serializable & scala.Serializable)() -> new Literal(null, (DataType)LongType$.MODULE$))).dataType();
        boolean bl2 = LongType$.MODULE$.equals(dataType3) ? true : TimestampType$.MODULE$.equals(dataType3);
        if (!bl2) {
            throw new AnalysisException(new StringBuilder(30).append(PulsarOptions$.MODULE$.EventTimeName()).append(" attribute type ").append("must be a ").append(LongType$.MODULE$.catalogString()).append(" or ").append(TimestampType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        // Reserved names: warn (do not fail) for the first attribute named MessageIdName or
        // PublishTimeName; find() stops at the first match, so at most one warning is logged.
        // NOTE(review): the concatenated warning text has no space after "schema," and none
        // after "record." - consider fixing the message.
        schema.find((Function1 & Serializable & scala.Serializable)a -> BoxesRunTime.boxToBoolean((boolean)PulsarSinks$.$anonfun$validateQuery$7(a))).map((Function1 & Serializable & scala.Serializable)a -> {
            PulsarSinks$.MODULE$.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(196).append(a.name()).append(" attribute exists in schema,").append("it's reserved by Pulsar Source and generated automatically by pulsar for each record.").append("Choose another name if you want to keep this field or it will be ignored by pulsar.").toString());
            return BoxedUnit.UNIT;
        });
        // Value columns: everything whose name is not listed in MetaFieldNames.
        Seq valuesExpression = (Seq)schema.filter((Function1 & Serializable & scala.Serializable)n -> BoxesRunTime.boxToBoolean((boolean)PulsarSinks$.$anonfun$validateQuery$10(n)));
        if (valuesExpression.length() == 0) {
            throw new AnalysisException("Schema should have at least one non-key/non-topic field", AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        // Reject value columns whose data type the sink does not support.
        this.checkForUnsupportedType((Seq<DataType>)((Seq)valuesExpression.map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.dataType(), Seq$.MODULE$.canBuildFrom())));
    }

    /**
     * Validates the query's output attributes and then writes each partition of its RDD to Pulsar.
     *
     * <p>{@code sparkSession} is accepted but not used by this method.
     *
     * @param sparkSession       unused here
     * @param queryExecution     supplies the analyzed output attributes and the RDD to write
     * @param pulsarClientConf   client configuration handed to each partition's write task
     * @param pulsarProducerConf producer configuration handed to each partition's write task
     * @param topic              topic option, validated together with the schema
     */
    public void write(SparkSession sparkSession, QueryExecution queryExecution, Map<String, Object> pulsarClientConf, Map<String, Object> pulsarProducerConf, Option<String> topic) {
        Seq outputAttributes = queryExecution.analyzed().output();
        // Fail before any partition is processed if the schema is not writable.
        this.validateQuery((Seq<Attribute>)outputAttributes, topic);
        queryExecution.toRdd().foreachPartition((Function1 & Serializable & scala.Serializable)partitionRows -> {
            PulsarSinks$.$anonfun$write$1(pulsarClientConf, pulsarProducerConf, topic, outputAttributes, partitionRows);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates a Pulsar producer for {@code topic} from the cached client for {@code clientConf}.
     *
     * <p>The builder receives {@code producerConf} through {@code loadConf} and is then given a
     * 100 ms max batching publish delay and a max batch size of {@code 0x500000} messages.
     *
     * @param clientConf   configuration used to look up or create the cached Pulsar client
     * @param producerConf configuration loaded into the producer builder
     * @param topic        topic the producer publishes to
     * @param schema       Pulsar schema of the produced messages
     * @return the created producer
     * @throws AnalysisException on a schema incompatibility or any other non-fatal failure;
     *         fatal throwables are rethrown unchanged
     */
    public <T> Producer<T> createProducer(Map<String, Object> clientConf, Map<String, Object> producerConf, String topic, Schema<T> schema) {
        try {
            // The explicit batching settings are applied after loadConf(producerConf).
            return CachedPulsarClient$.MODULE$.getOrCreate(clientConf)
                .newProducer(schema)
                .topic(topic)
                .loadConf(producerConf)
                .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(0x500000)
                .create();
        }
        catch (Throwable failure) {
            if (failure instanceof PulsarClientException.IncompatibleSchemaException) {
                throw PulsarSinks$.producerFailure("Cannot write incompatible data to topic " + topic + ". " + "Details: " + failure.getMessage(), failure);
            }
            Option nonFatal = NonFatal$.MODULE$.unapply(failure);
            if (nonFatal.isEmpty()) {
                // Not matched by NonFatal: propagate untouched.
                throw failure;
            }
            Throwable cause = (Throwable)nonFatal.get();
            throw PulsarSinks$.producerFailure("Cannot create pulsar producer for topic " + topic + ". " + "Details: " + cause.getMessage(), cause);
        }
    }

    /** Builds an AnalysisException with the given message and cause; all other constructor arguments use their defaults. */
    private static AnalysisException producerFailure(String message, Throwable cause) {
        return new AnalysisException(message, AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), (Option)new Some((Object)cause), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
    }

    /**
     * Converts a sequence of attributes to a {@link StructType} by wrapping it with the
     * catalyst expressions package's {@code AttributeSeq} and calling {@code toStructType()}.
     *
     * @param attrs attributes to convert
     * @return the struct type produced by {@code AttributeSeq.toStructType()}
     */
    public StructType toStructType(Seq<Attribute> attrs) {
        return package$.MODULE$.AttributeSeq(attrs).toStructType();
    }

    /**
     * Per-element body of {@code checkForUnsupportedType}: throws an {@link AnalysisException}
     * for calendar-interval, user-defined and object types, recurses into the field types of a
     * struct, and accepts everything else silently.
     */
    public static final /* synthetic */ void $anonfun$checkForUnsupportedType$1(DataType dt) {
        if (CalendarIntervalType$.MODULE$.equals(dt)) {
            throw new AnalysisException("CalendarIntervalType not supported by pulsar sink yet", AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        // User-defined and object types are rejected with the same message shape.
        if (dt instanceof UserDefinedType || dt instanceof ObjectType) {
            throw new AnalysisException(dt + " not supported by pulsar sink yet", AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5(), AnalysisException$.MODULE$.$lessinit$greater$default$6(), AnalysisException$.MODULE$.$lessinit$greater$default$7());
        }
        if (!(dt instanceof StructType)) {
            return;
        }
        // Struct: validate the data type of every field recursively.
        Object[] structFields = (Object[])((StructType)dt).fields();
        Object fieldTypes = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(structFields)).map((Function1 & Serializable & scala.Serializable)field -> field.dataType(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class)));
        MODULE$.checkForUnsupportedType((Seq<DataType>)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])fieldTypes)).toSeq());
    }

    /** Null-safe test of whether the attribute's name equals {@code PulsarOptions.TopicAttributeName()}. */
    public static final /* synthetic */ boolean $anonfun$validateQuery$1(Attribute x$2) {
        String attributeName = x$2.name();
        String topicAttributeName = PulsarOptions$.MODULE$.TopicAttributeName();
        if (attributeName == null) {
            return topicAttributeName == null;
        }
        return attributeName.equals(topicAttributeName);
    }

    /** Null-safe test of whether the attribute's name equals {@code PulsarOptions.KeyAttributeName()}. */
    public static final /* synthetic */ boolean $anonfun$validateQuery$3(Attribute x$3) {
        String attributeName = x$3.name();
        String keyAttributeName = PulsarOptions$.MODULE$.KeyAttributeName();
        if (attributeName == null) {
            return keyAttributeName == null;
        }
        return attributeName.equals(keyAttributeName);
    }

    /** Null-safe test of whether the attribute's name equals {@code PulsarOptions.EventTimeName()}. */
    public static final /* synthetic */ boolean $anonfun$validateQuery$5(Attribute x$4) {
        String attributeName = x$4.name();
        String eventTimeName = PulsarOptions$.MODULE$.EventTimeName();
        if (attributeName == null) {
            return eventTimeName == null;
        }
        return attributeName.equals(eventTimeName);
    }

    /**
     * Returns true when the attribute's name equals {@code PulsarOptions.MessageIdName()} or
     * {@code PulsarOptions.PublishTimeName()}; both comparisons are null-safe and the second
     * is only evaluated when the first does not match.
     */
    public static final /* synthetic */ boolean $anonfun$validateQuery$7(Attribute a) {
        String firstName = a.name();
        String messageIdName = PulsarOptions$.MODULE$.MessageIdName();
        boolean isMessageId = firstName == null ? messageIdName == null : firstName.equals(messageIdName);
        if (isMessageId) {
            return true;
        }
        String secondName = a.name();
        String publishTimeName = PulsarOptions$.MODULE$.PublishTimeName();
        return secondName == null ? publishTimeName == null : secondName.equals(publishTimeName);
    }

    /** Returns true when the attribute's name is not contained in {@code PulsarOptions.MetaFieldNames()}. */
    public static final /* synthetic */ boolean $anonfun$validateQuery$10(Attribute n) {
        boolean isMetaField = PulsarOptions$.MODULE$.MetaFieldNames().contains((Object)n.name());
        return !isMetaField;
    }

    /**
     * Per-partition body of {@code write}: builds a {@code PulsarWriteTask} from the captured
     * configuration, topic and schema, then hands its {@code execute} and {@code close} calls to
     * {@code Utils.tryWithSafeFinally} as the first and second function respectively.
     */
    public static final /* synthetic */ void $anonfun$write$1(Map pulsarClientConf$1, Map pulsarProducerConf$1, Option topic$2, Seq schema$1, Iterator iter) {
        PulsarWriteTask task = new PulsarWriteTask(pulsarClientConf$1, (Map<String, Object>)pulsarProducerConf$1, (Option<String>)topic$2, (Seq<Attribute>)schema$1);
        Function0 runTask = (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> task.execute((Iterator<InternalRow>)iter);
        // Presumably run as the cleanup step by tryWithSafeFinally - confirm against Spark's Utils.
        Function0 closeTask = (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> task.close();
        Utils$.MODULE$.tryWithSafeFinally(runTask, closeTask);
    }

    /**
     * Private constructor, invoked once from the static initializer. Stores the new instance
     * in {@code MODULE$} first and then runs the {@code Logging} trait's initializer on it.
     */
    private PulsarSinks$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
    }
}

