package org.apache.spark.sql.execution.datasources.v2;

import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfoImpl;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.util.LongAccumulator;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: WriteDeltaExec.scala */
@ScalaSignature(bytes = "\u0006\u0001y3q!\u0002\u0004\u0011\u0002\u0007\u0005Q\u0003C\u0003\u001f\u0001\u0011\u0005q\u0004C\u0003'\u0001\u0019\u0005q\u0005C\u0003B\u0001\u0011E#\tC\u0006U\u0001A\u0005\u0019\u0011!A\u0005\nUS&\u0001I#yi\u0016tG-\u001a3We\u0015C\u0018n\u001d;j]\u001e$\u0016M\u00197f/JLG/Z#yK\u000eT!a\u0002\u0005\u0002\u0005Y\u0014$BA\u0005\u000b\u0003-!\u0017\r^1t_V\u00148-Z:\u000b\u0005-a\u0011!C3yK\u000e,H/[8o\u0015\tia\"A\u0002tc2T!a\u0004\t\u0002\u000bM\u0004\u0018M]6\u000b\u0005E\u0011\u0012AB1qC\u000eDWMC\u0001\u0014\u0003\ry'oZ\u0002\u0001+\t1RfE\u0002\u0001/m\u0001\"\u0001G\r\u000e\u0003\u0019I!A\u0007\u0004\u0003\u001bY\u00134i\\7nC:$W\t_3d!\tAB$\u0003\u0002\u001e\r\tAbKM#ySN$\u0018N\\4UC\ndWm\u0016:ji\u0016,\u00050Z2\u0002\r\u0011Jg.\u001b;%)\u0005\u0001\u0003CA\u0011%\u001b\u0005\u0011#\"A\u0012\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0012#\u0001B+oSR\f1b\u001e:ji&tw\rV1tWV\t\u0001\u0006E\u0002\u0019S-J!A\u000b\u0004\u0003!]\u0013\u0018\u000e^5oON\u0003\u0018M]6UCN\\\u0007C\u0001\u0017.\u0019\u0001!QA\f\u0001C\u0002=\u0012\u0011aV\t\u0003aM\u0002\"!I\u0019\n\u0005I\u0012#a\u0002(pi\"Lgn\u001a\t\u0004ieZT\"A\u001b\u000b\u0005Y:\u0014!B<sSR,'B\u0001\u001d\r\u0003%\u0019wN\u001c8fGR|'/\u0003\u0002;k\tQA)\u0019;b/JLG/\u001a:\u0011\u0005qzT\"A\u001f\u000b\u0005yb\u0011\u0001C2bi\u0006d\u0017p\u001d;\n\u0005\u0001k$aC%oi\u0016\u0014h.\u00197S_^\f1b\u001e:ji\u0016<\u0016\u000e\u001e5WeQ\u00111i\u0014\t\u0004\t2[dBA#K\u001d\t1\u0015*D\u0001H\u0015\tAE#\u0001\u0004=e>|GOP\u0005\u0002G%\u00111JI\u0001\ba\u0006\u001c7.Y4f\u0013\tieJA\u0002TKFT!a\u0013\u0012\t\u000bA\u001b\u0001\u0019A)\u0002\u0015\t\fGo\u00195Xe&$X\r\u0005\u00025%&\u00111+\u000e\u0002\u000b\u0005\u0006$8\r[,sSR,\u0017AE:va\u0016\u0014He\u001d9be.\u001cuN\u001c;fqR,\u0012A\u0016\t\u0003/bk\u0011AD\u0005\u00033:\u0011Ab\u00159be.\u001cuN\u001c;fqRL!a\u0017/\u0002\u0019M\u0004\u0018M]6D_:$X\r\u001f;\n\u0005uS!!C*qCJ\\\u0007\u000b\\1o\u0001")
/* loaded from: input_file:org/apache/spark/sql/execution/datasources/v2/ExtendedV2ExistingTableWriteExec.class */
/**
 * Write node mixin that runs the input query as a Spark job through a pluggable
 * {@link WritingSparkTask} and then commits (or aborts) the result via a {@link BatchWrite}.
 *
 * @param <W> the {@link DataWriter} type handled by {@link #writingTask()}
 */
public interface ExtendedV2ExistingTableWriteExec<W extends DataWriter<InternalRow>> extends V2ExistingTableWriteExec {
    /**
     * Compiler-generated accessor used by {@link #writeWithV2(BatchWrite)} to obtain the
     * {@link SparkContext} on which the write job is run.
     */
    /* synthetic */ SparkContext org$apache$spark$sql$execution$datasources$v2$ExtendedV2ExistingTableWriteExec$$super$sparkContext();

    /**
     * Returns the task whose {@code run} method is invoked for every partition of the write job.
     */
    WritingSparkTask<W> writingTask();

