/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.comet.CometConf$;
import org.apache.comet.CometExecIterator;
import org.apache.comet.serde.OperatorOuterClass;
import org.apache.comet.serde.PartitioningOuterClass;
import org.apache.comet.serde.QueryPlanSerde$;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning;
import org.apache.spark.sql.catalyst.plans.physical.Partitioning;
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition$;
import org.apache.spark.sql.comet.CometExec$;
import org.apache.spark.sql.comet.CometMetricNode;
import org.apache.spark.sql.comet.CometMetricNode$;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.metric.SQLMetric$;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import scala.Array$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Product2;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005=f\u0001\u0002\f\u0018\u0001\u0019B\u0001B\u0010\u0001\u0003\u0002\u0003\u0006Ia\u0010\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AA\f\u0001B\u0001B\u0003%Q\f\u0003\u0005p\u0001\t\u0005\t\u0015!\u0003q\u0011!\u0019\bA!A!\u0002\u0013\u0001\b\u0002\u0003;\u0001\u0005\u0003\u0005\u000b\u0011B;\t\u0011a\u0004!\u0011!Q\u0001\neD\u0001\" \u0001\u0003\u0002\u0003\u0006IA \u0005\b\u0003\u0007\u0001A\u0011AA\u0003\u0011%\tY\u0002\u0001b\u0001\n\u0013\ti\u0002C\u0004\u0002 \u0001\u0001\u000b\u0011\u00029\t\u0017\u0005\u0005\u0002\u00011AA\u0002\u0013\u0005\u00111\u0005\u0005\f\u0003W\u0001\u0001\u0019!a\u0001\n\u0003\ti\u0003C\u0006\u0002:\u0001\u0001\r\u0011!Q!\n\u0005\u0015\u0002bCA\u001e\u0001\u0001\u0007\t\u0019!C\u0001\u0003{A1\"a\u0013\u0001\u0001\u0004\u0005\r\u0011\"\u0001\u0002N!Y\u0011\u0011\u000b\u0001A\u0002\u0003\u0005\u000b\u0015BA \u0011\u001d\t\u0019\u0006\u0001C!\u0003+Bq!a\u001a\u0001\t\u0013\tI\u0007C\u0004\u0002\u0018\u0002!\t%!'\t\u000f\u0005-\u0006\u0001\"\u0011\u0002.\nA2i\\7fi:\u000bG/\u001b<f'\",hM\u001a7f/JLG/\u001a:\u000b\u0005aI\u0012aB:ik\u001a4G.\u001a\u0006\u00035m\t\u0011\"\u001a=fGV$\u0018n\u001c8\u000b\u0005qi\u0012!B2p[\u0016$(B\u0001\u0010 \u0003\r\u0019\u0018\u000f\u001c\u0006\u0003A\u0005\nQa\u001d9be.T!AI\u0012\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005!\u0013aA8sO\u000e\u0001QcA\u00140yM\u0011\u0001\u0001\u000b\t\u0005S-j3(D\u0001+\u0015\tAr$\u0003\u0002-U\ti1\u000b[;gM2,wK]5uKJ\u0004\"AL\u0018\r\u0001\u0011)\u0001\u0007\u0001b\u0001c\t\t1*\u0005\u00023qA\u00111GN\u0007\u0002i)\tQ'A\u0003tG\u0006d\u0017-\u0003\u00028i\t9aj\u001c;iS:<\u0007CA\u001a:\u0013\tQDGA\u0002B]f\u0004\"A\f\u001f\u0005\u000bu\u0002!\u0019A\u0019\u0003\u0003Y\u000b!c\\;uaV$\b+\u0019:uSRLwN\\5oOB\u0011\u0001iR\u0007\u0002\u0003*\u0011!iQ\u0001\ta\"L8/[2bY*\u0011A)R\u0001\u0006a2\fgn\u001d\u0006\u0003\rv\t\u0001bY1uC2L8\u000f^\u0005\u0003\u0011\u0006\u0013A\u0002U1si&$\u0018n\u001c8j]\u001e\f\u0001c\\;uaV$\u0018\t\u001e;sS\n,H/Z:\u0011\u0007-\u001bfK\u0004\u0002M#:\u0011Q\nU\u0007\u0002\u001d*\u0011q*J\u0001\u0007yI|w\u000e\u001e \n\u0003UJ!A\u0015\u001b\u0002\u000fA\f7m[1hK&\u0011A+\u0016\u0002\u0004'\u0016\f(B\u0001*5!\t9&,D\u0001Y\u0015\tIV)A\u0006fqB\u0014Xm]:j_:\u001c\u0018BA.Y\u0005%\tE\u000f\u001e:jEV$X-A\u0004nKR\u0014\u0018nY:\u0011\ty\u0013W\r\u001b\b\u0003?\u0002\u0004\"!\u0014\u001b\n\u0005\u0005$\u0014A\u0002)sK\u0012,g-\u0003\u0002dI\n\u0019Q*\u00199\u000b\u0005\u0005$\u0004C\u00010g\u0013\t9GM\u0001\u0004TiJLgn\u001a\t\u0003S6l\u0011A\u001b\u0006\u0003W2\fa!\\3ue&\u001c'B\u0001\u000e\u001e\u0013\tq'NA\u0005T#2kU\r\u001e:jG\u0006Aa.^7QCJ$8\u000f\u0005\u00024c&\u0011!\u000f\u000e\u0002\u0004\u0013:$\u0018!C:ik\u001a4G.Z%e\u0003\u0015i\u0017\r]%e!\t\u0019d/\u0003\u0002xi\t!Aj\u001c8h\u0003\u001d\u0019wN\u001c;fqR\u0004\"A_>\u000e\u0003}I!\u0001`\u0010\u0003\u0017Q\u000b7o[\"p]R,\u0007\u0010^\u0001\u0010[\u0016$(/[2t%\u0016\u0004xN\u001d;feB\u0011\u0011f`\u0005\u0004\u0003\u0003Q#aG*ik\u001a4G.Z,sSR,W*\u001a;sS\u000e\u001c(+\u001a9peR,'/\u0001\u0004=S:LGO\u0010\u000b\u0013\u0003\u000f\tY!!\u0004\u0002\u0010\u0005E\u00111CA\u000b\u0003/\tI\u0002E\u0003\u0002\n\u0001i3(D\u0001\u0018\u0011\u0015q\u0014\u00021\u0001@\u0011\u0015I\u0015\u00021\u0001K\u0011\u0015a\u0016\u00021\u0001^\u0011\u0015y\u0017\u00021\u0001q\u0011\u0015\u0019\u0018\u00021\u0001q\u0011\u0015!\u0018\u00021\u0001v\u0011\u0015A\u0018\u00021\u0001z\u0011\u0015i\u0018\u00021\u0001\u007f\u00035yeIR*F)~cUIT$U\u0011V\t\u0001/\u0001\bP\r\u001a\u001bV\tV0M\u000b:;E\u000b\u0013\u0011\u0002!A\f'\u000f^5uS>tG*\u001a8hi\"\u001cXCAA\u0013!\u0011\u0019\u0014qE;\n\u0007\u0005%BGA\u0003BeJ\f\u00170\u0001\u000bqCJ$\u0018\u000e^5p]2+gn\u001a;ig~#S-\u001d\u000b\u0005\u0003_\t)\u0004E\u00024\u0003cI1!a\r5\u0005\u0011)f.\u001b;\t\u0013\u0005]R\"!AA\u0002\u0005\u0015\u0012a\u0001=%c\u0005\t\u0002/\u0019:uSRLwN\u001c'f]\u001e$\bn\u001d\u0011\u0002\u00135\f\u0007o\u0015;biV\u001cXCAA !\u0011\t\t%a\u0012\u000e\u0005\u0005\r#bAA#?\u0005I1o\u00195fIVdWM]\u0005\u0005\u0003\u0013\n\u0019EA\u0005NCB\u001cF/\u0019;vg\u0006iQ.\u00199Ti\u0006$Xo]0%KF$B!a\f\u0002P!I\u0011q\u0007\t\u0002\u0002\u0003\u0007\u0011qH\u0001\u000b[\u0006\u00048\u000b^1ukN\u0004\u0013!B<sSR,G\u0003BA\u0018\u0003/Bq!!\u0017\u0013\u0001\u0004\tY&\u0001\u0004j]B,Ho\u001d\t\u0006\u0017\u0006u\u0013\u0011M\u0005\u0004\u0003?*&\u0001C%uKJ\fGo\u001c:\u0011\u000bM\n\u0019'L\u001e\n\u0007\u0005\u0015DG\u0001\u0005Qe>$Wo\u0019;3\u000359W\r\u001e(bi&4X\r\u00157b]R1\u00111NAH\u0003'\u0003B!!\u001c\u0002\n:!\u0011qNAB\u001d\u0011\t\t(! \u000f\t\u0005M\u00141\u0010\b\u0005\u0003k\nIHD\u0002N\u0003oJ\u0011\u0001J\u0005\u0003E\rJ!\u0001H\u0011\n\t\u0005}\u0014\u0011Q\u0001\u0006g\u0016\u0014H-\u001a\u0006\u00039\u0005JA!!\"\u0002\b\u0006\u0011r\n]3sCR|'oT;uKJ\u001cE.Y:t\u0015\u0011\ty(!!\n\t\u0005-\u0015Q\u0012\u0002\t\u001fB,'/\u0019;pe*!\u0011QQAD\u0011\u0019\t\tj\u0005a\u0001K\u0006AA-\u0019;b\r&dW\r\u0003\u0004\u0002\u0016N\u0001\r!Z\u0001\nS:$W\r\u001f$jY\u0016\fAa\u001d;paR!\u00111TAQ!\u0015\u0019\u0014QTA \u0013\r\ty\n\u000e\u0002\u0007\u001fB$\u0018n\u001c8\t\u000f\u0005\rF\u00031\u0001\u0002&\u000691/^2dKN\u001c\bcA\u001a\u0002(&\u0019\u0011\u0011\u0016\u001b\u0003\u000f\t{w\u000e\\3b]\u0006\u0019r-\u001a;QCJ$\u0018\u000e^5p]2+gn\u001a;igR\u0011\u0011Q\u0005")
public class CometNativeShuffleWriter<K, V>
extends ShuffleWriter<K, V> {
    private final Partitioning outputPartitioning;
    private final Seq<Attribute> outputAttributes;
    private final Map<String, SQLMetric> metrics;
    private final int numParts;
    private final int shuffleId;
    private final long mapId;
    private final TaskContext context;
    private final ShuffleWriteMetricsReporter metricsReporter;
    private final int OFFSET_LENGTH;
    private long[] partitionLengths;
    private MapStatus mapStatus;

    private int OFFSET_LENGTH() {
        return this.OFFSET_LENGTH;
    }

    public long[] partitionLengths() {
        return this.partitionLengths;
    }

    public void partitionLengths_$eq(long[] x$1) {
        this.partitionLengths = x$1;
    }

    public MapStatus mapStatus() {
        return this.mapStatus;
    }

    public void mapStatus_$eq(MapStatus x$1) {
        this.mapStatus = x$1;
    }

    public void write(Iterator<Product2<K, V>> inputs) {
        IndexShuffleBlockResolver shuffleBlockResolver = (IndexShuffleBlockResolver)SparkEnv$.MODULE$.get().shuffleManager().shuffleBlockResolver();
        File dataFile = shuffleBlockResolver.getDataFile(this.shuffleId, this.mapId);
        File indexFile = shuffleBlockResolver.getIndexFile(this.shuffleId, this.mapId, shuffleBlockResolver.getIndexFile$default$3());
        String tempDataFilename = dataFile.getPath().replace(".data", ".data.tmp");
        String tempIndexFilename = indexFile.getPath().replace(".index", ".index.tmp");
        Path tempDataFilePath = Paths.get(tempDataFilename, new String[0]);
        Path tempIndexFilePath = Paths.get(tempIndexFilename, new String[0]);
        OperatorOuterClass.Operator nativePlan = this.getNativePlan(tempDataFilename, tempIndexFilename);
        Seq detailedMetrics = (Seq)new .colon.colon((Object)"elapsed_compute", (List)new .colon.colon((Object)"encode_time", (List)new .colon.colon((Object)"repart_time", (List)new .colon.colon((Object)"mempool_time", (List)new .colon.colon((Object)"input_batches", (List)new .colon.colon((Object)"spill_count", (List)new .colon.colon((Object)"spilled_bytes", (List)Nil$.MODULE$)))))));
        SQLMetric metricsOutputRows = new SQLMetric("outputRows", SQLMetric$.MODULE$.$lessinit$greater$default$2());
        SQLMetric metricsWriteTime = new SQLMetric("writeTime", SQLMetric$.MODULE$.$lessinit$greater$default$2());
        Map nativeSQLMetrics = ((MapLike)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"output_rows"), (Object)metricsOutputRows), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"data_size"), this.metrics.apply((Object)"dataSize")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"write_time"), (Object)metricsWriteTime)}))).$plus$plus((GenTraversableOnce)this.metrics.filterKeys((Function1 & Serializable & scala.Serializable)elem -> BoxesRunTime.boxToBoolean((boolean)detailedMetrics.contains(elem))));
        CometMetricNode nativeMetrics = CometMetricNode$.MODULE$.apply((Map<String, SQLMetric>)nativeSQLMetrics);
        Iterator newInputs = inputs.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1._2());
        CometExecIterator cometIter = CometExec$.MODULE$.getCometIterator((Seq<Iterator<ColumnarBatch>>)((Seq)new .colon.colon((Object)newInputs, (List)Nil$.MODULE$)), this.outputAttributes.length(), nativePlan, nativeMetrics, this.numParts, this.context.partitionId());
        while (cometIter.hasNext()) {
            cometIter.next();
        }
        cometIter.close();
        LongRef offset = LongRef.create((long)0L);
        this.partitionLengths_$eq((long[])new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(Files.readAllBytes(tempIndexFilePath))).grouped(this.OFFSET_LENGTH()).drop(1).map((Function1 & Serializable & scala.Serializable)indexBytes -> BoxesRunTime.boxToLong((long)CometNativeShuffleWriter.$anonfun$write$3(offset, indexBytes))).toArray(ClassTag$.MODULE$.Long()));
        Files.delete(tempIndexFilePath);
        this.metricsReporter.incBytesWritten(Files.size(tempDataFilePath));
        this.metricsReporter.incRecordsWritten(metricsOutputRows.value());
        this.metricsReporter.incWriteTime(metricsWriteTime.value());
        shuffleBlockResolver.writeMetadataFileAndCommit(this.shuffleId, this.mapId, this.partitionLengths(), (long[])Array$.MODULE$.empty(ClassTag$.MODULE$.Long()), tempDataFilePath.toFile());
        this.mapStatus_$eq(MapStatus$.MODULE$.apply(SparkEnv$.MODULE$.get().blockManager().shuffleServerId(), this.partitionLengths(), this.mapId));
    }

    private OperatorOuterClass.Operator getNativePlan(String dataFile, String indexFile) {
        OperatorOuterClass.Scan.Builder scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput");
        OperatorOuterClass.Operator.Builder opBuilder = OperatorOuterClass.Operator.newBuilder();
        Seq scanTypes = (Seq)this.outputAttributes.flatten((Function1 & Serializable & scala.Serializable)attr -> Option$.MODULE$.option2Iterable(QueryPlanSerde$.MODULE$.serializeDataType(attr.dataType())));
        if (scanTypes.length() == this.outputAttributes.length()) {
            OperatorOuterClass.ShuffleWriter.Builder builder;
            scanBuilder.addAllFields((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)scanTypes).asJava());
            OperatorOuterClass.ShuffleWriter.Builder shuffleWriterBuilder = OperatorOuterClass.ShuffleWriter.newBuilder();
            shuffleWriterBuilder.setOutputDataFile(dataFile);
            shuffleWriterBuilder.setOutputIndexFile(indexFile);
            shuffleWriterBuilder.setEnableFastEncoding(BoxesRunTime.unboxToBoolean((Object)CometConf$.MODULE$.COMET_SHUFFLE_ENABLE_FAST_ENCODING().get()));
            if (SparkEnv$.MODULE$.get().conf().getBoolean("spark.shuffle.compress", true)) {
                OperatorOuterClass.CompressionCodec compressionCodec;
                String string = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
                if ("zstd".equals(string)) {
                    compressionCodec = OperatorOuterClass.CompressionCodec.Zstd;
                } else if ("lz4".equals(string)) {
                    compressionCodec = OperatorOuterClass.CompressionCodec.Lz4;
                } else if ("snappy".equals(string)) {
                    compressionCodec = OperatorOuterClass.CompressionCodec.Snappy;
                } else {
                    throw new UnsupportedOperationException(new StringBuilder(15).append("invalid codec: ").append(string).toString());
                }
                OperatorOuterClass.CompressionCodec codec = compressionCodec;
                builder = shuffleWriterBuilder.setCodec(codec);
            } else {
                builder = shuffleWriterBuilder.setCodec(OperatorOuterClass.CompressionCodec.None);
            }
            shuffleWriterBuilder.setCompressionLevel(BoxesRunTime.unboxToInt((Object)CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get()));
            Partitioning partitioning = this.outputPartitioning;
            if (partitioning instanceof HashPartitioning) {
                HashPartitioning hashPartitioning = (HashPartitioning)this.outputPartitioning;
                PartitioningOuterClass.HashRepartition.Builder partitioning2 = PartitioningOuterClass.HashRepartition.newBuilder();
                partitioning2.setNumPartitions(this.outputPartitioning.numPartitions());
                Seq partitionExprs = (Seq)hashPartitioning.expressions().flatMap((Function1 & Serializable & scala.Serializable)e -> Option$.MODULE$.option2Iterable(QueryPlanSerde$.MODULE$.exprToProto((Expression)e, $this.outputAttributes, QueryPlanSerde$.MODULE$.exprToProto$default$3())), Seq$.MODULE$.canBuildFrom());
                if (partitionExprs.length() != hashPartitioning.expressions().length()) {
                    throw new UnsupportedOperationException(new StringBuilder(31).append("Partitioning ").append(hashPartitioning).append(" is not supported.").toString());
                }
                partitioning2.addAllHashExpression((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)partitionExprs).asJava());
                PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
                shuffleWriterBuilder.setPartitioning(partitioningBuilder.setHashPartition(partitioning2).build());
            } else if (SinglePartition$.MODULE$.equals(partitioning)) {
                PartitioningOuterClass.SinglePartition.Builder partitioning3 = PartitioningOuterClass.SinglePartition.newBuilder();
                PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
                shuffleWriterBuilder.setPartitioning(partitioningBuilder.setSinglePartition(partitioning3).build());
            } else {
                throw new UnsupportedOperationException(new StringBuilder(31).append("Partitioning ").append(this.outputPartitioning).append(" is not supported.").toString());
            }
            OperatorOuterClass.Operator.Builder shuffleWriterOpBuilder = OperatorOuterClass.Operator.newBuilder();
            return shuffleWriterOpBuilder.setShuffleWriter(shuffleWriterBuilder).addChildren(opBuilder.setScan(scanBuilder).build()).build();
        }
        throw new UnsupportedOperationException(new StringBuilder(62).append(this.outputAttributes).append(" contains unsupported data types for CometShuffleExchangeExec.").toString());
    }

    public Option<MapStatus> stop(boolean success) {
        if (success) {
            return new Some((Object)this.mapStatus());
        }
        return None$.MODULE$;
    }

    public long[] getPartitionLengths() {
        return this.partitionLengths();
    }

    public static final /* synthetic */ long $anonfun$write$3(LongRef offset$1, byte[] indexBytes) {
        long partitionOffset = ByteBuffer.wrap(indexBytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
        long partitionLength = partitionOffset - offset$1.elem;
        offset$1.elem = partitionOffset;
        return partitionLength;
    }

    public CometNativeShuffleWriter(Partitioning outputPartitioning, Seq<Attribute> outputAttributes, Map<String, SQLMetric> metrics, int numParts, int shuffleId, long mapId, TaskContext context, ShuffleWriteMetricsReporter metricsReporter) {
        this.outputPartitioning = outputPartitioning;
        this.outputAttributes = outputAttributes;
        this.metrics = metrics;
        this.numParts = numParts;
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.context = context;
        this.metricsReporter = metricsReporter;
        this.OFFSET_LENGTH = 8;
    }
}

