package tech.ytsaurus.spyt.streaming;

import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.format.YtDynamicTableWriter;
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration;
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration$;
import tech.ytsaurus.spyt.fs.path.YPathEnriched$;
import tech.ytsaurus.spyt.wrapper.client.YtClientConfigurationConverter$;
import tech.ytsaurus.spyt.wrapper.client.YtClientProvider$;

/* compiled from: YtStreamingSink.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055b\u0001\u0002\t\u0012\u0001iA\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I!\u000f\u0005\t{\u0001\u0011\t\u0011)A\u0005}!A1\n\u0001B\u0001B\u0003%A\nC\u0003P\u0001\u0011\u0005\u0001\u000bC\u0004W\u0001\t\u0007I\u0011B,\t\ra\u0003\u0001\u0015!\u0003?\u0011\u001dI\u0006A1A\u0005\niCa!\u0019\u0001!\u0002\u0013Y\u0006b\u00022\u0001\u0005\u0004%Ia\u0019\u0005\u0007Y\u0002\u0001\u000b\u0011\u00023\t\u000f5\u0004\u0001\u0019!C\u0005]\"91\u000f\u0001a\u0001\n\u0013!\bB\u0002>\u0001A\u0003&q\u000e\u0003\u0004��\u0001\u0011\u0005\u0013\u0011\u0001\u0005\b\u0003\u0007\u0001A\u0011IA\u0003\u0005=IFo\u0015;sK\u0006l\u0017N\\4TS:\\'B\u0001\n\u0014\u0003%\u0019HO]3b[&twM\u0003\u0002\u0015+\u0005!1\u000f]=u\u0015\t1r#\u0001\u0005ziN\fWO];t\u0015\u0005A\u0012\u0001\u0002;fG\"\u001c\u0001a\u0005\u0003\u00017\r\u0012\u0004C\u0001\u000f\"\u001b\u0005i\"B\u0001\u0010 \u0003\u0011a\u0017M\\4\u000b\u0003\u0001\nAA[1wC&\u0011!%\b\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005\u0011\u0002T\"A\u0013\u000b\u0005I1#BA\u0014)\u0003%)\u00070Z2vi&|gN\u0003\u0002*U\u0005\u00191/\u001d7\u000b\u0005-b\u0013!B:qCJ\\'BA\u0017/\u0003\u0019\t\u0007/Y2iK*\tq&A\u0002pe\u001eL!!M\u0013\u0003\tMKgn\u001b\t\u0003gYj\u0011\u0001\u000e\u0006\u0003k)\n\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003oQ\u0012q\u0001T8hO&tw-\u0001\u0006tc2\u001cuN\u001c;fqR\u0004\"AO\u001e\u000e\u0003!J!\u0001\u0010\u0015\u0003\u0015M\u000bFjQ8oi\u0016DH/A\u0005rk\u0016,X\rU1uQB\u0011q\b\u0013\b\u0003\u0001\u001a\u0003\"!\u0011#\u000e\u0003\tS!aQ\r\u0002\rq\u0012xn\u001c;?\u0015\u0005)\u0015!B:dC2\f\u0017BA$E\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011J\u0013\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u001d#\u0015A\u00039be\u0006lW\r^3sgB!q(\u0014 ?\u0013\tq%JA\u0002NCB\fa\u0001P5oSRtD\u0003B)T)V\u0003\"A\u0015\u0001\u000e\u0003EAQ\u0001\u000f\u0003A\u0002eBQ!\u0010\u0003A\u0002yBQa\u0013\u0003A\u00021\u000b!!\u001b3\u0016\u0003y\n1!\u001b3!\u0003\tIH/F\u0001\\!\tav,D\u0001^\u0015\tqV#\u0001\u0004dY&,g\u000e^\u0005\u0003Av\u0013abQ8na>,h\u000eZ\"mS\u0016tG/A\u0002zi\u0002\nqa^\"p]\u001aLw-F\u0001e!\t)'.D\u0001g\u0015\t9\u0007.\u0001\u0003d_:4'BA5\u0014\u0003\u00191wN]7bi&\u00111N\u001a\u0002\u001a'B\f'o[-u/JLG/Z\"p]\u001aLw-\u001e:bi&|g.\u0001\u0005x\u0007>tg-[4!\u00035a\u0017\r^3ti\n\u000bGo\u00195JIV\tq\u000e\u0005\u0002qc6\tA)\u0003\u0002s\t\n!Aj\u001c8h\u0003Ea\u0017\r^3ti\n\u000bGo\u00195JI~#S-\u001d\u000b\u0003kb\u0004\"\u0001\u001d<\n\u0005]$%\u0001B+oSRDq!\u001f\u0007\u0002\u0002\u0003\u0007q.A\u0002yIE\na\u0002\\1uKN$()\u0019;dQ&#\u0007\u0005\u000b\u0002\u000eyB\u0011\u0001/`\u0005\u0003}\u0012\u0013\u0001B^8mCRLG.Z\u0001\ti>\u001cFO]5oOR\ta(\u0001\u0005bI\u0012\u0014\u0015\r^2i)\u0015)\u0018qAA\u0006\u0011\u0019\tIa\u0004a\u0001_\u00069!-\u0019;dQ&#\u0007bBA\u0007\u001f\u0001\u0007\u0011qB\u0001\u0005I\u0006$\u0018\r\u0005\u0003\u0002\u0012\u0005\u001db\u0002BA\n\u0003GqA!!\u0006\u0002\"9!\u0011qCA\u0010\u001d\u0011\tI\"!\b\u000f\u0007\u0005\u000bY\"C\u00010\u0013\tic&\u0003\u0002,Y%\u0011\u0011FK\u0005\u0004\u0003KA\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003S\tYCA\u0005ECR\fgI]1nK*\u0019\u0011Q\u0005\u0015")
/* loaded from: input_file:tech/ytsaurus/spyt/streaming/YtStreamingSink.class */
public class YtStreamingSink implements Sink, Logging {
    private final SQLContext sqlContext;
    private final String queuePath;
    private final Map<String, String> parameters;
    private final String id;
    private final CompoundClient yt;
    private final SparkYtWriteConfiguration wConfig;
    private volatile long latestBatchId;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public String name() {
        return Sink.name$(this);
    }