    /**
     * Executes the input query, runs {@link #writingTask()} over each partition, and commits the
     * collected {@link WriterCommitMessage}s through {@code batchWrite}.
     *
     * <p>On any failure during the job or the commit, {@code batchWrite.abort} is called with the
     * messages collected so far. If the abort succeeds, a non-fatal cause is rethrown wrapped by
     * {@code writingJobAbortedError} and any other cause is rethrown unchanged. If the abort itself
     * throws, that throwable is attached to the original cause as suppressed and the cause is
     * rethrown wrapped by {@code writingJobFailedError}.
     *
     * @param batchWrite the batch write used to create the writer factory and to commit or abort
     * @return an empty sequence on success
     */
    default Seq<InternalRow> writeWithV2(BatchWrite batchWrite) {
        RDD execute = query().execute();
        // An input with zero partitions is replaced by a single empty partition
        // (presumably so that the write job still runs one task -- confirm against callers).
        RDD parallelize = execute.partitions().length == 0 ? org$apache$spark$sql$execution$datasources$v2$ExtendedV2ExistingTableWriteExec$$super$sparkContext().parallelize(Nil$.MODULE$, 1, ClassTag$.MODULE$.apply(InternalRow.class)) : execute;
        WritingSparkTask<W> writingTask = writingTask();
        DataWriterFactory createBatchWriterFactory = batchWrite.createBatchWriterFactory(new PhysicalWriteInfoImpl(parallelize.getNumPartitions()));
        boolean useCommitCoordinator = batchWrite.useCommitCoordinator();
        // One slot per partition; filled in by the job's result handler as tasks finish.
        WriterCommitMessage[] writerCommitMessageArr = new WriterCommitMessage[parallelize.partitions().length];
        LongAccumulator longAccumulator = new LongAccumulator();
        ((Logging) this).logInfo(() -> {
            return new StringBuilder(76).append("Start processing data source write support: ").append(batchWrite).append(". ").append("The input RDD has ").append(writerCommitMessageArr.length).append(" partitions.").toString();
        });
        Map customMetrics = customMetrics();
        try {
            org$apache$spark$sql$execution$datasources$v2$ExtendedV2ExistingTableWriteExec$$super$sparkContext().runJob(parallelize, (taskContext, iterator) -> {
                return writingTask.run(createBatchWriterFactory, taskContext, iterator, useCommitCoordinator, customMetrics);
            }, new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(parallelize.partitions())).indices(), (obj, dataWritingSparkTaskResult) -> {
                $anonfun$writeWithV2$3(writerCommitMessageArr, longAccumulator, batchWrite, BoxesRunTime.unboxToInt(obj), dataWritingSparkTaskResult);
                return BoxedUnit.UNIT;
            }, ClassTag$.MODULE$.apply(DataWritingSparkTaskResult.class));
            ((Logging) this).logInfo(() -> {
                return new StringBuilder(41).append("Data source write support ").append(batchWrite).append(" is committing.").toString();
            });
            batchWrite.commit(writerCommitMessageArr);
            ((Logging) this).logInfo(() -> {
                return new StringBuilder(37).append("Data source write support ").append(batchWrite).append(" committed.").toString();
            });
            commitProgress_$eq(new Some(new StreamWriterCommitProgress(Predef$.MODULE$.Long2long(longAccumulator.value()))));
            return Nil$.MODULE$;
        } catch (Throwable th) {
            ((Logging) this).logError(() -> {
                return new StringBuilder(39).append("Data source write support ").append(batchWrite).append(" is aborting.").toString();
            });
            // Guard ONLY the abort call. The rethrow of the original cause must stay outside this
            // try; otherwise it would be caught below, misreported as a failed abort, and (for the
            // fatal path) trigger self-suppression via th.addSuppressed(th).
            try {
                batchWrite.abort(writerCommitMessageArr);
            } catch (Throwable th2) {
                ((Logging) this).logError(() -> {
                    return new StringBuilder(43).append("Data source write support ").append(batchWrite).append(" failed to abort.").toString();
                });
                th.addSuppressed(th2);
                throw QueryExecutionErrors$.MODULE$.writingJobFailedError(th);
            }
            ((Logging) this).logError(() -> {
                return new StringBuilder(35).append("Data source write support ").append(batchWrite).append(" aborted.").toString();
            });
            // Only causes matched by NonFatal are wrapped; anything else is rethrown as-is.
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            throw QueryExecutionErrors$.MODULE$.writingJobAbortedError((Throwable) unapply.get());
        }
    }

    /**
     * Result handler for the write job: stores the task's commit message at the partition's index,
     * adds the task's row count to the accumulator, and notifies {@code batchWrite} of the commit.
     *
     * @param writerCommitMessageArr per-partition commit message slots
     * @param longAccumulator running total of rows written
     * @param batchWrite the batch write to notify via {@code onDataWriterCommit}
     * @param i index of the partition whose task produced the result
     * @param dataWritingSparkTaskResult the result returned by that task
     */
    static /* synthetic */ void $anonfun$writeWithV2$3(WriterCommitMessage[] writerCommitMessageArr, LongAccumulator longAccumulator, BatchWrite batchWrite, int i, DataWritingSparkTaskResult dataWritingSparkTaskResult) {
        WriterCommitMessage writerCommitMessage = dataWritingSparkTaskResult.writerCommitMessage();
        writerCommitMessageArr[i] = writerCommitMessage;
        longAccumulator.add(dataWritingSparkTaskResult.numRows());
        batchWrite.onDataWriterCommit(writerCommitMessage);
    }

    /**
     * Initializer hook for implementing instances; intentionally empty.
     */
    static void $init$(ExtendedV2ExistingTableWriteExec extendedV2ExistingTableWriteExec) {
    }
}
