package org.apache.streampark.spark.connector.kafka.source;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.apache.streampark.spark.connector.kafka.offset.KafkaClient;
import org.apache.streampark.spark.connector.source.Source;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Try$;

/* compiled from: KafkaSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]g\u0001\u0002\r\u001a\u0001!B\u0001\"\u000e\u0001\u0003\u0006\u0004%\tA\u000e\u0005\t}\u0001\u0011\t\u0011)A\u0005o!A1\t\u0001B\u0001B\u0003%A\t\u0003\u0005S\u0001\t\r\t\u0015a\u0003T\u0011!!\u0007AaA!\u0002\u0017)\u0007\"B5\u0001\t\u0003Q\u0007b\u0002:\u0001\u0005\u0004%\te\u001d\u0005\u0007i\u0002\u0001\u000b\u0011B(\t\u0011U\u0004\u0001R1A\u0005\u0002YD\u0001B\u001f\u0001\t\u0006\u0004%Ia\u001f\u0005\n\u007f\u0002A)\u0019!C\u0005\u0003\u0003A!\"a\u0001\u0001\u0011\u000b\u0007I\u0011AA\u0003\u000b\u0019\ti\u0001\u0001\u0011\u0002\u0010!I\u0011\u0011\u0005\u0001C\u0002\u0013\u0005\u00111\u0005\u0005\t\u0003c\u0001\u0001\u0015!\u0003\u0002&!Q\u00111\u0007\u0001\t\u0006\u0004%I!!\u000e\t\u000f\u0005u\u0003\u0001\"\u0011\u0002`!9\u0011q\u0011\u0001\u0005\u0002\u0005%u!CAN3\u0005\u0005\t\u0012AAO\r!A\u0012$!A\t\u0002\u0005}\u0005BB5\u0015\t\u0003\t9\u000bC\u0005\u0002*R\t\n\u0011\"\u0001\u0002,\"I\u0011q\u0019\u000b\u0002\u0002\u0013%\u0011\u0011\u001a\u0002\f\u0017\u000647.Y*pkJ\u001cWM\u0003\u0002\u001b7\u000511o\\;sG\u0016T!\u0001H\u000f\u0002\u000b-\fgm[1\u000b\u0005yy\u0012!C2p]:,7\r^8s\u0015\t\u0001\u0013%A\u0003ta\u0006\u00148N\u0003\u0002#G\u0005Q1\u000f\u001e:fC6\u0004\u0018M]6\u000b\u0005\u0011*\u0013AB1qC\u000eDWMC\u0001'\u0003\ry'oZ\u0002\u0001+\rI3lZ\n\u0004\u0001)\u0002\u0004CA\u0016/\u001b\u0005a#\"A\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u0005=b#AB!osJ+g\r\u0005\u00022g5\t!G\u0003\u0002\u001b;%\u0011AG\r\u0002\u0007'>,(oY3\u0002\u0007M\u001c8-F\u00018!\tAD(D\u0001:\u0015\tQ4(A\u0005tiJ,\u0017-\\5oO*\u0011\u0001eI\u0005\u0003{e\u0012\u0001c\u0015;sK\u0006l\u0017N\\4D_:$X\r\u001f;\u0002\tM\u001c8\r\t\u0015\u0003\u0005\u0001\u0003\"aK!\n\u0005\tc#!\u0003;sC:\u001c\u0018.\u001a8u\u00039yg/\u001a:sS\u0012,\u0007+\u0019:b[N\u0004B!\u0012'P\u001f:\u0011aI\u0013\t\u0003\u000f2j\u0011\u0001\u0013\u0006\u0003\u0013\u001e\na\u0001\u0010:p_Rt\u0014BA&-\u0003\u0019\u0001&/\u001a3fM&\u0011QJ\u0014\u0002\u0004\u001b\u0006\u0004(BA&-!\t)\u0005+\u0003\u0002R\u001d\n11\u000b\u001e:j]\u001e\f!\"\u001a<jI\u0016t7-\u001a\u00132!\r!v+W\u0007\u0002+*\u0011a\u000bL\u0001\be\u00164G.Z2u\u0013\tAVK\u0001\u0005DY\u0006\u001c8\u000fV1h!\tQ6\f\u0004\u0001\u0005\u000bq\u0003!\u0019A/\u0003\u0003-\u000b\"AX1\u0011\u0005-z\u0016B\u00011-\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"a\u000b2\n\u0005\rd#aA!os\u0006QQM^5eK:\u001cW\r\n\u001a\u0011\u0007Q;f\r\u0005\u0002[O\u0012)\u0001\u000e\u0001b\u0001;\n\ta+\u0001\u0004=S:LGO\u0010\u000b\u0004WB\fHc\u00017o_B!Q\u000eA-g\u001b\u0005I\u0002\"\u0002*\u0007\u0001\b\u0019\u0006\"\u00023\u0007\u0001\b)\u0007\"B\u001b\u0007\u0001\u00049\u0004bB\"\u0007!\u0003\u0005\r\u0001R\u0001\u0007aJ,g-\u001b=\u0016\u0003=\u000bq\u0001\u001d:fM&D\b%A\u0006sKB\f'\u000f^5uS>tW#A<\u0011\u0005-B\u0018BA=-\u0005\rIe\u000e^\u0001\ti>\u0004\u0018nY*fiV\tA\u0010E\u0002F{>K!A (\u0003\u0007M+G/A\u0006lC\u001a\\\u0017\rU1sC6\u001cX#\u0001#\u0002\u000f\u001d\u0014x.\u001e9JIV\u0011\u0011q\u0001\t\u0005W\u0005%q*C\u0002\u0002\f1\u0012aa\u00149uS>t'AC*pkJ\u001cW\rV=qKB1\u0011\u0011CA\u000f3\u001al!!a\u0005\u000b\t\u0005U\u0011qC\u0001\tG>t7/^7fe*!\u0011\u0011DA\u000e\u0003\u001d\u0019G.[3oiNT!\u0001H\u0012\n\t\u0005}\u00111\u0003\u0002\u000f\u0007>t7/^7feJ+7m\u001c:e\u0003-Y\u0017MZ6b\u00072LWM\u001c;\u0016\u0005\u0005\u0015\u0002\u0003BA\u0014\u0003[i!!!\u000b\u000b\u0007\u0005-2$\u0001\u0004pM\u001a\u001cX\r^\u0005\u0005\u0003_\tICA\u0006LC\u001a\\\u0017m\u00117jK:$\u0018\u0001D6bM.\f7\t\\5f]R\u0004\u0013\u0001D8gMN,GOU1oO\u0016\u001cXCAA\u001c!!\tI$a\u0011\u0002F\u0005-SBAA\u001e\u0015\u0011\ti$a\u0010\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003\u0003\nAA[1wC&\u0019Q*a\u000f\u0011\u0007-\n9%C\u0002\u0002J1\u0012A\u0001T8oOB)1&!\u0014\u0002R%\u0019\u0011q\n\u0017\u0003\u000b\u0005\u0013(/Y=\u0011\t\u0005M\u0013\u0011L\u0007\u0003\u0003+R1!a\u0016:\u0003!Y\u0017MZ6baE\u0002\u0014\u0002BA.\u0003+\u00121b\u00144gg\u0016$(+\u00198hK\u0006Qq-\u001a;E'R\u0014X-Y7\u0016\t\u0005\u0005\u00141\u000f\u000b\u0005\u0003G\ni\b\u0006\u0003\u0002f\u0005]\u0004CBA4\u0003[\n\t(\u0004\u0002\u0002j)\u0019\u00111N\u001d\u0002\u000f\u0011\u001cHO]3b[&!\u0011qNA5\u0005\u001d!5\u000b\u001e:fC6\u00042AWA:\t\u0019\t)(\u0005b\u0001;\n\t!\u000bC\u0005\u0002zE\t\t\u0011q\u0001\u0002|\u0005QQM^5eK:\u001cW\rJ\u001a\u0011\tQ;\u0016\u0011\u000f\u0005\b\u0003\u007f\n\u0002\u0019AAA\u00035\u0011XmY8sI\"\u000bg\u000e\u001a7feB91&a!\u0002\u0010\u0005E\u0014bAACY\tIa)\u001e8di&|g.M\u0001\rkB$\u0017\r^3PM\u001a\u001cX\r\u001e\u000b\u0005\u0003\u0017\u000b\t\nE\u0002,\u0003\u001bK1!a$-\u0005\u0011)f.\u001b;\t\u000f\u0005M%\u00031\u0001\u0002\u0016\u0006!A/[7f!\rA\u0014qS\u0005\u0004\u00033K$\u0001\u0002+j[\u0016\f1bS1gW\u0006\u001cv.\u001e:dKB\u0011Q\u000eF\n\u0005))\n\t\u000bE\u0002,\u0003GK1!!*-\u00051\u0019VM]5bY&T\u0018M\u00197f)\t\ti*A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$HEM\u000b\u0007\u0003[\u000b\u0019-!2\u0016\u0005\u0005=&f\u0001#\u00022.\u0012\u00111\u0017\t\u0005\u0003k\u000by,\u0004\u0002\u00028*!\u0011\u0011XA^\u0003%)hn\u00195fG.,GMC\u0002\u0002>2\n!\"\u00198o_R\fG/[8o\u0013\u0011\t\t-a.\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\rB\u0003]-\t\u0007Q\fB\u0003i-\t\u0007Q,A\u0006sK\u0006$'+Z:pYZ,GCAAf!\u0011\ti-a5\u000e\u0005\u0005='\u0002BAi\u0003\u007f\tA\u0001\\1oO&!\u0011Q[Ah\u0005\u0019y%M[3di\u0002")
/* loaded from: input_file:org/apache/streampark/spark/connector/kafka/source/KafkaSource.class */
public class KafkaSource<K, V> implements Source {
    private int repartition;
    private Set<String> topicSet;
    private Map<String, String> kafkaParams;
    private Option<String> groupId;
    private java.util.Map<Object, OffsetRange[]> offsetRanges;
    private final transient StreamingContext ssc;
    private final Map<String, String> overrideParams;
    private final ClassTag<K> evidence$1;
    private final ClassTag<V> evidence$2;
    private final String prefix;
    private final KafkaClient kafkaClient;
    private SparkConf sparkConf;
    private Map<String, String> param;
    private transient Logger org$apache$streampark$common$util$Logger$$_logger;
    private final String org$apache$streampark$common$util$Logger$$prefix;
    private volatile byte bitmap$0;

