package org.apache.spark.sql.connect.service;

import org.apache.spark.SparkEnv$;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.Expression;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connect.common.DataTypeProtoConverter$;
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter$;
import org.apache.spark.sql.connect.config.Connect$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.arrow.ArrowConverters;
import org.apache.spark.sql.execution.arrow.ArrowConverters$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.ThreadUtils$;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import org.sparkproject.connect.protobuf.ByteString;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Iterable$;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.util.Try;

/* compiled from: SparkConnectStreamHandler.scala */
/* loaded from: input_file:org/apache/spark/sql/connect/service/SparkConnectStreamHandler$.class */
/**
 * Singleton holder of helpers that turn a {@code Dataset<Row>} into {@link ExecutePlanResponse}
 * messages (Arrow batches, schema, metrics, observed metrics).
 *
 * <p>NOTE(review): this looks like decompiler output for a Scala {@code object}; several spots
 * below ({@code ??} types, {@code r0[i2] = 0}, the {@code $anonfun$...} methods) are decompiler
 * artifacts rather than hand-written Java. Confirm any behavioural question against the Scala
 * source.
 */
public final class SparkConnectStreamHandler$ {
    /** The single instance; assigned by the private constructor, which the static initializer runs. */
    public static SparkConnectStreamHandler$ MODULE$;

    static {
        new SparkConnectStreamHandler$();
    }

