/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.comet.CometExecIterator;
import org.apache.comet.serde.OperatorOuterClass;
import org.apache.comet.serde.PartitioningOuterClass;
import org.apache.comet.serde.QueryPlanSerde$;
import org.apache.spark.Partition;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriteProcessor;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning;
import org.apache.spark.sql.catalyst.plans.physical.Partitioning;
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition$;
import org.apache.spark.sql.comet.CometExec$;
import org.apache.spark.sql.comet.CometMetricNode;
import org.apache.spark.sql.comet.CometMetricNode$;
import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter;
import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter$;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import scala.Array$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\u0005Ud\u0001\u0002\u0006\f\u0001iA\u0001B\n\u0001\u0003\u0002\u0003\u0006Ia\n\u0005\tc\u0001\u0011\t\u0011)A\u0005e!Aa\t\u0001B\u0001B\u0003%q\tC\u0003Z\u0001\u0011\u0005!\fC\u0004a\u0001\t\u0007I\u0011B1\t\r\u0019\u0004\u0001\u0015!\u0003c\u0011\u00159\u0007\u0001\"\u0015i\u0011\u0015\u0011\b\u0001\"\u0011t\u0011\u001d\t)\u0005\u0001C\u0001\u0003\u000f\u0012!dQ8nKR\u001c\u0006.\u001e4gY\u0016<&/\u001b;f!J|7-Z:t_JT!\u0001D\u0007\u0002\u000fMDWO\u001a4mK*\u0011abD\u0001\nKb,7-\u001e;j_:T!\u0001E\t\u0002\u000b\r|W.\u001a;\u000b\u0005I\u0019\u0012aA:rY*\u0011A#F\u0001\u0006gB\f'o\u001b\u0006\u0003-]\ta!\u00199bG\",'\"\u0001\r\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001Y\u0002\u0005\u0005\u0002\u001d=5\tQD\u0003\u0002\r'%\u0011q$\b\u0002\u0016'\",hM\u001a7f/JLG/\u001a)s_\u000e,7o]8s!\t\tC%D\u0001#\u0015\t\u0019s\"A\u0003tQ&l7/\u0003\u0002&E\tq2\u000b[5n\u0007>lW\r^*ik\u001a4G.Z,sSR,\u0007K]8dKN\u001cxN]\u0001\u0013_V$\b/\u001e;QCJ$\u0018\u000e^5p]&tw\r\u0005\u0002)_5\t\u0011F\u0003\u0002+W\u0005A\u0001\u000f[=tS\u000e\fGN\u0003\u0002-[\u0005)\u0001\u000f\\1og*\u0011a&E\u0001\tG\u0006$\u0018\r\\=ti&\u0011\u0001'\u000b\u0002\r!\u0006\u0014H/\u001b;j_:LgnZ\u0001\u0011_V$\b/\u001e;BiR\u0014\u0018NY;uKN\u00042aM\u001fA\u001d\t!$H\u0004\u00026q5\taG\u0003\u000283\u00051AH]8pizJ\u0011!O\u0001\u0006g\u000e\fG.Y\u0005\u0003wq\nq\u0001]1dW\u0006<WMC\u0001:\u0013\tqtHA\u0002TKFT!a\u000f\u001f\u0011\u0005\u0005#U\"\u0001\"\u000b\u0005\rk\u0013aC3yaJ,7o]5p]NL!!\u0012\"\u0003\u0013\u0005#HO]5ckR,\u0017aB7fiJL7m\u001d\t\u0005\u00112{%K\u0004\u0002J\u0015B\u0011Q\u0007P\u0005\u0003\u0017r\na\u0001\u0015:fI\u00164\u0017BA'O\u0005\ri\u0015\r\u001d\u0006\u0003\u0017r\u0002\"\u0001\u0013)\n\u0005Es%AB*ue&tw\r\u0005\u0002T/6\tAK\u0003\u0002V-\u00061Q.\u001a;sS\u000eT!AD\t\n\u0005a#&!C*R\u00196+GO]5d\u0003\u0019a\u0014N\\5u}Q!1,\u00180`!\ta\u0006!D\u0001\f\u0011\u00151C\u00011\u0001(\u0011\u0015\tD\u00011\u00013\u0011\u00151E\u00011\u0001H\u00035ye
IR*F)~cUIT$U\u0011V\t!\r\u0005\u0002dI6\tA(\u0003\u0002fy\t\u0019\u0011J\u001c;\u0002\u001d=3eiU#U?2+ej\u0012+IA\u0005)2M]3bi\u0016lU\r\u001e:jGN\u0014V\r]8si\u0016\u0014HCA5m!\ta\".\u0003\u0002l;\tY2\u000b[;gM2,wK]5uK6+GO]5dgJ+\u0007o\u001c:uKJDQ!\\\u0004A\u00029\fqaY8oi\u0016DH\u000f\u0005\u0002pa6\t1#\u0003\u0002r'\tYA+Y:l\u0007>tG/\u001a=u\u0003\u00159(/\u001b;f))!(0a\u0006\u00026\u0005}\u00121\t\t\u0003kbl\u0011A\u001e\u0006\u0003oN\t\u0011b]2iK\u0012,H.\u001a:\n\u0005e4(!C'baN#\u0018\r^;t\u0011\u0015Y\b\u00021\u0001}\u0003\u0019Ig\u000e];ugB\u001aQ0!\u0002\u0011\tMr\u0018\u0011A\u0005\u0003\u007f~\u0012\u0001\"\u0013;fe\u0006$xN\u001d\t\u0005\u0003\u0007\t)\u0001\u0004\u0001\u0005\u0017\u0005\u001d!0!A\u0001\u0002\u000b\u0005\u0011\u0011\u0002\u0002\u0004?\u0012*\u0014\u0003BA\u0006\u0003#\u00012aYA\u0007\u0013\r\ty\u0001\u0010\u0002\b\u001d>$\b.\u001b8h!\r\u0019\u00171C\u0005\u0004\u0003+a$aA!os\"9\u0011\u0011\u0004\u0005A\u0002\u0005m\u0011a\u00013faBB\u0011QDA\u0013\u0003W\t\t\u0004E\u0005p\u0003?\t\u0019#!\u000b\u00020%\u0019\u0011\u0011E\n\u0003#MCWO\u001a4mK\u0012+\u0007/\u001a8eK:\u001c\u0017\u0010\u0005\u0003\u0002\u0004\u0005\u0015B\u0001DA\u0014\u0003/\t\t\u0011!A\u0003\u0002\u0005%!aA0%mA!\u00111AA\u0016\t1\ti#a\u0006\u0002\u0002\u0003\u0005)\u0011AA\u0005\u0005\ryFe\u000e\t\u0005\u0003\u0007\t\t\u0004\u0002\u0007\u00024\u0005]\u0011\u0011!A\u0001\u0006\u0003\tIAA\u0002`IaBq!a\u000e\t\u0001\u0004\tI$A\u0003nCBLE\rE\u0002d\u0003wI1!!\u0010=\u0005\u0011auN\\4\t\r\u0005\u0005\u0003\u00021\u0001c\u0003!i\u0017\r]%oI\u0016D\b\"B7\t\u0001\u0004q\u0017!D4fi:\u000bG/\u001b<f!2\fg\u000e\u0006\u0004\u0002J\u00055\u0014\u0011\u000f\t\u0005\u0003\u0017\n9G\u0004\u0003\u0002N\u0005\u0005d\u0002BA(\u00037rA!!\u0015\u0002Z9!\u00111KA,\u001d\r)\u0014QK\u0005\u00021%\u0011acF\u0005\u0003!UIA!!\u0018\u0002`\u0005)1/\u001a:eK*\u0011\u0001#F\u0005\u0005\u0003G\n)'\u0001\nPa\u0016\u0014\u0018\r^8s\u001fV$XM]\"mCN\u001c(\u0002BA/\u0003?JA!!\u001b\u0002l\tAq\n]3sCR|'O\u00
03\u0003\u0002d\u0005\u0015\u0004BBA8\u0013\u0001\u0007q*\u0001\u0005eCR\fg)\u001b7f\u0011\u0019\t\u0019(\u0003a\u0001\u001f\u0006I\u0011N\u001c3fq\u001aKG.\u001a")
/*
 * Shuffle write processor that hands the map-side shuffle write to a native
 * Comet plan (a Scan feeding a ShuffleWriter) and then commits the resulting
 * data file through Spark's IndexShuffleBlockResolver.
 *
 * NOTE(review): this file is decompiler output of a Scala class; constructs
 * such as `new .colon.colon(...)` and `$this` are decompiler artifacts
 * (presumably Scala's `::` list cons and the enclosing instance) and are
 * intentionally left as emitted.
 */
public class CometShuffleWriteProcessor
extends ShuffleWriteProcessor
implements ShimCometShuffleWriteProcessor {
    /** Partitioning of the shuffle output; only hash and single partitioning are translated by {@link #getNativePlan}. */
    private final Partitioning outputPartitioning;
    /** Output attributes; used for the scan field types and for binding the hash partitioning expressions. */
    private final Seq<Attribute> outputAttributes;
    /** SQL metrics keyed by name; looked up by the shuffle-write metric names and by "dataSize". */
    private final Map<String, SQLMetric> metrics;
    /** Size in bytes of one entry of the native index file (each entry is read as one long). Set to 8 by the constructor. */
    private final int OFFSET_LENGTH;

    /**
     * RDD-based entry point; forwards unchanged to the shim trait implementation.
     */
    @Override
    public MapStatus write(RDD<?> rdd, ShuffleDependency<?, ?, ?> dep, long mapId, TaskContext context, Partition partition) {
        return ShimCometShuffleWriteProcessor.write$(this, rdd, dep, mapId, context, partition);
    }

    /** Accessor for the index entry width in bytes. */
    private int OFFSET_LENGTH() {
        return this.OFFSET_LENGTH;
    }

    /**
     * Wraps the task's shuffle write metrics in a {@link SQLShuffleWriteMetricsReporter}
     * that is also given this processor's SQL metrics.
     *
     * @param context the running task's context
     * @return the reporter used by {@link #write(Iterator, ShuffleDependency, long, int, TaskContext)}
     */
    public ShuffleWriteMetricsReporter createMetricsReporter(TaskContext context) {
        return new SQLShuffleWriteMetricsReporter((ShuffleWriteMetricsReporter)context.taskMetrics().shuffleWriteMetrics(), this.metrics);
    }

    /**
     * Runs the native shuffle write for one map task and commits its output.
     *
     * <p>Steps: derive temporary data/index file names from the resolver's
     * final file names, build the native plan targeting those temp files,
     * drain the native iterator (the plan performs the write), compute the
     * per-partition lengths from the temp index file, remove that temp index
     * file, then commit the temp data file through the block resolver.
     *
     * @param inputs   iterator of pairs; only the second element of each pair is fed to the native plan
     * @param dep      the shuffle dependency (its shuffle id selects the output files)
     * @param mapId    id of this map task's output
     * @param mapIndex not referenced by this implementation
     * @param context  the running task's context, used for metrics reporting
     * @return the map status built from the block manager's shuffle server id and the partition lengths
     */
    @Override
    public MapStatus write(Iterator<Object> inputs, ShuffleDependency<?, ?, ?> dep, long mapId, int mapIndex, TaskContext context) {
        ShuffleWriteMetricsReporter metricsReporter = this.createMetricsReporter(context);
        IndexShuffleBlockResolver shuffleBlockResolver = (IndexShuffleBlockResolver)SparkEnv$.MODULE$.get().shuffleManager().shuffleBlockResolver();
        File dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId(), mapId);
        File indexFile = shuffleBlockResolver.getIndexFile(dep.shuffleId(), mapId, shuffleBlockResolver.getIndexFile$default$3());
        // The native writer targets temp files next to the final ones.
        String tempDataFilename = dataFile.getPath().replace(".data", ".data.tmp");
        String tempIndexFilename = indexFile.getPath().replace(".index", ".index.tmp");
        Path tempDataFilePath = Paths.get(tempDataFilename, new String[0]);
        Path tempIndexFilePath = Paths.get(tempIndexFilename, new String[0]);
        OperatorOuterClass.Operator nativePlan = this.getNativePlan(tempDataFilename, tempIndexFilename);
        // Map the native metric names onto the corresponding SQL metrics.
        Map nativeSQLMetrics = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"output_rows"), this.metrics.apply((Object)SQLShuffleWriteMetricsReporter$.MODULE$.SHUFFLE_RECORDS_WRITTEN())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"data_size"), this.metrics.apply((Object)"dataSize")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"elapsed_compute"), this.metrics.apply((Object)SQLShuffleWriteMetricsReporter$.MODULE$.SHUFFLE_WRITE_TIME()))}));
        CometMetricNode nativeMetrics = CometMetricNode$.MODULE$.apply((Map<String, SQLMetric>)nativeSQLMetrics);
        // Keep only the second element of each input pair.
        Iterator newInputs = inputs.map((Function1 & Serializable)x$7 -> x$7._2());
        CometExecIterator cometIter = CometExec$.MODULE$.getCometIterator((Seq<Iterator<ColumnarBatch>>)new .colon.colon((Object)newInputs, (List)Nil$.MODULE$), this.outputAttributes.length(), nativePlan, nativeMetrics);
        // Drain the iterator; its results are discarded, only the files it writes matter.
        while (cometIter.hasNext()) {
            cometIter.next();
        }
        // The index file is a sequence of OFFSET_LENGTH-byte offsets; the first entry is
        // skipped and each remaining entry yields (offset - previous offset).
        LongRef offset = LongRef.create((long)0L);
        long[] partitionLengths = (long[])ArrayOps$.MODULE$.grouped$extension(Predef$.MODULE$.byteArrayOps(Files.readAllBytes(tempIndexFilePath)), this.OFFSET_LENGTH()).drop(1).map((Function1 & Serializable)indexBytes -> BoxesRunTime.boxToLong((long)CometShuffleWriteProcessor.$anonfun$write$2(offset, indexBytes))).toArray((ClassTag)ClassTag$.MODULE$.Long());
        // The temp index file has been fully consumed (toArray above materialises the
        // lengths) and is not passed to the commit call below, so remove it here;
        // otherwise every map task leaves a stray *.index.tmp file behind.
        Files.delete(tempIndexFilePath);
        metricsReporter.incBytesWritten(Files.size(tempDataFilePath));
        shuffleBlockResolver.writeMetadataFileAndCommit(dep.shuffleId(), mapId, partitionLengths, (long[])Array$.MODULE$.empty((ClassTag)ClassTag$.MODULE$.Long()), tempDataFilePath.toFile());
        return MapStatus$.MODULE$.apply(SparkEnv$.MODULE$.get().blockManager().shuffleServerId(), partitionLengths, mapId);
    }

    /**
     * Builds the native operator tree: a ShuffleWriter operator with a single Scan child.
     *
     * @param dataFile  path the native shuffle writer should write data to
     * @param indexFile path the native shuffle writer should write the index to
     * @return the serialized plan root (the shuffle writer operator)
     * @throws UnsupportedOperationException if not every output attribute type
     *         could be serialized, if not every hash partitioning expression could be
     *         converted, or if the partitioning is neither hash nor single partition
     */
    public OperatorOuterClass.Operator getNativePlan(String dataFile, String indexFile) {
        OperatorOuterClass.Scan.Builder scanBuilder = OperatorOuterClass.Scan.newBuilder();
        OperatorOuterClass.Operator.Builder opBuilder = OperatorOuterClass.Operator.newBuilder();
        // flatten drops attributes whose type yields no serialized form (presumably an
        // empty Option from serializeDataType), so a shorter result means unsupported types.
        Seq scanTypes = (Seq)this.outputAttributes.flatten((Function1 & Serializable)attr -> QueryPlanSerde$.MODULE$.serializeDataType(attr.dataType()));
        if (scanTypes.length() == this.outputAttributes.length()) {
            scanBuilder.addAllFields((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)scanTypes).asJava());
            OperatorOuterClass.ShuffleWriter.Builder shuffleWriterBuilder = OperatorOuterClass.ShuffleWriter.newBuilder();
            shuffleWriterBuilder.setOutputDataFile(dataFile);
            shuffleWriterBuilder.setOutputIndexFile(indexFile);
            Partitioning partitioning = this.outputPartitioning;
            if (partitioning instanceof HashPartitioning) {
                HashPartitioning hashPartitioning = (HashPartitioning)this.outputPartitioning;
                PartitioningOuterClass.HashRepartition.Builder partitioning2 = PartitioningOuterClass.HashRepartition.newBuilder();
                partitioning2.setNumPartitions(this.outputPartitioning.numPartitions());
                // Same length check as above: every expression must convert to a proto expression.
                Seq partitionExprs = (Seq)hashPartitioning.expressions().flatMap((Function1 & Serializable)e -> QueryPlanSerde$.MODULE$.exprToProto((Expression)e, $this.outputAttributes, QueryPlanSerde$.MODULE$.exprToProto$default$3()));
                if (partitionExprs.length() != hashPartitioning.expressions().length()) {
                    throw new UnsupportedOperationException("Partitioning " + hashPartitioning + " is not supported.");
                }
                partitioning2.addAllHashExpression((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)partitionExprs).asJava());
                PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
                shuffleWriterBuilder.setPartitioning(partitioningBuilder.setHashPartition(partitioning2).build());
            } else if (SinglePartition$.MODULE$.equals(partitioning)) {
                PartitioningOuterClass.SinglePartition.Builder partitioning3 = PartitioningOuterClass.SinglePartition.newBuilder();
                PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
                shuffleWriterBuilder.setPartitioning(partitioningBuilder.setSinglePartition(partitioning3).build());
            } else {
                throw new UnsupportedOperationException("Partitioning " + this.outputPartitioning + " is not supported.");
            }
            // Root = shuffle writer, with the scan as its only child.
            OperatorOuterClass.Operator.Builder shuffleWriterOpBuilder = OperatorOuterClass.Operator.newBuilder();
            return shuffleWriterOpBuilder.setShuffleWriter(shuffleWriterBuilder).addChildren(opBuilder.setScan(scanBuilder).build()).build();
        }
        throw new UnsupportedOperationException(this.outputAttributes + " contains unsupported data types for CometShuffleExchangeExec.");
    }

    /**
     * Synthetic lambda body used by {@code write}: decodes one index entry and
     * converts it into a partition length.
     *
     * @param offset$1   holder of the previously seen offset; updated to the decoded offset
     * @param indexBytes one index entry, read as a little-endian long
     * @return decoded offset minus the previous offset
     */
    public static final /* synthetic */ long $anonfun$write$2(LongRef offset$1, byte[] indexBytes) {
        long partitionOffset = ByteBuffer.wrap(indexBytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
        long partitionLength = partitionOffset - offset$1.elem;
        offset$1.elem = partitionOffset;
        return partitionLength;
    }

    /**
     * @param outputPartitioning partitioning of the shuffle output
     * @param outputAttributes   attributes of the rows being shuffled
     * @param metrics            SQL metrics keyed by name
     */
    public CometShuffleWriteProcessor(Partitioning outputPartitioning, Seq<Attribute> outputAttributes, Map<String, SQLMetric> metrics) {
        this.outputPartitioning = outputPartitioning;
        this.outputAttributes = outputAttributes;
        this.metrics = metrics;
        ShimCometShuffleWriteProcessor.$init$(this);
        this.OFFSET_LENGTH = 8;
    }
}

