package org.apache.spark.sql.eventhubs;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsConf$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.client.Client;
import org.apache.spark.eventhubs.package$PartitionPreferredLocationStrategy$;
import org.apache.spark.eventhubs.rdd.EventHubsRDD;
import org.apache.spark.eventhubs.rdd.OffsetRange;
import org.apache.spark.eventhubs.rdd.OffsetRange$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.StringOps$;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric$LongIsIntegral$;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventHubsSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\t%a!\u0002\u0014(\u0001-\n\u0004\u0002\u0003%\u0001\u0005\u0003\u0005\u000b\u0011\u0002&\t\u00119\u0003!\u0011!Q\u0001\n=C\u0001b\u0018\u0001\u0003\u0002\u0003\u0006I\u0001\u0018\u0005\u0007A\u0002!\taJ1\t\u0011\u001d\u0004\u0001R1A\u0005\n!D\u0001\u0002\u001d\u0001\t\u0006\u0004%I!\u001d\u0005\bm\u0002\u0011\r\u0011\"\u0003x\u0011\u0019a\b\u0001)A\u0005q\"9Q\u0010\u0001b\u0001\n\u0013q\bBB@\u0001A\u0003%A\fC\u0005\u0002\u0002\u0001\u0011\r\u0011\"\u0003\u0002\u0004!A\u0011Q\u0002\u0001!\u0002\u0013\t)\u0001C\u0005\u0002\u0010\u0001\u0011\r\u0011\"\u0003\u0002\u0012!A\u0011q\u0004\u0001!\u0002\u0013\t\u0019\u0002\u0003\u0006\u0002\"\u0001A)\u0019!C\u0005\u0003GA\u0011\"a\u0013\u0001\u0001\u0004%I!!\u0014\t\u0013\u0005E\u0003\u00011A\u0005\n\u0005M\u0003\u0002CA0\u0001\u0001\u0006K!a\u0014\t\u0013\u0005\u0005\u0004\u00011A\u0005\n\u00055\u0003\"CA2\u0001\u0001\u0007I\u0011BA3\u0011!\tI\u0007\u0001Q!\n\u0005=\u0003bBA6\u0001\u0011\u0005\u0013Q\u000e\u0005\b\u0003w\u0002A\u0011IA?\u0011\u001d\t9\t\u0001C\u0005\u0003\u0013Cq!a$\u0001\t\u0013\t\t\nC\u0004\u0002\"\u0002!\t%a)\t\u000f\u0005u\u0006\u0001\"\u0011\u0002@\"9\u0011\u0011\u0019\u0001\u0005\n\u0005\rw\u0001CAeO!\u0005q%a3\u0007\u000f\u0019:\u0003\u0012A\u0014\u0002N\"1\u0001M\bC\u0001\u0003+D\u0001\"a6\u001f\u0005\u0004%\tA 
\u0005\b\u00033t\u0002\u0015!\u0003]\u0011%\tYN\bb\u0001\n\u00039\u0013\u000fC\u0004\u0002^z\u0001\u000b\u0011\u0002:\t\u000f\u0005}g\u0004\"\u0001\u0002b\"9\u00111\u001e\u0010\u0005\n\u00055(aD#wK:$\b*\u001e2t'>,(oY3\u000b\u0005!J\u0013!C3wK:$\b.\u001e2t\u0015\tQ3&A\u0002tc2T!\u0001L\u0017\u0002\u000bM\u0004\u0018M]6\u000b\u00059z\u0013AB1qC\u000eDWMC\u00011\u0003\ry'oZ\n\u0005\u0001IR$\t\u0005\u00024q5\tAG\u0003\u00026m\u0005!A.\u00198h\u0015\u00059\u0014\u0001\u00026bm\u0006L!!\u000f\u001b\u0003\r=\u0013'.Z2u!\tY\u0004)D\u0001=\u0015\tid(A\u0005tiJ,\u0017-\\5oO*\u0011q(K\u0001\nKb,7-\u001e;j_:L!!\u0011\u001f\u0003\rM{WO]2f!\t\u0019e)D\u0001E\u0015\t)5&\u0001\u0005j]R,'O\\1m\u0013\t9EIA\u0004M_\u001e<\u0017N\\4\u0002\u0015M\fHnQ8oi\u0016DHo\u0001\u0001\u0011\u0005-cU\"A\u0015\n\u00055K#AC*R\u0019\u000e{g\u000e^3yi\u0006Q\u0001/\u0019:b[\u0016$XM]:\u0011\tAKF\f\u0018\b\u0003#^\u0003\"AU+\u000e\u0003MS!\u0001V%\u0002\rq\u0012xn\u001c;?\u0015\u00051\u0016!B:dC2\f\u0017B\u0001-V\u0003\u0019\u0001&/\u001a3fM&\u0011!l\u0017\u0002\u0004\u001b\u0006\u0004(B\u0001-V!\t\u0001V,\u0003\u0002_7\n11\u000b\u001e:j]\u001e\fA\"\\3uC\u0012\fG/\u0019)bi\"\fa\u0001P5oSRtD\u0003\u00022eK\u001a\u0004\"a\u0019\u0001\u000e\u0003\u001dBQ\u0001\u0013\u0003A\u0002)CQA\u0014\u0003A\u0002=CQa\u0018\u0003A\u0002q\u000b\u0001\"\u001a5DY&,g\u000e^\u000b\u0002SB\u0011!N\\\u0007\u0002W*\u0011A.\\\u0001\u0007G2LWM\u001c;\u000b\u0005!Z\u0013BA8l\u0005\u0019\u0019E.[3oi\u0006q\u0001/\u0019:uSRLwN\\\"pk:$X#\u0001:\u0011\u0005M$X\"A+\n\u0005U,&aA%oi\u00061Q\r[\"p]\u001a,\u0012\u0001\u001f\t\u0003sjl\u0011!\\\u0005\u0003w6\u0014Q\"\u0012<f]RDUOY:D_:4\u0017aB3i\u0007>tg\rI\u0001\u0007K\"t\u0015-\\3\u0016\u0003q\u000bq!\u001a5OC6,\u0007%\u0001\u0002tGV\u0011\u0011Q\u0001\t\u0005\u0003\u000f\tI!D\u0001,\u0013\r\tYa\u000b\u0002\r'B\f'o[\"p]R,\u0007\u0010^\u0001\u0004g\u000e\u0004\u0013\u0001F7bq>3gm]3ugB+'\u000f\u0016:jO\u001e,'/\u0006\u0002\u0002\u0014A)1/!\u0006\u0002\u001a%\u0019\u0011qC+\u0003\r=\u0003H
/[8o!\r\u0019\u00181D\u0005\u0004\u0003;)&\u0001\u0002'p]\u001e\fQ#\\1y\u001f\u001a47/\u001a;t!\u0016\u0014HK]5hO\u0016\u0014\b%\u0001\fj]&$\u0018.\u00197QCJ$\u0018\u000e^5p]N+\u0017OT8t+\t\t)\u0003\u0005\u0004Q3\u0006\u001d\u0012Q\u0006\t\u0004s\u0006%\u0012bAA\u0016[\n\u0001b*Y7f\u0003:$\u0007+\u0019:uSRLwN\u001c\t\u0005\u0003_\t)E\u0004\u0003\u00022\u0005\u0005c\u0002BA\u001a\u0003\u007fqA!!\u000e\u0002>9!\u0011qGA\u001e\u001d\r\u0011\u0016\u0011H\u0005\u0002a%\u0011afL\u0005\u0003Y5J!\u0001K\u0016\n\u0007\u0005\rS.A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\u001d\u0013\u0011\n\u0002\u000f'\u0016\fX/\u001a8dK:+XNY3s\u0015\r\t\u0019%\\\u0001\u000eGV\u0014(/\u001a8u'\u0016\fhj\\:\u0016\u0005\u0005=\u0003#B:\u0002\u0016\u0005\u0015\u0012!E2veJ,g\u000e^*fc:{7o\u0018\u0013fcR!\u0011QKA.!\r\u0019\u0018qK\u0005\u0004\u00033*&\u0001B+oSRD\u0011\"!\u0018\u0012\u0003\u0003\u0005\r!a\u0014\u0002\u0007a$\u0013'\u0001\bdkJ\u0014XM\u001c;TKFtun\u001d\u0011\u0002\u001d\u0015\f'\u000f\\5fgR\u001cV-\u001d(pg\u0006\u0011R-\u0019:mS\u0016\u001cHoU3r\u001d>\u001cx\fJ3r)\u0011\t)&a\u001a\t\u0013\u0005uC#!AA\u0002\u0005=\u0013aD3be2LWm\u001d;TKFtun\u001d\u0011\u0002\rM\u001c\u0007.Z7b+\t\ty\u0007\u0005\u0003\u0002r\u0005]TBAA:\u0015\r\t)(K\u0001\u0006if\u0004Xm]\u0005\u0005\u0003s\n\u0019H\u0001\u0006TiJ,8\r\u001e+za\u0016\f\u0011bZ3u\u001f\u001a47/\u001a;\u0016\u0005\u0005}\u0004#B:\u0002\u0016\u0005\u0005\u0005cA\u001e\u0002\u0004&\u0019\u0011Q\u0011\u001f\u0003\r=3gm]3u\u0003Q\tGM[;tiN#\u0018M\u001d;j]\u001e|eMZ:fiR!\u0011QEAF\u0011\u001d\ti\t\u0007a\u0001\u0003K\tAA\u001a:p[\u0006I!/\u0019;f\u0019&l\u0017\u000e\u001e\u000b\u000b\u0003K\t\u0019*a&\u0002\u001a\u0006u\u0005bBAK3\u0001\u0007\u0011\u0011D\u0001\u0006Y&l\u0017\u000e\u001e\u0005\b\u0003\u001bK\u0002\u0019AA\u0013\u0011\u001d\tY*\u0007a\u0001\u0003K\tQ!\u001e8uS2Dq!a(\u001a\u0001\u0004\t)#A\u0004ge>lg*Z<\u0002\u0011\u001d,GOQ1uG\"$b!!*\u00026\u0006e\u0006\u0003BAT\u0003_sA!!+\u0002.:!\u00111GAV\u0013\tQ3&C\u0002\u0002D%JA!!-\u00
024\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003\u0007J\u0003bBA\\5\u0001\u0007\u0011qP\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0003wS\u0002\u0019AAA\u0003\r)g\u000eZ\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002V\u0005q!/\u001a9peR$\u0015\r^1M_N\u001cH\u0003BA+\u0003\u000bDa!a2\u001d\u0001\u0004a\u0016aB7fgN\fw-Z\u0001\u0010\u000bZ,g\u000e\u001e%vEN\u001cv.\u001e:dKB\u00111MH\n\u0004=\u0005=\u0007cA:\u0002R&\u0019\u00111[+\u0003\r\u0005s\u0017PU3g)\t\tY-\u0001\u0011J]N$(/^2uS>t7OR8s!>$XM\u001c;jC2$\u0015\r^1M_N\u001c\u0018!I%ogR\u0014Xo\u0019;j_:\u001chi\u001c:Q_R,g\u000e^5bY\u0012\u000bG/\u0019'pgN\u0004\u0013a\u0002,F%NKuJT\u0001\t-\u0016\u00136+S(OA\u0005)r-\u001a;T_J$X\rZ#yK\u000e,Ho\u001c:MSN$H\u0003BAr\u0003S\u0004Ba]As9&\u0019\u0011q]+\u0003\u000b\u0005\u0013(/Y=\t\u000f\u0005\u0005A\u00051\u0001\u0002\u0006\u000591m\\7qCJ,GCBAx\u0003k\u0014)\u0001E\u0002t\u0003cL1!a=V\u0005\u001d\u0011un\u001c7fC:Dq!a>&\u0001\u0004\tI0A\u0001b!\u0011\tYP!\u0001\u000e\u0005\u0005u(bAA��W\u0005I1o\u00195fIVdWM]\u0005\u0005\u0005\u0007\tiPA\rFq\u0016\u001cW\u000f^8s\u0007\u0006\u001c\u0007.\u001a+bg.dunY1uS>t\u0007b\u0002B\u0004K\u0001\u0007\u0011\u0011`\u0001\u0002E\u0002")
/* loaded from: input_file:org/apache/spark/sql/eventhubs/EventHubsSource.class */
public class EventHubsSource implements Source, Logging {
    // Lazily created client; valid once bit 0 (mask 1) of bitmap$0 is set.
    private Client ehClient;
    // Lazily fetched from ehClient().partitionCount(); valid once mask 2 of bitmap$0 is set.
    private int partitionCount;
    // Lazily loaded/created starting sequence numbers; valid once mask 4 of bitmap$0 is set.
    private Map<NameAndPartition, Object> initialPartitionSeqNos;
    // Constructor argument; public with a mangled name because the anonymous metadata-log class reads it.
    public final SQLContext org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext;
    // Constructor argument; passed to EventHubsSourceProvider$.clientFactory when the client is built.
    private final Map<String, String> parameters;
    // Constructor argument; location handed to the HDFSMetadataLog that stores batch 0.
    public final String org$apache$spark$sql$eventhubs$EventHubsSource$$metadataPath;
    // Built from the parameters map via EventHubsConf$.toConf in the constructor.
    private final EventHubsConf ehConf;
    // ehConf().name(), cached in the constructor.
    private final String ehName;
    // sqlContext.sparkContext(), cached in the constructor.
    private final SparkContext sc;
    // Boxed long limit passed to rateLimit by getOffset; computed once in the constructor.
    private final Option<Object> maxOffsetsPerTrigger;
    // Sequence numbers most recently returned by getOffset (or seeded by getBatch when still empty).
    private Option<Map<NameAndPartition, Object>> currentSeqNos;
    // First component of each partition's bounds from ehClient().allBoundedSeqNos(); set by getOffset/getBatch.
    private Option<Map<NameAndPartition, Object>> earliestSeqNos;
    // Backing field for the Logging accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Initialization flags for the three lazy fields above (masks 1, 2 and 4).
    private volatile byte bitmap$0;

    /**
     * Static forwarder to {@code EventHubsSource$.MODULE$.getSortedExecutorList}.
     *
     * @param sparkContext passed through unchanged to the companion object
     * @return the array produced by the companion object's method
     */
    public static String[] getSortedExecutorList(SparkContext sparkContext) {
        return EventHubsSource$.MODULE$.getSortedExecutorList(sparkContext);
    }

    /**
     * Static forwarder to {@code EventHubsSource$.MODULE$.InstructionsForPotentialDataLoss}.
     *
     * @return the text held by the companion object; {@code reportDataLoss} appends it to its warnings
     */
    public static String InstructionsForPotentialDataLoss() {
        return EventHubsSource$.MODULE$.InstructionsForPotentialDataLoss();
    }

    /** Delegates to the static {@code Logging.logName$} helper, passing this instance. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Delegates to the static {@code Logging.log$} helper, passing this instance. */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Delegates to the static {@code Logging.logInfo$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /**
     * Delegates to the static {@code Logging.logDebug$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /**
     * Delegates to the static {@code Logging.logTrace$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /**
     * Delegates to the static {@code Logging.logWarning$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /**
     * Delegates to the static {@code Logging.logError$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /**
     * Delegates to the two-argument static {@code Logging.logInfo$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     * @param th throwable forwarded alongside the message
     */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /**
     * Delegates to the two-argument static {@code Logging.logDebug$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     * @param th throwable forwarded alongside the message
     */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /**
     * Delegates to the two-argument static {@code Logging.logTrace$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     * @param th throwable forwarded alongside the message
     */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /**
     * Delegates to the two-argument static {@code Logging.logWarning$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     * @param th throwable forwarded alongside the message
     */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /**
     * Delegates to the two-argument static {@code Logging.logError$} helper.
     *
     * @param function0 supplier of the message, handed through unevaluated
     * @param th throwable forwarded alongside the message
     */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Delegates to the static {@code Logging.isTraceEnabled$} helper, passing this instance. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Delegates to the one-flag static {@code Logging.initializeLogIfNecessary$} helper.
     *
     * @param z flag forwarded unchanged
     */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /**
     * Delegates to the two-flag static {@code Logging.initializeLogIfNecessary$} helper.
     *
     * @param z first flag, forwarded unchanged
     * @param z2 second flag, forwarded unchanged
     * @return the helper's result
     */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /**
     * Supplies the default for the second parameter of {@code initializeLogIfNecessary} by delegating to
     * the static {@code Logging.initializeLogIfNecessary$default$2$} helper.
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /**
     * Delegates to the static {@code Source.commit$} helper; this class adds no commit logic of its own.
     *
     * @param offset forwarded unchanged
     */
    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    /** Getter for the transient logger backing field. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the transient logger backing field.
     *
     * @param logger value stored as-is
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.spark.sql.eventhubs.EventHubsSource] */
    private Client ehClient$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.ehClient = (Client) EventHubsSourceProvider$.MODULE$.clientFactory(this.parameters).apply(ehConf());
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        this.parameters = null;
        return this.ehClient;
    }

    /**
     * Returns the client, creating it on first use.
     *
     * <p>Mask 1 of {@code bitmap$0} records whether the field is already populated; while it is clear the
     * call is routed to {@code ehClient$lzycompute()}.
     */
    private Client ehClient() {
        boolean initialized = (this.bitmap$0 & 1) != 0;
        if (initialized) {
            return this.ehClient;
        }
        return ehClient$lzycompute();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.eventhubs.EventHubsSource] */
    private int partitionCount$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.partitionCount = ehClient().partitionCount();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.partitionCount;
    }

    /**
     * Returns the partition count, fetching it on first use.
     *
     * <p>Mask 2 of {@code bitmap$0} records whether the field is already populated; while it is clear the
     * call is routed to {@code partitionCount$lzycompute()}.
     */
    private int partitionCount() {
        boolean initialized = (this.bitmap$0 & 2) != 0;
        if (initialized) {
            return this.partitionCount;
        }
        return partitionCount$lzycompute();
    }

    /** Getter for the configuration the constructor built from the parameters map. */
    private EventHubsConf ehConf() {
        return this.ehConf;
    }

    /** Getter for the name the constructor read from {@code ehConf().name()}. */
    private String ehName() {
        return this.ehName;
    }

    /** Getter for the SparkContext the constructor obtained from the SQLContext. */
    private SparkContext sc() {
        return this.sc;
    }

    /** Getter for the optional limit (a boxed long) that {@code getOffset} hands to {@code rateLimit}. */
    private Option<Object> maxOffsetsPerTrigger() {
        return this.maxOffsetsPerTrigger;
    }

    /**
     * Slow path for the lazily resolved {@code initialPartitionSeqNos} field.
     *
     * <p>While holding this instance's monitor it opens an {@code HDFSMetadataLog} at the metadata path
     * and looks up batch 0. If that entry exists its sequence numbers are used. Otherwise the client's
     * {@code translate} result is keyed by {@code NameAndPartition}, stored as batch 0, logged, and used.
     * Mask 4 of {@code bitmap$0} is set afterwards.
     *
     * @return the cached per-partition starting sequence numbers
     */
    private Map<NameAndPartition, Object> initialPartitionSeqNos$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 4) == 0) {
                // Constructor arguments go on the `new` expression; an anonymous class cannot call super(...) itself.
                HDFSMetadataLog<EventHubsSourceOffset> metadataLog = new HDFSMetadataLog<EventHubsSourceOffset>(
                        this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.sparkSession(),
                        this.org$apache$spark$sql$eventhubs$EventHubsSource$$metadataPath,
                        ClassTag$.MODULE$.apply(EventHubsSourceOffset.class)) {
                    /** Writes one 0 byte, then the line "v" + VERSION, then the offset's JSON, all UTF-8. */
                    public void serialize(EventHubsSourceOffset eventHubsSourceOffset, OutputStream outputStream) {
                        outputStream.write(0);
                        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                        writer.write("v" + EventHubsSource$.MODULE$.VERSION() + "\n");
                        writer.write(eventHubsSourceOffset.json());
                        // Flush only: the stream is supplied by the caller and is not closed here.
                        writer.flush();
                    }

                    /**
                     * Skips the leading byte and reads the rest as UTF-8. Content that does not start with
                     * 'v' is treated as the offset itself; otherwise the first line is checked with
                     * parseVersion and the remainder is the offset.
                     */
                    public EventHubsSourceOffset deserialize(InputStream inputStream) {
                        inputStream.read();
                        String content = IOUtils.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                        Predef$.MODULE$.assert(content.length() != 0);
                        if (content.charAt(0) != 'v') {
                            return EventHubsSourceOffset$.MODULE$.apply(new SerializedOffset(content));
                        }
                        int lineEnd = content.indexOf("\n");
                        if (lineEnd <= 0) {
                            throw new IllegalStateException("Log file was malformed.");
                        }
                        parseVersion(content.substring(0, lineEnd), EventHubsSource$.MODULE$.VERSION());
                        return EventHubsSourceOffset$.MODULE$.apply(new SerializedOffset(content.substring(lineEnd + 1)));
                    }
                };
                this.initialPartitionSeqNos = ((EventHubsSourceOffset) metadataLog.get(0L).getOrElse(() -> {
                    // No batch 0 yet: derive starting positions from the configuration and persist them.
                    Map translated = (Map) this.ehClient().translate(this.ehConf(), this.partitionCount(), this.ehClient().translate$default$3()).map(entry -> {
                        if (entry == null) {
                            throw new MatchError(entry);
                        }
                        return new Tuple2(new NameAndPartition(this.ehName(), entry._1$mcI$sp()), BoxesRunTime.boxToLong(entry._2$mcJ$sp()));
                    }, Map$.MODULE$.canBuildFrom());
                    EventHubsSourceOffset initialOffset = new EventHubsSourceOffset(translated);
                    metadataLog.add(0L, initialOffset);
                    this.logInfo(() -> {
                        return "Initial sequence numbers: " + translated;
                    });
                    return initialOffset;
                })).partitionToSeqNos();
                // Publish the flag only after the field is assigned; bitmap$0 is volatile.
                this.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.initialPartitionSeqNos;
    }

    /**
     * Returns the starting sequence numbers, resolving them on first use.
     *
     * <p>Mask 4 of {@code bitmap$0} records whether the field is already populated; while it is clear the
     * call is routed to {@code initialPartitionSeqNos$lzycompute()}.
     */
    private Map<NameAndPartition, Object> initialPartitionSeqNos() {
        boolean initialized = (this.bitmap$0 & 4) != 0;
        if (initialized) {
            return this.initialPartitionSeqNos;
        }
        return initialPartitionSeqNos$lzycompute();
    }

    /** Getter for the current sequence numbers; empty until {@code getOffset} or {@code getBatch} sets them. */
    private Option<Map<NameAndPartition, Object>> currentSeqNos() {
        return this.currentSeqNos;
    }

    /**
     * Setter for the current sequence numbers.
     *
     * @param option value stored as-is
     */
    private void currentSeqNos_$eq(Option<Map<NameAndPartition, Object>> option) {
        this.currentSeqNos = option;
    }

    /** Getter for the earliest sequence numbers; empty until {@code getOffset} or {@code getBatch} sets them. */
    private Option<Map<NameAndPartition, Object>> earliestSeqNos() {
        return this.earliestSeqNos;
    }

    /**
     * Setter for the earliest sequence numbers.
     *
     * @param option value stored as-is
     */
    private void earliestSeqNos_$eq(Option<Map<NameAndPartition, Object>> option) {
        this.earliestSeqNos = option;
    }

    /** Returns the schema supplied by {@code EventHubsSourceProvider$.MODULE$.eventHubsSchema()}. */
    public StructType schema() {
        return EventHubsSourceProvider$.MODULE$.eventHubsSchema();
    }

    /**
     * Computes the offset up to which the next batch may read.
     *
     * <p>Steps:
     * <ol>
     *   <li>Touches {@code initialPartitionSeqNos()} so the lazy value is resolved first.</li>
     *   <li>Reads per-partition bounds from the client once. The first component of each pair is stored
     *       as the earliest sequence number; the second is the upper bound for this trigger.</li>
     *   <li>With no limit configured, the upper bound is returned unchanged. With a limit,
     *       {@code rateLimit} is applied starting from {@code initialPartitionSeqNos()} on the first call
     *       (no current sequence numbers yet) and from the current sequence numbers afterwards.</li>
     *   <li>Records the result as the current sequence numbers and logs it.</li>
     * </ol>
     *
     * @return always a {@code Some} wrapping an {@code EventHubsSourceOffset}
     */
    public Option<Offset> getOffset() {
        initialPartitionSeqNos();
        Map<Object, Tuple2<Object, Object>> allBoundedSeqNos = ehClient().allBoundedSeqNos();
        earliestSeqNos_$eq(new Some(((Map) allBoundedSeqNos.map(entry -> {
            if (entry != null) {
                int partitionId = entry._1$mcI$sp();
                Tuple2 bounds = (Tuple2) entry._2();
                if (bounds != null) {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new NameAndPartition(this.ehName(), partitionId)), BoxesRunTime.boxToLong(bounds._1$mcJ$sp()));
                }
            }
            throw new MatchError(entry);
        }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
        Map<NameAndPartition, Object> latestSeqNos = ((Map) allBoundedSeqNos.map(entry -> {
            if (entry != null) {
                int partitionId = entry._1$mcI$sp();
                Tuple2 bounds = (Tuple2) entry._2();
                if (bounds != null) {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new NameAndPartition(this.ehName(), partitionId)), BoxesRunTime.boxToLong(bounds._2$mcJ$sp()));
                }
            }
            throw new MatchError(entry);
        }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());

        Map<NameAndPartition, Object> limited;
        Option<Object> limitOpt = maxOffsetsPerTrigger();
        if (None$.MODULE$.equals(limitOpt)) {
            limited = latestSeqNos;
        } else if (limitOpt instanceof Some) {
            long limit = BoxesRunTime.unboxToLong(((Some) limitOpt).value());
            // The two cases are mutually exclusive: currentSeqNos().get() must never run while it is empty.
            Map<NameAndPartition, Object> startFrom = currentSeqNos().isEmpty()
                    ? initialPartitionSeqNos()
                    : (Map<NameAndPartition, Object>) currentSeqNos().get();
            limited = rateLimit(limit, adjustStartingOffset(startFrom), latestSeqNos, (Map) earliestSeqNos().get());
        } else {
            throw new MatchError(limitOpt);
        }

        Map<NameAndPartition, Object> result = limited;
        currentSeqNos_$eq(new Some(result));
        logInfo(() -> {
            return new StringBuilder(11).append("GetOffset: ").append(((SeqLike) result.toSeq().map(pair -> {
                return pair.toString();
            }, Seq$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$)).toString();
        });
        return new Some(new EventHubsSourceOffset(result));
    }

    /**
     * Raises any starting sequence number that lies below the earliest known one for its partition.
     *
     * <p>Entries at or above the earliest value pass through unchanged. For an entry below it, a data-loss
     * warning is reported and the earliest value is substituted. Requires {@code earliestSeqNos} to be
     * set and to contain every key of {@code map}.
     *
     * @param map candidate starting sequence number per partition
     * @return a map with the same keys and the adjusted sequence numbers
     */
    private Map<NameAndPartition, Object> adjustStartingOffset(Map<NameAndPartition, Object> map) {
        return (Map) map.map(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            NameAndPartition partition = (NameAndPartition) entry._1();
            long startSeqNo = entry._2$mcJ$sp();
            Object earliest = ((MapLike) this.earliestSeqNos().get()).apply(partition);
            if (startSeqNo >= BoxesRunTime.unboxToLong(earliest)) {
                return new Tuple2(partition, BoxesRunTime.boxToLong(startSeqNo));
            }
            String behind = "is behind the earliest sequence number " + earliest + " ";
            this.reportDataLoss("Starting seqNo " + startSeqNo + " in partition " + partition.partitionId() + " of EventHub " + partition.ehName() + " " + behind + "present in the service. Some events may have expired and been missed.");
            return new Tuple2(partition, earliest);
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Caps how far each partition may advance so that roughly {@code j} events in total are taken,
     * shared between partitions in proportion to their backlog.
     *
     * <p>The backlog of a partition is its value in {@code map2} minus its start, where the start comes
     * from {@code map} or, when absent there, from {@code map3}. Partitions with no positive backlog get
     * no share. If the summed backlog is below 1, {@code map2} is returned unchanged.
     *
     * @param j overall limit for this trigger
     * @param map starting sequence number per partition
     * @param map2 upper-bound sequence number per partition
     * @param map3 fallback starting sequence numbers for partitions missing from {@code map}
     * @return per-partition end sequence numbers, never above the corresponding value in {@code map2}
     */
    private Map<NameAndPartition, Object> rateLimit(long j, Map<NameAndPartition, Object> map, Map<NameAndPartition, Object> map2, Map<NameAndPartition, Object> map3) {
        Map backlogs = (Map) map2.flatMap(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            NameAndPartition partition = (NameAndPartition) entry._1();
            long until = entry._2$mcJ$sp();
            return Option$.MODULE$.option2Iterable(map.get(partition).orElse(() -> {
                return map3.get(partition);
            }).flatMap(begin -> {
                return $anonfun$rateLimit$3(this, until, partition, BoxesRunTime.unboxToLong(begin));
            }));
        }, Map$.MODULE$.canBuildFrom());
        // Held as a double so the per-partition share below is a floating-point ratio.
        double totalBacklog = BoxesRunTime.unboxToLong(backlogs.values().sum(Numeric$LongIsIntegral$.MODULE$));
        if (totalBacklog < ((double) 1)) {
            return map2;
        }
        return (Map) map2.map(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            NameAndPartition partition = (NameAndPartition) entry._1();
            long until = entry._2$mcJ$sp();
            Object end = backlogs.get(partition).map(backlog -> {
                long begin = BoxesRunTime.unboxToLong(map.getOrElse(partition, () -> {
                    return map3.apply(partition);
                }));
                // Unbox explicitly: the lambda parameter is untyped because `backlogs` is a raw Map.
                double prorated = j * (BoxesRunTime.unboxToLong(backlog) / totalBacklog);
                this.logDebug(() -> {
                    return "rateLimit " + partition + " prorated amount is " + prorated;
                });
                // A share below 1 rounds up so a partition with backlog still advances; otherwise round down.
                long proposed = begin + ((long) (prorated < ((double) 1) ? Math.ceil(prorated) : Math.floor(prorated)));
                this.logDebug(() -> {
                    return "rateLimit " + partition + " new offset is " + proposed;
                });
                return BoxesRunTime.boxToLong(Math.min(until, proposed));
            }).getOrElse(() -> {
                return BoxesRunTime.boxToLong(until);
            });
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), end);
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Builds the DataFrame for the events between {@code option} (start) and {@code offset} (end).
     *
     * <p>Steps:
     * <ol>
     *   <li>Touches {@code initialPartitionSeqNos()} so the lazy value is resolved first.</li>
     *   <li>Seeds the current sequence numbers from {@code offset} if none are recorded yet.</li>
     *   <li>Returns an empty DataFrame when start is defined and equals end.</li>
     *   <li>Fetches the earliest sequence numbers from the client if they are not known yet.</li>
     *   <li>Derives the starting positions (from start, or from the initial sequence numbers when start
     *       is empty), adjusted by {@code adjustStartingOffset}.</li>
     *   <li>Creates one {@code OffsetRange} per partition of the end offset, with a preferred executor
     *       chosen by hash when executors are known, and drops ranges whose end is below their start.</li>
     * </ol>
     *
     * @param option start offset, or empty for the first batch
     * @param offset end offset
     * @return a streaming DataFrame over an {@code EventHubsRDD} for the computed ranges
     * @throws IllegalStateException if a partition of the end offset has no starting sequence number
     * @throws IllegalArgumentException if the configured preferred-location strategy is neither Hash nor BalancedHash
     */
    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        Map<NameAndPartition, Object> adjustStartingOffset;
        initialPartitionSeqNos();
        logInfo(() -> {
            return new StringBuilder(40).append("getBatch called with start = ").append(option).append(" and end = ").append(offset).toString();
        });
        Map<NameAndPartition, Object> partitionSeqNos = EventHubsSourceOffset$.MODULE$.getPartitionSeqNos(offset);
        if (currentSeqNos().isEmpty()) {
            currentSeqNos_$eq(new Some(partitionSeqNos));
        }
        if (option.isDefined()) {
            Object start = option.get();
            if (start != null ? start.equals(offset) : offset == null) {
                // Nothing to read: start and end are the same offset.
                return this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.internalCreateDataFrame(this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)), schema(), true);
            }
        }
        if (earliestSeqNos().isEmpty()) {
            earliestSeqNos_$eq(new Some(((Map) ehClient().allBoundedSeqNos().map(entry -> {
                if (entry != null) {
                    int partitionId = entry._1$mcI$sp();
                    // Distinct name: a lambda body cannot redeclare its own parameter.
                    Tuple2 bounds = (Tuple2) entry._2();
                    if (bounds != null) {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new NameAndPartition(this.ehName(), partitionId)), BoxesRunTime.boxToLong(bounds._1$mcJ$sp()));
                    }
                }
                throw new MatchError(entry);
            }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
        }
        if (option instanceof Some) {
            adjustStartingOffset = adjustStartingOffset(EventHubsSourceOffset$.MODULE$.getPartitionSeqNos((Offset) ((Some) option).value()));
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            adjustStartingOffset = adjustStartingOffset(initialPartitionSeqNos());
        }
        Map<NameAndPartition, Object> fromSeqNos = adjustStartingOffset;
        Seq seq = partitionSeqNos.keySet().toSeq();
        logDebug(() -> {
            return new StringBuilder(12).append("Partitions: ").append(seq.mkString(", ")).toString();
        });
        String[] sortedExecutorList = EventHubsSource$.MODULE$.getSortedExecutorList(sc());
        int length = sortedExecutorList.length;
        logDebug(() -> {
            return new StringBuilder(18).append("Sorted executors: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(sortedExecutorList)).mkString(", ")).toString();
        });
        OffsetRange[] offsetRangeArr = (OffsetRange[]) ((TraversableOnce) ((TraversableLike) ((TraversableLike) seq.map(nameAndPartition -> {
            int hashCode;
            long fromSeqNo = BoxesRunTime.unboxToLong(fromSeqNos.getOrElse(nameAndPartition, () -> {
                throw new IllegalStateException(new StringBuilder(25).append(nameAndPartition).append(" doesn't have a fromSeqNo").toString());
            }));
            long untilSeqNo = BoxesRunTime.unboxToLong(partitionSeqNos.apply(nameAndPartition));
            Enumeration.Value partitionPreferredLocationStrategy = this.ehConf().partitionPreferredLocationStrategy();
            Enumeration.Value Hash = package$PartitionPreferredLocationStrategy$.MODULE$.Hash();
            if (Hash != null ? !Hash.equals(partitionPreferredLocationStrategy) : partitionPreferredLocationStrategy != null) {
                // Not Hash: the only other accepted strategy is BalancedHash.
                Enumeration.Value BalancedHash = package$PartitionPreferredLocationStrategy$.MODULE$.BalancedHash();
                if (BalancedHash != null ? !BalancedHash.equals(partitionPreferredLocationStrategy) : partitionPreferredLocationStrategy != null) {
                    throw new IllegalArgumentException(new StringBuilder(32).append("Unsupported partition strategy: ").append(this.ehConf().partitionPreferredLocationStrategy()).toString());
                }
                hashCode = nameAndPartition.ehName().hashCode() + nameAndPartition.partitionId();
            } else {
                hashCode = nameAndPartition.hashCode();
            }
            int i = hashCode;
            // floorMod keeps the index non-negative for negative hash codes; no executors means no preference.
            return new Tuple5(nameAndPartition, BoxesRunTime.boxToLong(fromSeqNo), BoxesRunTime.boxToLong(untilSeqNo), BoxesRunTime.boxToInteger(i), length > 0 ? new Some(sortedExecutorList[Math.floorMod(i, length)]) : None$.MODULE$);
        }, Seq$.MODULE$.canBuildFrom())).map(tuple5 -> {
            if (tuple5 == null) {
                throw new MatchError(tuple5);
            }
            return OffsetRange$.MODULE$.apply((NameAndPartition) tuple5._1(), BoxesRunTime.unboxToLong(tuple5._2()), BoxesRunTime.unboxToLong(tuple5._3()), (Option) tuple5._5());
        }, Seq$.MODULE$.canBuildFrom())).filter(offsetRange -> {
            return BoxesRunTime.boxToBoolean($anonfun$getBatch$8(this, offsetRange));
        })).toArray(ClassTag$.MODULE$.apply(OffsetRange.class));
        RDD<InternalRow> internalRow = EventHubsSourceProvider$.MODULE$.toInternalRow(new EventHubsRDD(sc(), ehConf().trimmed(), offsetRangeArr));
        logInfo(() -> {
            return new StringBuilder(41).append("GetBatch generating RDD of offset range: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(offsetRangeArr)).sortBy(offsetRange2 -> {
                return offsetRange2.nameAndPartition().toString();
            }, Ordering$String$.MODULE$))).mkString(", ")).toString();
        });
        return this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.internalCreateDataFrame(internalRow, schema(), true);
    }

    /**
     * Closes the client while holding this instance's monitor.
     *
     * <p>The client is reached through the lazy accessor, so if it has not been created yet this call
     * creates it before closing it.
     */
    public synchronized void stop() {
        ehClient().close();
    }

    /**
     * Logs a warning made of {@code str}, the separator {@code ". "}, and the companion object's
     * data-loss instructions. The text is assembled inside the supplier passed to {@code logWarning}.
     *
     * @param str description of the suspected data loss
     */
    private void reportDataLoss(String str) {
        logWarning(() -> str + ". " + EventHubsSource$.MODULE$.InstructionsForPotentialDataLoss());
    }

    /**
     * Lambda body used by the constructor: converts the configured limit from text to a long using
     * Scala's {@code StringOps.toLong}.
     *
     * @param str textual limit read from the parameters map
     * @return the parsed value
     */
    public static final /* synthetic */ long $anonfun$maxOffsetsPerTrigger$1(String str) {
        String augmented = Predef$.MODULE$.augmentString(str);
        StringOps ops = new StringOps(augmented);
        return ops.toLong();
    }

    /**
     * Lambda body used by {@code rateLimit}: computes one partition's backlog as {@code j - j2}, logs it
     * at debug level, and keeps it only when it is positive.
     *
     * @param eventHubsSource instance whose logger is used
     * @param j upper-bound sequence number of the partition
     * @param nameAndPartition partition being measured
     * @param j2 starting sequence number of the partition
     * @return {@code Some((nameAndPartition, backlog))} when the backlog is positive, otherwise {@code None}
     */
    public static final /* synthetic */ Option $anonfun$rateLimit$3(EventHubsSource eventHubsSource, long j, NameAndPartition nameAndPartition, long j2) {
        final long backlog = j - j2;
        eventHubsSource.logDebug(() -> "rateLimit " + nameAndPartition + " size is " + backlog);
        if (backlog <= 0) {
            return None$.MODULE$;
        }
        return new Some(new Tuple2(nameAndPartition, BoxesRunTime.boxToLong(backlog)));
    }

    /**
     * Filter predicate used by {@code getBatch}: keeps a range whose end is not below its start. A range
     * that runs backwards is reported as potential data loss and rejected.
     *
     * @param eventHubsSource instance used for reporting
     * @param offsetRange range to check
     * @return {@code true} when {@code untilSeqNo >= fromSeqNo}
     */
    public static final /* synthetic */ boolean $anonfun$getBatch$8(EventHubsSource eventHubsSource, OffsetRange offsetRange) {
        boolean runsBackwards = offsetRange.untilSeqNo() < offsetRange.fromSeqNo();
        if (runsBackwards) {
            eventHubsSource.reportDataLoss("Partition " + offsetRange.nameAndPartition() + "'s sequence number was changed from " + offsetRange.fromSeqNo() + " to " + offsetRange.untilSeqNo() + ", some data may have been missed");
        }
        return !runsBackwards;
    }

    /**
     * Creates the source.
     *
     * <p>Stores the three arguments, runs the {@code Source} and {@code Logging} initializers, and
     * derives the configuration, the Event Hub name and the SparkContext. The per-trigger limit is read
     * from the {@code EventHubsConf$.MODULE$.MaxEventsPerTriggerKey()} entry of {@code map}. When that
     * entry is absent the limit defaults to {@code partitionCount() * 1000}, which resolves the lazy
     * partition count (and with it the client) during construction.
     *
     * @param sQLContext context used for the SparkContext, the metadata log session and DataFrame creation
     * @param map source options; also used to build the configuration and, later, the client
     * @param str metadata path for the log that stores the initial sequence numbers
     */
    public EventHubsSource(SQLContext sQLContext, Map<String, String> map, String str) {
        this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext = sQLContext;
        this.parameters = map;
        this.org$apache$spark$sql$eventhubs$EventHubsSource$$metadataPath = str;
        Source.$init$(this);
        Logging.$init$(this);
        // ehConf and parameters must be assigned before the default limit below can touch the client.
        this.ehConf = EventHubsConf$.MODULE$.toConf(map);
        this.ehName = ehConf().name();
        this.sc = sQLContext.sparkContext();
        this.maxOffsetsPerTrigger = Option$.MODULE$.apply(map.get(EventHubsConf$.MODULE$.MaxEventsPerTriggerKey()).map(str2 -> {
            return BoxesRunTime.boxToLong($anonfun$maxOffsetsPerTrigger$1(str2));
        }).getOrElse(() -> {
            // Box as Long (readers use BoxesRunTime.unboxToLong) and multiply in long arithmetic.
            return BoxesRunTime.boxToLong(this.partitionCount() * 1000L);
        }));
        this.currentSeqNos = None$.MODULE$;
        this.earliestSeqNos = None$.MODULE$;
    }
}