    /**
     * Builds a function that converts an iterator of {@link InternalRow}s into an iterator of
     * {@code (batch bytes, row count)} pairs.
     *
     * <p>All four arguments are forwarded unchanged to
     * {@code ArrowConverters.toBatchWithSchemaIterator}.
     *
     * @param structType schema handed to the Arrow converter
     * @param i the caller in this class passes the session's {@code arrowMaxRecordsPerBatch}
     * @param j the caller in this class passes 70% of the
     *     {@code CONNECT_GRPC_ARROW_MAX_BATCH_SIZE} setting (presumably a size cap per batch;
     *     confirm against {@code ArrowConverters})
     * @param str the caller in this class passes the session local time zone
     * @return a function whose result pairs every {@code byte[]} produced by the batch iterator
     *     with the boxed value of {@code rowCountInLastBatch()}
     */
    public Function1<Iterator<InternalRow>, Iterator<Tuple2<byte[], Object>>> rowToArrowConverter(StructType structType, int i, long j, String str) {
        return iterator -> {
            ArrowConverters.ArrowBatchWithSchemaIterator batchWithSchemaIterator = ArrowConverters$.MODULE$.toBatchWithSchemaIterator(iterator, structType, i, j, str);
            return batchWithSchemaIterator.map(bArr -> {
                // The row count is read from the iterator at the moment each element is mapped,
                // so it presumably describes the batch that was just produced.
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(bArr), BoxesRunTime.boxToLong(batchWithSchemaIterator.rowCountInLastBatch()));
            });
        };
    }

    /**
     * Executes the dataset's physical plan and streams the result to {@code streamObserver} as
     * Arrow batches, one {@link ExecutePlanResponse} per batch.
     *
     * <p>The work runs inside {@code SQLExecution.withNewExecutionId} under the name
     * {@code "collectArrow"}. A job is submitted over every partition; the calling thread then
     * walks the partitions in index order, blocking until each partition's batches have been
     * stored (or a job failure has been recorded) before sending them. If no batch was sent at
     * all, a single empty Arrow batch with row count 0 is sent instead.
     *
     * @param str value set as the session id on every response
     * @param dataset dataset whose executed plan is run
     * @param streamObserver receiver of the responses via {@code onNext}; this method does not
     *     call {@code onCompleted} or {@code onError} on it
     */
    public void processAsArrowBatches(String str, Dataset<Row> dataset, StreamObserver<ExecutePlanResponse> streamObserver) {
        SparkSession sparkSession = dataset.sparkSession();
        StructType schema = dataset.schema();
        int arrowMaxRecordsPerBatch = sparkSession.sessionState().conf().arrowMaxRecordsPerBatch();
        String sessionLocalTimeZone = sparkSession.sessionState().conf().sessionLocalTimeZone();
        // Use only 70% of the configured maximum Arrow batch size.
        // NOTE(review): presumably headroom for message overhead; the reason is not visible here.
        long unboxToLong = (long) (BoxesRunTime.unboxToLong(SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE())) * 0.7d);
        SQLExecution$.MODULE$.withNewExecutionId(dataset.queryExecution(), new Some("collectArrow"), () -> {
            Object[] objArr;
            RDD execute = dataset.queryExecution().executedPlan().execute();
            int numPartitions = execute.getNumPartitions();
            // Counts the batches sent to the observer (incremented in $anonfun$processAsArrowBatches$9).
            IntRef create = IntRef.create(0);
            if (numPartitions > 0) {
                RDD mapPartitionsInternal = execute.mapPartitionsInternal(MODULE$.rowToArrowConverter(schema, arrowMaxRecordsPerBatch, unboxToLong, sessionLocalTimeZone), execute.mapPartitionsInternal$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
                // Monitor shared by the result handler, the completion callback and the loop below.
                Object obj = new Object();
                // One slot per partition, filled by the result handler.
                // NOTE(review): the decompiler could not type this variable; the handler
                // ($anonfun$processAsArrowBatches$2) takes Tuple2[][], so each slot presumably
                // holds that partition's array of (bytes, row count) pairs.
                ?? r0 = new Tuple2[numPartitions];
                // Holds the job's failure, if any; starts out as None.
                ObjectRef create2 = ObjectRef.create(None$.MODULE$);
                // Submit a job over partitions [0, numPartitions): each partition's iterator is
                // materialised into an array, and the result handler stores that array in r0.
                sparkSession.sparkContext().submitJob(mapPartitionsInternal, iterator -> {
                    return (Tuple2[]) iterator.toArray(ClassTag$.MODULE$.apply(Tuple2.class));
                }, Seq$.MODULE$.range(BoxesRunTime.boxToInteger(0), BoxesRunTime.boxToInteger(numPartitions), Numeric$IntIsIntegral$.MODULE$), (obj2, tuple2Arr) -> {
                    $anonfun$processAsArrowBatches$2(obj, r0, BoxesRunTime.unboxToInt(obj2), tuple2Arr);
                    return BoxedUnit.UNIT;
                }, () -> {
                    return () -> {
                    };
                }).onComplete(r6 -> {
                    // On failure, records the throwable in create2 and notifies the waiter.
                    $anonfun$processAsArrowBatches$6(obj, create2, r6);
                    return BoxedUnit.UNIT;
                }, ThreadUtils$.MODULE$.sameThread());
                // Send partitions strictly in index order, waiting for each one in turn.
                int i = 0;
                while (true) {
                    int i2 = i;
                    if (i2 >= numPartitions) {
                        break;
                    }
                    // NOTE(review): r02 is a decompiler temporary; the lock taken is on obj.
                    ?? r02 = obj;
                    synchronized (r02) {
                        r02 = r0[i2];
                        Object[] objArr2 = r02;
                        // Block until this partition's slot is filled or a failure was recorded.
                        while (objArr2 == null && ((Option) create2.elem).isEmpty()) {
                            obj.wait();
                            objArr2 = r0[i2];
                        }
                        // NOTE(review): decompiler artifact; presumably clears the slot (null) so
                        // the partition's batches are no longer referenced from the array.
                        r0[i2] = 0;
                        // Rethrow the recorded failure, if there is one, before sending anything
                        // for this partition.
                        ((Option) create2.elem).foreach(th -> {
                            throw th;
                        });
                        objArr = objArr2;
                    }
                    // Outside the lock: send every batch of this partition to the observer.
                    new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(objArr)).foreach(tuple2 -> {
                        $anonfun$processAsArrowBatches$9(str, streamObserver, create, tuple2);
                        return BoxedUnit.UNIT;
                    });
                    i = i2 + 1;
                }
            }
            // Nothing was sent (no partitions, or no batches): send one empty batch built from
            // the schema, with row count 0.
            if (create.elem == 0) {
                byte[] createEmptyArrowBatch = ArrowConverters$.MODULE$.createEmptyArrowBatch(schema, sessionLocalTimeZone);
                ExecutePlanResponse.Builder sessionId = ExecutePlanResponse.newBuilder().setSessionId(str);
                sessionId.setArrowBatch(ExecutePlanResponse.ArrowBatch.newBuilder().setRowCount(0L).setData(ByteString.copyFrom(createEmptyArrowBatch)).build());
                streamObserver.onNext(sessionId.build());
            }
        });
    }

    /**
     * Builds a response carrying the given schema.
     *
     * @param str value set as the session id
     * @param structType schema, converted with {@code DataTypeProtoConverter.toConnectProtoType}
     * @return the built response; nothing is sent by this method despite its name
     */
    public ExecutePlanResponse sendSchemaToResponse(String str, StructType structType) {
        return ExecutePlanResponse.newBuilder().setSessionId(str).setSchema(DataTypeProtoConverter$.MODULE$.toConnectProtoType(structType)).build();
    }

    /**
     * Builds a response carrying metrics produced by {@code MetricGenerator.buildMetrics} from the
     * dataset's executed plan.
     *
     * @param str value set as the session id
     * @param dataset dataset whose executed plan is passed to the metric generator
     * @return the built response; nothing is sent by this method despite its name
     */
    public ExecutePlanResponse sendMetricsToResponse(String str, Dataset<Row> dataset) {
        return ExecutePlanResponse.newBuilder().setSessionId(str).setMetrics(MetricGenerator$.MODULE$.buildMetrics(dataset.queryExecution().executedPlan())).build();
    }

    /**
     * Builds a response carrying the dataset's observed metrics.
     *
     * <p>Each {@code (name, row)} entry of {@code queryExecution().observedMetrics()} becomes one
     * {@code ObservedMetrics} message: the name is copied and every column of the row (indices
     * {@code 0 until row.length()}) is converted to a literal proto.
     *
     * @param str value set as the session id
     * @param dataset dataset whose observed metrics are read
     * @return the built response; nothing is sent by this method despite its name
     * @throws MatchError if an entry of the observed metrics is {@code null}
     */
    public ExecutePlanResponse sendObservedMetricsToResponse(String str, Dataset<Row> dataset) {
        return ExecutePlanResponse.newBuilder().setSessionId(str).addAllObservedMetrics((Iterable) JavaConverters$.MODULE$.asJavaIterableConverter((Iterable) dataset.queryExecution().observedMetrics().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str2 = (String) tuple2._1();
            Row row = (Row) tuple2._2();
            return ExecutePlanResponse.ObservedMetrics.newBuilder().setName(str2).addAllValues((Iterable) JavaConverters$.MODULE$.seqAsJavaListConverter((IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), row.length()).map(obj -> {
                return $anonfun$sendObservedMetricsToResponse$2(row, BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom())).asJava()).build();
        }, Iterable$.MODULE$.canBuildFrom())).asJava()).build();
    }

    /**
     * Result handler used by {@code processAsArrowBatches}: stores one partition's batches in
     * slot {@code i} and wakes a thread waiting on {@code obj}, both while holding {@code obj}'s
     * monitor.
     */
    /* JADX WARN: Multi-variable type inference failed */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$2(Object obj, Tuple2[][] tuple2Arr, int i, Tuple2[] tuple2Arr2) {
        synchronized (obj) {
            tuple2Arr[i] = tuple2Arr2;
            obj.notify();
        }
    }

    /**
     * Records a failure: stores {@code Some(th)} in {@code objectRef} and wakes a thread waiting
     * on {@code obj}, both while holding {@code obj}'s monitor.
     */
    /* JADX WARN: Multi-variable type inference failed */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$7(Object obj, ObjectRef objectRef, Throwable th) {
        synchronized (obj) {
            objectRef.elem = new Some(th);
            obj.notify();
        }
    }

    /**
     * Completion callback used by {@code processAsArrowBatches}: if the {@link Try} is a failure,
     * passes its throwable to {@code $anonfun$processAsArrowBatches$7}; a success does nothing.
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$6(Object obj, ObjectRef objectRef, Try r6) {
        r6.failed().foreach(th -> {
            $anonfun$processAsArrowBatches$7(obj, objectRef, th);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Sends one {@code (batch bytes, row count)} pair to the observer as an Arrow-batch response
     * with session id {@code str}, then increments the sent-batch counter {@code intRef}.
     *
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$9(String str, StreamObserver streamObserver, IntRef intRef, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        byte[] bArr = (byte[]) tuple2._1();
        long _2$mcJ$sp = tuple2._2$mcJ$sp();
        ExecutePlanResponse.Builder sessionId = ExecutePlanResponse.newBuilder().setSessionId(str);
        sessionId.setArrowBatch(ExecutePlanResponse.ArrowBatch.newBuilder().setRowCount(_2$mcJ$sp).setData(ByteString.copyFrom(bArr)).build());
        streamObserver.onNext(sessionId.build());
        intRef.elem++;
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    /** Converts column {@code i} of {@code row} to a literal proto via {@code LiteralValueProtoConverter}. */
    public static final /* synthetic */ Expression.Literal $anonfun$sendObservedMetricsToResponse$2(Row row, int i) {
        return LiteralValueProtoConverter$.MODULE$.toLiteralProto(row.apply(i));
    }

    /** Private so the only instance is the one created by the static initializer; publishes it as {@code MODULE$}. */
    private SparkConnectStreamHandler$() {
        MODULE$ = this;
    }
}
