package org.apache.spark.sql.cosmosdb.util;

import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.spark.AsyncCosmosDBConnection;
import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter$;
import cosmosdb_connector_shaded.rx.Observable;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.immutable.Nil$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;

/* JADX INFO: Add missing generic type declarations: [D] */
/* compiled from: StreamingUtils.scala */
/* loaded from: input_file:org/apache/spark/sql/cosmosdb/util/StreamingWriteTask$$anonfun$importStreamingData$1.class */
/**
 * Function object invoked once per element: it converts the element (which must
 * be an {@link InternalRow}) into a Cosmos DB {@link Document}, issues a create
 * or upsert request for it through the asynchronous connection, and waits for the
 * accumulated requests each time the running counter reaches a multiple of
 * {@code writingBatchSize$1}.
 *
 * <p>The holders {@code observables$1}, {@code createDocumentObs$1} and
 * {@code batchSize$1} are handed in by the creator of this object and are
 * mutated in place, so their contents are shared with whoever else holds them.
 *
 * <p>NOTE(review): the generated class name suggests this is the closure body of
 * {@code StreamingWriteTask.importStreamingData}; confirm against the Scala source.
 *
 * @param <D> static type of the elements passed to {@code apply}
 */
public final class StreamingWriteTask$$anonfun$importStreamingData$1<D> extends AbstractFunction1<D, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    /** Schema passed to the row-to-JSON converter. */
    private final StructType schema$1;
    /** When {@code true} documents are upserted, otherwise created. */
    private final boolean upsert$1;
    /** Pending requests are awaited whenever the counter is a multiple of this value. */
    private final int writingBatchSize$1;
    /** Pause after each awaited batch, in milliseconds; values {@code <= 0} disable the pause. */
    private final int writingBatchDelayMs$1;
    /** Connection used to issue the create/upsert requests. */
    private final AsyncCosmosDBConnection asyncConnection$1;
    /** Holder whose {@code elem} is the {@link ArrayList} of not-yet-awaited observables. */
    private final ObjectRef observables$1;
    /** Holder whose {@code elem} is overwritten with the observable of the latest request. */
    private final ObjectRef createDocumentObs$1;
    /** Holder whose {@code elem} counts the elements added since the last awaited batch. */
    private final IntRef batchSize$1;

    /**
     * Captures the values and shared holders used by {@code apply}.
     *
     * @param streamingWriteTask     enclosing task instance; accepted but not stored
     * @param schema                 schema used when converting rows to JSON
     * @param upsert                 {@code true} to upsert, {@code false} to create
     * @param writingBatchSize       number of elements between waits
     * @param writingBatchDelayMs    pause after each wait, in milliseconds
     * @param asyncCosmosDBConnection connection that issues the requests
     * @param observables            holder of the list of pending observables
     * @param createDocumentObs      holder of the most recent request observable
     * @param batchSize              holder of the running element counter
     */
    public StreamingWriteTask$$anonfun$importStreamingData$1(StreamingWriteTask streamingWriteTask, StructType schema, boolean upsert, int writingBatchSize, int writingBatchDelayMs, AsyncCosmosDBConnection asyncCosmosDBConnection, ObjectRef observables, ObjectRef createDocumentObs, IntRef batchSize) {
        this.schema$1 = schema;
        this.upsert$1 = upsert;
        this.writingBatchSize$1 = writingBatchSize;
        this.writingBatchDelayMs$1 = writingBatchDelayMs;
        this.asyncConnection$1 = asyncCosmosDBConnection;
        this.observables$1 = observables;
        this.createDocumentObs$1 = createDocumentObs;
        this.batchSize$1 = batchSize;
    }

    /**
     * Converts {@code d} to a document, issues the write request and, when the
     * counter reaches a multiple of the batch size, waits for the pending requests.
     *
     * @param d the element to write; must be an {@link InternalRow}
     * @throws IllegalStateException if {@code d} is not an {@link InternalRow}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public final void apply(D d) {
        if (!(d instanceof InternalRow)) {
            throw new IllegalStateException("InternalRow expected from structured stream");
        }
        InternalRow row = (InternalRow) d;
        String json = CosmosDBRowConverter$.MODULE$.internalRowToJSONObject(row, this.schema$1).toString();
        Document document = new Document(json);

        if (this.upsert$1) {
            this.createDocumentObs$1.elem = this.asyncConnection$1.upsertDocument(document, null);
        } else {
            this.createDocumentObs$1.elem = this.asyncConnection$1.createDocument(document, null);
        }

        // The holders are raw in this decompiled view, hence the raw casts.
        ArrayList pending = (ArrayList) this.observables$1.elem;
        pending.add((Observable) this.createDocumentObs$1.elem);
        this.batchSize$1.elem++;

        if (this.batchSize$1.elem % this.writingBatchSize$1 == 0) {
            awaitPendingWrites(pending);
        }
    }

    /**
     * Waits on the merged pending observables, optionally pauses, then empties
     * the list and resets the counter to zero.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void awaitPendingWrites(ArrayList pending) {
        // presumably blocks until every merged request has completed (RxJava toBlocking().last())
        Observable.merge(pending).toBlocking().last();
        if (this.writingBatchDelayMs$1 > 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(this.writingBatchDelayMs$1);
            } catch (InterruptedException interrupted) {
                // The Scala original has no checked exceptions, so the interruption
                // reaches the caller as-is; re-raise the same exception object to keep that.
                throw StreamingWriteTask$$anonfun$importStreamingData$1.<RuntimeException>rethrowUnchecked(interrupted);
            }
        }
        pending.clear();
        this.batchSize$1.elem = 0;
    }

    /**
     * Re-throws {@code failure} unchanged without requiring a {@code throws}
     * clause at the call site. The return type only exists so callers can write
     * {@code throw rethrowUnchecked(e)}; the method never returns normally.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable failure) throws E {
        throw (E) failure;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* renamed from: apply, reason: collision with other method in class */
    /**
     * Untyped entry point: forwards {@code obj} to the typed {@code apply} and
     * returns {@link BoxedUnit#UNIT}.
     */
    @SuppressWarnings("unchecked")
    public final /* bridge */ /* synthetic */ Object m1358apply(Object obj) {
        // Cast to the element type D; the typed apply performs the InternalRow check.
        apply((D) obj);
        return BoxedUnit.UNIT;
    }
}
