package org.apache.spark.streaming.eventhubs;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.servicebus.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.eventhubs.checkpoint.OffsetRange;
import org.apache.spark.streaming.eventhubs.checkpoint.OffsetStoreParams;
import org.apache.spark.streaming.eventhubs.checkpoint.ProgressWriter;
import scala.Enumeration;
import scala.Function5;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.StringBuilder;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventHubRDD.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]c\u0001B\u0001\u0003\u00015\u00111\"\u0012<f]RDUO\u0019*E\t*\u00111\u0001B\u0001\nKZ,g\u000e\u001e5vENT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0004\u0001M\u0011\u0001A\u0004\t\u0004\u001fI!R\"\u0001\t\u000b\u0005E1\u0011a\u0001:eI&\u00111\u0003\u0005\u0002\u0004%\u0012#\u0005CA\u000b\u001e\u001b\u00051\"BA\u0002\u0018\u0015\tA\u0012$A\u0003buV\u0014XM\u0003\u0002\u001b7\u0005IQ.[2s_N|g\r\u001e\u0006\u00029\u0005\u00191m\\7\n\u0005y1\"!C#wK:$H)\u0019;b\u0011!\u0001\u0003A!A!\u0002\u0013\t\u0013AA:d!\t\u00113%D\u0001\u0007\u0013\t!cA\u0001\u0007Ta\u0006\u00148nQ8oi\u0016DH\u000f\u0003\u0005'\u0001\t\u0005\t\u0015!\u0003(\u0003I)g/\u001a8u\u0011V\u00147\u000fU1sC6\u001cX*\u00199\u0011\t!r\u0013\u0007\u000e\b\u0003S1j\u0011A\u000b\u0006\u0002W\u0005)1oY1mC&\u0011QFK\u0001\u0007!J,G-\u001a4\n\u0005=\u0002$aA'ba*\u0011QF\u000b\t\u0003QIJ!a\r\u0019\u0003\rM#(/\u001b8h!\u0011Ac&M\u0019\t\u0011Y\u0002!Q1A\u0005\u0002]\nAb\u001c4gg\u0016$(+\u00198hKN,\u0012\u0001\u000f\t\u0004s\u0005#eB\u0001\u001e@\u001d\tYd(D\u0001=\u0015\tiD\"\u0001\u0004=e>|GOP\u0005\u0002W%\u0011\u0001IK\u0001\ba\u0006\u001c7.Y4f\u0013\t\u00115I\u0001\u0003MSN$(B\u0001!+!\t)\u0005*D\u0001G\u0015\t9%!\u0001\u0006dQ\u0016\u001c7\u000e]8j]RL!!\u0013$\u0003\u0017=3gm]3u%\u0006tw-\u001a\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005q\u0005iqN\u001a4tKR\u0014\u0016M\\4fg\u0002B\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IAT\u0001\nE\u0006$8\r\u001b+j[\u0016\u0004\"a\u0014)\u000e\u0003\u0011I!!\u0015\u0003\u0003\tQKW.\u001a\u0005\t'\u0002\u0011\t\u0011)A\u0005)\u0006aqN\u001a4tKR\u0004\u0016M]1ngB\u0011Q)V\u0005\u0003-\u001a\u0013\u0011c\u00144gg\u0016$8\u000b^8sKB\u000b'/Y7t\u0011!A\u0006A!A!\u0002\u0013I\u0016aF3wK:$\b*\u001e2SK\u000e,\u0017N^3s\u0007J,\u0017\r^8s!!I#\f\u000e/`Er\u001b\u0018BA.+\u0005%1UO\\2uS>tW\u0007\u0005\u0002*;&\u0011aL\u000b\u0002\u0004\u0013:$\bCA\u0015a\u0013\t\t'F\u0001\u0003M_:<\u0007CA2q\u001d\t!gN\u0004\u0002f[:\u0011a\r\u001c\b\u0003O.t!\u0001\u001b6\u000f\u0005mJ\u0017\"A\u0006\n\u0005%Q\u0011BA\u0004\t\u0013\t)a!\u0003\u0002\u0004\t%\u0011qNA\u0001\u0015\u000bZ,g\u000e\u001e%vEN|eMZ:fiRK\b/Z:\n\u0005E\u0014(aE#wK:$\b*\u001e2t\u001f\u001a47/\u001a;UsB,'BA8\u0003!\t!X/D\u0001\u0003\u0013\t1(A\u0001\fFm\u0016tG\u000fS;cg\u000ec\u0017.\u001a8u/J\f\u0007\u000f]3s\u0011\u0015A\b\u0001\"\u0001z\u0003\u0019a\u0014N\\5u}QA!p\u001f?~}~\f\t\u0001\u0005\u0002u\u0001!)\u0001e\u001ea\u0001C!)ae\u001ea\u0001O!)ag\u001ea\u0001q!)Qj\u001ea\u0001\u001d\")1k\u001ea\u0001)\")\u0001l\u001ea\u00013\"9\u0011Q\u0001\u0001\u0005B\u0005\u001d\u0011!D4fiB\u000b'\u000f^5uS>t7/\u0006\u0002\u0002\nA)\u0011&a\u0003\u0002\u0010%\u0019\u0011Q\u0002\u0016\u0003\u000b\u0005\u0013(/Y=\u0011\u0007\t\n\t\"C\u0002\u0002\u0014\u0019\u0011\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\u0005]\u0001\u0001\"\u0003\u0002\u001a\u0005yqO]1qa&twMU3dK&4X\r\u0006\u0005\u0002\u001c\u0005u\u0011qEA\u0016!\rI\u0014\t\u0006\u0005\t\u0003?\t)\u00021\u0001\u0002\"\u0005ARM^3oi\"+(MT1nK\u0006sG\rU1si&$\u0018n\u001c8\u0011\u0007Q\f\u0019#C\u0002\u0002&\t\u0011\u0001$\u0012<f]RDUO\u0019(b[\u0016\fe\u000e\u001a)beRLG/[8o\u0011\u001d\tI#!\u0006A\u0002M\fa\"\u001a<f]RDUOY\"mS\u0016tG\u000fC\u0004\u0002.\u0005U\u0001\u0019\u0001/\u0002'\u0015D\b/Z2uK\u0012,e/\u001a8u\u001dVl'-\u001a:\t\u000f\u0005E\u0002\u0001\"\u0011\u00024\u000591m\\7qkR,GCBA\u001b\u0003w\ty\u0004\u0005\u0003:\u0003o!\u0012bAA\u001d\u0007\nA\u0011\n^3sCR|'\u000f\u0003\u0005\u0002>\u0005=\u0002\u0019AA\b\u0003\u0015\u0019\b\u000f\\5u\u0011!\t\t%a\fA\u0002\u0005\r\u0013aB2p]R,\u0007\u0010\u001e\t\u0004E\u0005\u0015\u0013bAA$\r\tYA+Y:l\u0007>tG/\u001a=uQ\u0011\ty#a\u0013\u0011\t\u00055\u00131K\u0007\u0003\u0003\u001fR1!!\u0015\u0007\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003+\nyE\u0001\u0007EKZ,Gn\u001c9fe\u0006\u0003\u0018\u000e")
/* loaded from: input_file:org/apache/spark/streaming/eventhubs/EventHubRDD.class */
public class EventHubRDD extends RDD<EventData> {
    private final Map<String, Map<String, String>> eventHubsParamsMap;
    private final List<OffsetRange> offsetRanges;
    public final Time org$apache$spark$streaming$eventhubs$EventHubRDD$$batchTime;
    private final OffsetStoreParams offsetParams;
    private final Function5<Map<String, String>, Object, Object, Enumeration.Value, Object, EventHubsClientWrapper> eventHubReceiverCreator;

