package com.microsoft.azure.cosmosdb.spark;

import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark;
import com.microsoft.azure.cosmosdb.spark.LoggingTrait;
import com.microsoft.azure.cosmosdb.spark.config.Config;
import com.microsoft.azure.cosmosdb.spark.config.Config$;
import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig$;
import com.microsoft.azure.cosmosdb.spark.rdd.CosmosDBRDD;
import com.microsoft.azure.cosmosdb.spark.rdd.JavaCosmosDBRDD;
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils;
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils$;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.FilePartition;
import org.slf4j.Logger;
import rx.Observable;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Product;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.api.TypeTags;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.util.Random;

/* compiled from: CosmosDBSpark.scala */
/* loaded from: input_file:com/microsoft/azure/cosmosdb/spark/CosmosDBSpark$.class */
public final class CosmosDBSpark$ implements LoggingTrait, Serializable {
    public static final CosmosDBSpark$ MODULE$ = null;
    private final String defaultSource;
    private Option<Object> lastUpsertSetting;
    private Option<Object> lastWritingBatchSize;
    private final Random random;
    private transient Logger com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log_;

    static {
        new CosmosDBSpark$();
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public Logger com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log_() {
        return this.com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log_;
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log__$eq(Logger logger) {
        this.com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log_ = logger;
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public String logName() {
        return LoggingTrait.Cclass.logName(this);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public Logger log() {
        return LoggingTrait.Cclass.log(this);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logInfo(Function0<String> function0) {
        LoggingTrait.Cclass.logInfo(this, function0);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logDebug(Function0<String> function0) {
        LoggingTrait.Cclass.logDebug(this, function0);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logTrace(Function0<String> function0) {
        LoggingTrait.Cclass.logTrace(this, function0);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logWarning(Function0<String> function0) {
        LoggingTrait.Cclass.logWarning(this, function0);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logError(Function0<String> function0) {
        LoggingTrait.Cclass.logError(this, function0);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logInfo(Function0<String> function0, Throwable th) {
        LoggingTrait.Cclass.logInfo(this, function0, th);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logDebug(Function0<String> function0, Throwable th) {
        LoggingTrait.Cclass.logDebug(this, function0, th);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logTrace(Function0<String> function0, Throwable th) {
        LoggingTrait.Cclass.logTrace(this, function0, th);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logWarning(Function0<String> function0, Throwable th) {
        LoggingTrait.Cclass.logWarning(this, function0, th);
    }

    @Override // com.microsoft.azure.cosmosdb.spark.LoggingTrait
    public void logError(Function0<String> function0, Throwable th) {
        LoggingTrait.Cclass.logError(this, function0, th);
    }

    public String defaultSource() {
        return this.defaultSource;
    }

    public Option<Object> lastUpsertSetting() {
        return this.lastUpsertSetting;
    }

    public void lastUpsertSetting_$eq(Option<Object> option) {
        this.lastUpsertSetting = option;
    }

    public Option<Object> lastWritingBatchSize() {
        return this.lastWritingBatchSize;
    }

    public void lastWritingBatchSize_$eq(Option<Object> option) {
        this.lastWritingBatchSize = option;
    }

    public Random random() {
        return this.random;
    }

    public CosmosDBSpark.Builder builder() {
        return new CosmosDBSpark.Builder();
    }

    public CosmosDBRDD load(SparkContext sparkContext) {
        return load(sparkContext, Config$.MODULE$.apply(sparkContext));
    }

    public CosmosDBRDD load(SparkContext sparkContext, Config config) {
        return builder().sparkContext(sparkContext).config(config).build().toRDD();
    }

    public <D extends Product> Dataset<Row> load(SparkSession sparkSession, TypeTags.TypeTag<D> typeTag) {
        return load(sparkSession, Config$.MODULE$.apply(sparkSession), typeTag);
    }

    public <D extends Product> Dataset<Row> load(SparkSession sparkSession, Config config, TypeTags.TypeTag<D> typeTag) {
        return builder().sparkSession(sparkSession).config(config).build().toDF(typeTag);
    }

    public <D> Dataset<D> load(SparkSession sparkSession, Config config, Class<D> cls) {
        return builder().sparkSession(sparkSession).config(config).build().toDS(cls);
    }

    public <D> void save(RDD<D> rdd, ClassTag<D> classTag) {
        save(rdd, Config$.MODULE$.apply(rdd.sparkContext()), classTag);
    }

    public <D> void save(RDD<D> rdd, Config config, ClassTag<D> classTag) {
        IntRef create = IntRef.create(0);
        try {
            create.elem = rdd.getNumPartitions();
        } catch (Throwable unused) {
        }
        BooleanRef create2 = BooleanRef.create(false);
        BooleanRef create3 = BooleanRef.create(false);
        ObjectRef create4 = ObjectRef.create(Map$.MODULE$.apply(Nil$.MODULE$));
        try {
            Partition[] partitions = rdd.partitions();
            create2.elem = partitions.length > 0 && (partitions[0] instanceof ADLFilePartition);
            create3.elem = partitions.length > 0 && (partitions[0] instanceof FilePartition);
            if (create2.elem || create3.elem) {
                Predef$.MODULE$.refArrayOps(partitions).foreach(new CosmosDBSpark$$anonfun$save$1(create4));
            }
        } catch (Throwable unused2) {
        }
        Map map = HdfsUtils$.MODULE$.getConfigurationMap(rdd.sparkContext().hadoopConfiguration()).toMap(Predef$.MODULE$.$conforms());
        CosmosDBConnection cosmosDBConnection = new CosmosDBConnection(config);
        IntRef create5 = IntRef.create(0);
        create5.elem = cosmosDBConnection.getCollectionThroughput();
        rdd.mapPartitionsWithIndex(new CosmosDBSpark$$anonfun$1(config, classTag, create, create2, create3, create4, map, create5), true, classTag).collect();
    }

    private <D> void bulkUpdate(Iterator<D> iterator, CosmosDBConnection cosmosDBConnection, int i, int i2, Option<String> option, ClassTag<D> classTag, ClassTag<D> classTag2) {
        DocumentBulkExecutor documentBulkImporter = cosmosDBConnection.getDocumentBulkImporter(i, option);
        ArrayList arrayList = new ArrayList(i2);
        ArrayList arrayList2 = new ArrayList(i2);
        ObjectRef create = ObjectRef.create((Object) null);
        iterator.foreach(new CosmosDBSpark$$anonfun$bulkUpdate$1(i2, documentBulkImporter, arrayList, arrayList2, create));
        if (arrayList.size() > 0) {
            create.elem = documentBulkImporter.updateAll(arrayList);
        }
        if (arrayList2.size() > 0) {
            create.elem = documentBulkImporter.updateAllWithPatch(arrayList2);
        }
    }

    private <D> void bulkImport(Iterator<D> iterator, CosmosDBConnection cosmosDBConnection, int i, int i2, Option<String> option, Option<String> option2, boolean z, ClassTag<D> classTag) {
        DocumentBulkExecutor documentBulkImporter = cosmosDBConnection.getDocumentBulkImporter(i, option2);
        ArrayList arrayList = new ArrayList(i2);
        ObjectRef create = ObjectRef.create((Object) null);
        iterator.foreach(new CosmosDBSpark$$anonfun$bulkImport$1(i2, option, z, documentBulkImporter, arrayList, create));
        if (arrayList.size() > 0) {
            create.elem = documentBulkImporter.importAll(arrayList, z);
        }
    }

    private <D> void importWithRxJava(Iterator<D> iterator, CosmosDBConnection cosmosDBConnection, Integer num, long j, Option<String> option, boolean z, ClassTag<D> classTag) {
        ObjectRef create = ObjectRef.create(new ArrayList(Predef$.MODULE$.Integer2int(num)));
        iterator.foreach(new CosmosDBSpark$$anonfun$importWithRxJava$1(cosmosDBConnection, num, j, option, z, create, ObjectRef.create((Object) null), IntRef.create(0)));
        if (((ArrayList) create.elem).isEmpty()) {
            return;
        }
        Observable.merge((ArrayList) create.elem).toBlocking().last();
    }

    public <D> Iterator<D> com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition(Iterator<D> iterator, Config config, int i, FilePartition filePartition, Map<String, String> map, int i2, ClassTag<D> classTag) {
        CosmosDBConnection cosmosDBConnection = new CosmosDBConnection(config);
        Option option = config.get(CosmosDBConfig$.MODULE$.WritingBatchId(), ClassTag$.MODULE$.apply(String.class));
        Option option2 = config.get(CosmosDBConfig$.MODULE$.adlFileCheckpointPath(), ClassTag$.MODULE$.apply(String.class));
        ObjectRef create = ObjectRef.create((Object) null);
        if (option2.isDefined()) {
            create.elem = new HdfsUtils(map);
        }
        Option option3 = config.get(CosmosDBConfig$.MODULE$.CosmosDBFileStoreCollection(), ClassTag$.MODULE$.apply(String.class));
        ObjectRef create2 = ObjectRef.create((Object) null);
        if (option3.isDefined()) {
            create2.elem = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"/dbs/", "/colls/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{(String) config.get(CosmosDBConfig$.MODULE$.Database(), ClassTag$.MODULE$.apply(String.class)).get(), option3.get()}));
        }
        IntRef create3 = IntRef.create(0);
        filePartition.files().foreach(new CosmosDBSpark$$anonfun$com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition$1(cosmosDBConnection, option, option2, create, option3, create2, create3));
        if (create3.elem == filePartition.files().size()) {
            return new ListBuffer().iterator();
        }
        Iterator<D> savePartition = savePartition(cosmosDBConnection, iterator, config, i, i2, classTag);
        if (option2.isDefined()) {
            filePartition.files().foreach(new CosmosDBSpark$$anonfun$com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition$2(option, option2, create));
        } else if (option3.isDefined()) {
            filePartition.files().foreach(new CosmosDBSpark$$anonfun$com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition$3(cosmosDBConnection, option, create2));
        }
        return savePartition;
    }

    public <D> Iterator<D> com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveAdlPartition(Iterator<D> iterator, Config config, int i, String str, Map<String, String> map, int i2, ClassTag<D> classTag) {
        CosmosDBConnection cosmosDBConnection = new CosmosDBConnection(config);
        Iterator<D> savePartition = savePartition(cosmosDBConnection, iterator, config, i, i2, classTag);
        Option option = config.get(CosmosDBConfig$.MODULE$.adlFileCheckpointPath(), ClassTag$.MODULE$.apply(String.class));
        Option option2 = config.get(CosmosDBConfig$.MODULE$.WritingBatchId(), ClassTag$.MODULE$.apply(String.class));
        if (option.isDefined()) {
            ADLConnection$.MODULE$.markAdlFileProcessed(new HdfsUtils(map), (String) option.get(), str, (String) option2.get());
        } else {
            Option option3 = config.get(CosmosDBConfig$.MODULE$.CosmosDBFileStoreCollection(), ClassTag$.MODULE$.apply(String.class));
            if (option3.isDefined()) {
                ADLConnection$.MODULE$.markAdlFileStatus(cosmosDBConnection, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"/dbs/", "/colls/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{(String) config.get(CosmosDBConfig$.MODULE$.Database(), ClassTag$.MODULE$.apply(String.class)).get(), option3.get()})), str, (String) option2.get(), false, true);
            }
        }
        return savePartition;
    }

    public <D> Iterator<D> com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$savePartition(Iterator<D> iterator, Config config, int i, int i2, ClassTag<D> classTag) {
        return savePartition(new CosmosDBConnection(config), iterator, config, i, i2, classTag);
    }

    private <D> Iterator<D> savePartition(CosmosDBConnection cosmosDBConnection, Iterator<D> iterator, Config config, int i, int i2, ClassTag<D> classTag) {
        CosmosDBConnection cosmosDBConnection2 = new CosmosDBConnection(config);
        boolean z = new StringOps(Predef$.MODULE$.augmentString((String) config.getOrElse(CosmosDBConfig$.MODULE$.Upsert(), new CosmosDBSpark$$anonfun$2()))).toBoolean();
        int i3 = new StringOps(Predef$.MODULE$.augmentString((String) config.getOrElse(CosmosDBConfig$.MODULE$.WritingBatchSize(), new CosmosDBSpark$$anonfun$3()))).toInt();
        int i4 = new StringOps(Predef$.MODULE$.augmentString((String) config.getOrElse(CosmosDBConfig$.MODULE$.WritingBatchDelayMs(), new CosmosDBSpark$$anonfun$4()))).toInt();
        Option<String> option = config.get(CosmosDBConfig$.MODULE$.RootPropertyToSave(), ClassTag$.MODULE$.apply(String.class));
        boolean z2 = new StringOps(Predef$.MODULE$.augmentString((String) config.get(CosmosDBConfig$.MODULE$.BulkUpdate(), ClassTag$.MODULE$.apply(String.class)).getOrElse(new CosmosDBSpark$$anonfun$5()))).toBoolean();
        boolean z3 = new StringOps(Predef$.MODULE$.augmentString((String) config.get(CosmosDBConfig$.MODULE$.BulkImport(), ClassTag$.MODULE$.apply(String.class)).getOrElse(new CosmosDBSpark$$anonfun$6()))).toBoolean();
        int i5 = new StringOps(Predef$.MODULE$.augmentString((String) config.get(CosmosDBConfig$.MODULE$.ClientInitDelay(), ClassTag$.MODULE$.apply(String.class)).getOrElse(new CosmosDBSpark$$anonfun$7()))).toInt();
        Option<String> option2 = config.get(CosmosDBConfig$.MODULE$.PartitionKeyDefinition(), ClassTag$.MODULE$.apply(String.class));
        if ((i / i5) + (i % i5 > 0 ? 1 : 0) > 0) {
            TimeUnit.SECONDS.sleep(random().nextInt(r0));
        }
        lastUpsertSetting_$eq(new Some(BoxesRunTime.boxToBoolean(z)));
        lastWritingBatchSize_$eq(new Some(BoxesRunTime.boxToInteger(i3)));
        if (iterator.nonEmpty()) {
            if (z2) {
                logDebug(new CosmosDBSpark$$anonfun$savePartition$1());
                bulkUpdate(iterator, cosmosDBConnection2, i2, i3, option2, classTag, classTag);
            } else if (z3) {
                logDebug(new CosmosDBSpark$$anonfun$savePartition$2());
                bulkImport(iterator, cosmosDBConnection2, i2, i3, option, option2, z, classTag);
            } else {
                logDebug(new CosmosDBSpark$$anonfun$savePartition$3());
                importWithRxJava(iterator, cosmosDBConnection2, Predef$.MODULE$.int2Integer(i3), i4, option, z, classTag);
            }
        }
        return new ListBuffer().iterator();
    }

    public <D> void save(Dataset<D> dataset, ClassTag<D> classTag) {
        save(dataset, Config$.MODULE$.apply(dataset.sparkSession().sparkContext().getConf()), classTag);
    }

    public <D> void save(Dataset<D> dataset, Config config, ClassTag<D> classTag) {
        save(dataset.rdd(), config, classTag);
    }

    public void save(DataFrameWriter<?> dataFrameWriter) {
        dataFrameWriter.format(defaultSource()).save();
    }

    public void save(DataFrameWriter<?> dataFrameWriter, Config config) {
        dataFrameWriter.format(defaultSource()).options(config.asOptions()).save();
    }

    public DataFrameReader read(SparkSession sparkSession) {
        return sparkSession.read().format(defaultSource());
    }

    public DataFrameWriter<Row> write(Dataset<Row> dataset) {
        return dataset.write().format(defaultSource());
    }

    public JavaCosmosDBRDD load(JavaSparkContext javaSparkContext) {
        return builder().javaSparkContext(javaSparkContext).build().toJavaRDD();
    }

    public JavaCosmosDBRDD load(JavaSparkContext javaSparkContext, Config config) {
        return builder().javaSparkContext(javaSparkContext).config(config).build().toJavaRDD();
    }

    public void save(JavaRDD<Document> javaRDD) {
        save(javaRDD, Document.class);
    }

    public <D> void save(JavaRDD<D> javaRDD, Class<D> cls) {
        save(javaRDD.rdd(), ct$1(cls));
    }

    public void save(JavaRDD<Document> javaRDD, Config config) {
        save(javaRDD, config, Document.class);
    }

    public <D> void save(JavaRDD<D> javaRDD, Config config, Class<D> cls) {
        save(javaRDD.rdd(), config, ct$2(cls));
    }

    public CosmosDBSpark apply(SparkSession sparkSession, Config config) {
        return new CosmosDBSpark(sparkSession, config);
    }

    public Option<Tuple2<SparkSession, Config>> unapply(CosmosDBSpark cosmosDBSpark) {
        return cosmosDBSpark == null ? None$.MODULE$ : new Some(new Tuple2(cosmosDBSpark.sparkSession(), cosmosDBSpark.readConfig()));
    }

    private Object readResolve() {
        return MODULE$;
    }

    private final ClassTag ct$1(Class cls) {
        return ClassTag$.MODULE$.apply(cls);
    }

    private final ClassTag ct$2(Class cls) {
        return ClassTag$.MODULE$.apply(cls);
    }

    private CosmosDBSpark$() {
        MODULE$ = this;
        com$microsoft$azure$cosmosdb$spark$LoggingTrait$$log__$eq(null);
        this.defaultSource = DefaultSource.class.getCanonicalName();
        this.random = new Random(System.nanoTime());
    }
}