    public String logName() {
        return org.apache.streampark.common.util.Logger.logName$(this);
    }

    public Logger logger() {
        return org.apache.streampark.common.util.Logger.logger$(this);
    }

    public void logInfo(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logInfo$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logDebug$(this, function0);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logTrace$(this, function0);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logTrace$(this, function0, th);
    }

    public void logWarn(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logWarn$(this, function0);
    }

    public void logWarn(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logWarn$(this, function0, th);
    }

    public void logError(Function0<String> function0) {
        org.apache.streampark.common.util.Logger.logError$(this, function0);
    }

    public void logError(Function0<String> function0, Throwable th) {
        org.apache.streampark.common.util.Logger.logError$(this, function0, th);
    }

    public void initializeLogIfNecessary(boolean z) {
        org.apache.streampark.common.util.Logger.initializeLogIfNecessary$(this, z);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private SparkConf sparkConf$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 32)) == 0) {
                this.sparkConf = Source.sparkConf$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 32);
            }
        }
        return this.sparkConf;
    }

    public SparkConf sparkConf() {
        return ((byte) (this.bitmap$0 & 32)) == 0 ? sparkConf$lzycompute() : this.sparkConf;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private Map<String, String> param$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 64)) == 0) {
                this.param = Source.param$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 64);
            }
        }
        return this.param;
    }

    public Map<String, String> param() {
        return ((byte) (this.bitmap$0 & 64)) == 0 ? param$lzycompute() : this.param;
    }

    public Logger org$apache$streampark$common$util$Logger$$_logger() {
        return this.org$apache$streampark$common$util$Logger$$_logger;
    }

    public void org$apache$streampark$common$util$Logger$$_logger_$eq(Logger logger) {
        this.org$apache$streampark$common$util$Logger$$_logger = logger;
    }

    public String org$apache$streampark$common$util$Logger$$prefix() {
        return this.org$apache$streampark$common$util$Logger$$prefix;
    }

    public final void org$apache$streampark$common$util$Logger$_setter_$org$apache$streampark$common$util$Logger$$prefix_$eq(String str) {
        this.org$apache$streampark$common$util$Logger$$prefix = str;
    }

    public StreamingContext ssc() {
        return this.ssc;
    }

    public String prefix() {
        return this.prefix;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private int repartition$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.repartition = new StringOps(Predef$.MODULE$.augmentString(sparkConf().get("spark.source.kafka.consume.repartition", "0"))).toInt();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.repartition;
    }

    public int repartition() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? repartition$lzycompute() : this.repartition;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private Set<String> topicSet$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.topicSet = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(((String) this.overrideParams.getOrElse("consume.topics", () -> {
                    return this.sparkConf().get("spark.source.kafka.consume.topics");
                })).split(","))).map(str -> {
                    return str.trim();
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSet();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.topicSet;
    }

    private Set<String> topicSet() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? topicSet$lzycompute() : this.topicSet;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private Map<String, String> kafkaParams$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.kafkaParams = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(sparkConf().getAll())).flatMap(tuple2 -> {
                    Iterable option2Iterable;
                    if (tuple2 != null) {
                        String str = (String) tuple2._1();
                        String str2 = (String) tuple2._2();
                        if (str.startsWith(this.prefix()) && BoxesRunTime.unboxToBoolean(Try$.MODULE$.apply(() -> {
                            return new StringOps(Predef$.MODULE$.augmentString(str2)).nonEmpty();
                        }).getOrElse(() -> {
                            return false;
                        }))) {
                            option2Iterable = Option$.MODULE$.option2Iterable(new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str.substring(this.prefix().length())), str2)));
                            return option2Iterable;
                        }
                    }
                    option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
                    return option2Iterable;
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms()).$plus$plus(this.overrideParams).$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), "false")})));
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.kafkaParams;
    }

    private Map<String, String> kafkaParams() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? kafkaParams$lzycompute() : this.kafkaParams;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private Option<String> groupId$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 8)) == 0) {
                this.groupId = kafkaParams().get("group.id");
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 8);
            }
        }
        return this.groupId;
    }

    public Option<String> groupId() {
        return ((byte) (this.bitmap$0 & 8)) == 0 ? groupId$lzycompute() : this.groupId;
    }

    public KafkaClient kafkaClient() {
        return this.kafkaClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.streampark.spark.connector.kafka.source.KafkaSource] */
    private java.util.Map<Object, OffsetRange[]> offsetRanges$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 16)) == 0) {
                this.offsetRanges = new ConcurrentHashMap();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 16);
            }
        }
        return this.offsetRanges;
    }

    private java.util.Map<Object, OffsetRange[]> offsetRanges() {
        return ((byte) (this.bitmap$0 & 16)) == 0 ? offsetRanges$lzycompute() : this.offsetRanges;
    }

    public <R> DStream<R> getDStream(Function1<ConsumerRecord<K, V>, R> function1, ClassTag<R> classTag) {
        return kafkaClient().createDirectStream(ssc(), kafkaParams(), topicSet(), this.evidence$1, this.evidence$2).transform((rdd, time) -> {
            this.offsetRanges().put(BoxesRunTime.boxToLong(time.milliseconds()), ((HasOffsetRanges) rdd).offsetRanges());
            return rdd;
        }, ClassTag$.MODULE$.apply(ConsumerRecord.class)).map(function1, classTag);
    }

    public void updateOffset(Time time) {
        long milliseconds = time.milliseconds();
        if (groupId().isDefined()) {
            logInfo(() -> {
                return new StringBuilder(43).append("updateOffset with ").append(this.kafkaClient().offsetStoreType()).append(" for time ").append(milliseconds).append(" offsetRanges: ").append(this.offsetRanges()).toString();
            });
            kafkaClient().updateOffset((String) groupId().get(), offsetRanges().get(BoxesRunTime.boxToLong(milliseconds)));
        }
        offsetRanges().remove(BoxesRunTime.boxToLong(milliseconds));
    }

    public KafkaSource(StreamingContext streamingContext, Map<String, String> map, ClassTag<K> classTag, ClassTag<V> classTag2) {
        this.ssc = streamingContext;
        this.overrideParams = map;
        this.evidence$1 = classTag;
        this.evidence$2 = classTag2;
        org.apache.streampark.common.util.Logger.$init$(this);
        Source.$init$(this);
        this.prefix = "spark.source.kafka.consume.";
        this.kafkaClient = new KafkaClient(streamingContext.sparkContext().getConf());
    }
}