    public List<OffsetRange> offsetRanges() {
        return this.offsetRanges;
    }

    public Partition[] getPartitions() {
        return (Partition[]) ((TraversableOnce) ((List) offsetRanges().zipWithIndex(List$.MODULE$.canBuildFrom())).map(new EventHubRDD$$anonfun$getPartitions$1(this), List$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(Partition.class));
    }

    private List<EventData> wrappingReceive(EventHubNameAndPartition eventHubNameAndPartition, EventHubsClientWrapper eventHubsClientWrapper, int i) {
        ListBuffer listBuffer = new ListBuffer();
        ListBuffer listBuffer2 = new ListBuffer();
        int i2 = 0;
        while (listBuffer.size() < i) {
            if (i2 > i * 2) {
                throw new Exception(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{StringUtil.EMPTY, " cannot return data, the trace is"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{eventHubNameAndPartition}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{" ", StringUtil.EMPTY})).s(Predef$.MODULE$.genericWrapArray(new Object[]{listBuffer2.toList()}))).toString());
            }
            Iterable<EventData> receive = eventHubsClientWrapper.receive(i - listBuffer.size());
            if (receive == null) {
                return listBuffer.toList();
            }
            List list = receive.toList();
            listBuffer2.$plus$eq(BoxesRunTime.boxToLong(list.length()));
            i2++;
            listBuffer.$plus$plus$eq(list);
        }
        return listBuffer.toList();
    }

    @DeveloperApi
    public Iterator<EventData> compute(Partition partition, TaskContext taskContext) {
        EventHubRDDPartition eventHubRDDPartition = (EventHubRDDPartition) partition;
        ProgressWriter progressWriter = new ProgressWriter(this.offsetParams.checkpointDir(), this.offsetParams.appName(), this.offsetParams.streamId(), this.offsetParams.eventHubNamespace(), eventHubRDDPartition.eventHubNameAndPartitionID(), this.org$apache$spark$streaming$eventhubs$EventHubRDD$$batchTime.milliseconds(), new Configuration());
        long fromOffset = eventHubRDDPartition.fromOffset();
        if (eventHubRDDPartition.fromSeq() >= eventHubRDDPartition.untilSeq()) {
            logInfo(new EventHubRDD$$anonfun$compute$1(this, eventHubRDDPartition));
            progressWriter.write(this.org$apache$spark$streaming$eventhubs$EventHubRDD$$batchTime.milliseconds(), eventHubRDDPartition.fromOffset(), eventHubRDDPartition.fromSeq());
            logInfo(new EventHubRDD$$anonfun$compute$2(this, eventHubRDDPartition, fromOffset));
            return package$.MODULE$.Iterator().apply(Nil$.MODULE$);
        }
        int untilSeq = (int) (eventHubRDDPartition.untilSeq() - eventHubRDDPartition.fromSeq());
        long currentTimeMillis = System.currentTimeMillis();
        logInfo(new EventHubRDD$$anonfun$compute$3(this, eventHubRDDPartition, untilSeq));
        EventHubsClientWrapper eventHubsClientWrapper = (EventHubsClientWrapper) this.eventHubReceiverCreator.apply((Map) this.eventHubsParamsMap.apply(eventHubRDDPartition.eventHubNameAndPartitionID().eventHubName()), BoxesRunTime.boxToInteger(eventHubRDDPartition.eventHubNameAndPartitionID().partitionId()), BoxesRunTime.boxToLong(fromOffset), eventHubRDDPartition.offsetType(), BoxesRunTime.boxToInteger(untilSeq));
        List<EventData> wrappingReceive = wrappingReceive(eventHubRDDPartition.eventHubNameAndPartitionID(), eventHubsClientWrapper, untilSeq);
        logInfo(new EventHubRDD$$anonfun$compute$4(this, currentTimeMillis, wrappingReceive));
        EventData eventData = (EventData) wrappingReceive.last();
        long j = new StringOps(Predef$.MODULE$.augmentString(eventData.getSystemProperties().getOffset())).toLong();
        long sequenceNumber = eventData.getSystemProperties().getSequenceNumber();
        progressWriter.write(this.org$apache$spark$streaming$eventhubs$EventHubRDD$$batchTime.milliseconds(), j, sequenceNumber);
        logInfo(new EventHubRDD$$anonfun$compute$5(this, eventHubRDDPartition, j, sequenceNumber));
        eventHubsClientWrapper.close();
        return wrappingReceive.iterator();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public EventHubRDD(SparkContext sparkContext, Map<String, Map<String, String>> map, List<OffsetRange> list, Time time, OffsetStoreParams offsetStoreParams, Function5<Map<String, String>, Object, Object, Enumeration.Value, Object, EventHubsClientWrapper> function5) {
        super(sparkContext, Nil$.MODULE$, ClassTag$.MODULE$.apply(EventData.class));
        this.eventHubsParamsMap = map;
        this.offsetRanges = list;
        this.org$apache$spark$streaming$eventhubs$EventHubRDD$$batchTime = time;
        this.offsetParams = offsetStoreParams;
        this.eventHubReceiverCreator = function5;
    }
}