    public StructType schema() {
        return Sink.schema$(this);
    }

    public Set<TableCapability> capabilities() {
        return Sink.capabilities$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private String id() {
        return this.id;
    }

    private CompoundClient yt() {
        return this.yt;
    }

    private SparkYtWriteConfiguration wConfig() {
        return this.wConfig;
    }

    private long latestBatchId() {
        return this.latestBatchId;
    }

    private void latestBatchId_$eq(long j) {
        this.latestBatchId = j;
    }

    public String toString() {
        return "YtStreamingSink";
    }

    public void addBatch(long j, Dataset<Row> dataset) {
        if (j <= latestBatchId()) {
            logInfo(() -> {
                return new StringBuilder(33).append("Skipping already committed batch ").append(j).toString();
            });
            return;
        }
        Row[] rowArr = (Row[]) dataset.collect();
        YtDynamicTableWriter ytDynamicTableWriter = new YtDynamicTableWriter(YPathEnriched$.MODULE$.fromPath(new Path(this.queuePath), YPathEnriched$.MODULE$.fromPath$default$2()), dataset.schema(), wConfig(), this.parameters, yt());
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(rowArr)).foreach(row -> {
            $anonfun$addBatch$2(ytDynamicTableWriter, row);
            return BoxedUnit.UNIT;
        });
        ytDynamicTableWriter.close();
        log().debug(new StringBuilder(14).append("Wrote ").append(rowArr.length).append(" records").toString());
        latestBatchId_$eq(j);
    }

    public static final /* synthetic */ void $anonfun$addBatch$2(YtDynamicTableWriter ytDynamicTableWriter, Row row) {
        ytDynamicTableWriter.write(row.toSeq());
    }

    public YtStreamingSink(SQLContext sQLContext, String str, Map<String, String> map) {
        this.sqlContext = sQLContext;
        this.queuePath = str;
        this.parameters = map;
        Sink.$init$(this);
        Logging.$init$(this);
        this.id = new StringBuilder(18).append("YtStreamingSource-").append(UUID.randomUUID()).toString();
        this.yt = YtClientProvider$.MODULE$.ytClient(() -> {
            return YtClientConfigurationConverter$.MODULE$.ytClientConfiguration(this.sqlContext.sparkSession());
        }, id());
        SparkYtWriteConfiguration apply = SparkYtWriteConfiguration$.MODULE$.apply(sQLContext);
        this.wConfig = apply.copy(apply.copy$default$1(), apply.copy$default$2(), Integer.MAX_VALUE, apply.copy$default$4(), apply.copy$default$5());
        this.latestBatchId = -1L;
    }
}
