package io.amient.affinity.spark;

import io.amient.affinity.core.serde.AbstractSerde;
import io.amient.affinity.core.storage.LogStorage;
import io.amient.affinity.core.util.TimeRange;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.util.LongAccumulator;
import scala.Function0;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/* compiled from: LogRDD.scala */
/* loaded from: input_file:io/amient/affinity/spark/LogRDD$.class */
/**
 * Java view of the Scala module (companion object) behind {@code LogRDD}.
 *
 * <p>Exposes factory and bulk-append helpers through the {@link #MODULE$} singleton. The
 * method and field names containing {@code $} are compiler-generated and are kept verbatim
 * because other generated classes refer to them by name.
 */
public final class LogRDD$ implements Serializable {
    /**
     * The single instance of this module.
     *
     * <p>Initialised in the declaration: a {@code static final} field cannot be assigned from
     * the constructor in Java, which is what the decompiled form attempted.
     */
    public static final LogRDD$ MODULE$ = new LogRDD$();

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private LogRDD$() {
    }

    /**
     * Creates a {@link LogRDD} over the storage produced by {@code function0}.
     *
     * @param function0    factory for the {@link LogStorage} to read from
     * @param timeRange    time range handed to the RDD
     * @param sparkContext Spark context the RDD is bound to
     * @param <POS>        position type of the underlying log storage
     * @return a new {@code LogRDD}; the trailing {@code false} constructor argument is passed
     *         through unchanged (its meaning is defined by the {@code LogRDD} constructor,
     *         which is not visible here)
     */
    public <POS extends Comparable<POS>> LogRDD<POS> apply(Function0<LogStorage<POS>> function0, TimeRange timeRange, SparkContext sparkContext) {
        return new LogRDD<>(sparkContext, function0, timeRange, false);
    }

    /**
     * Default value for the second parameter of {@code apply} (the time range).
     *
     * @return {@link TimeRange#UNBOUNDED}
     */
    public <POS extends Comparable<POS>> TimeRange apply$default$2() {
        return TimeRange.UNBOUNDED;
    }

    /**
     * Appends the pairs of {@code rdd} to a log storage, using one serde factory for both keys
     * and values. Delegates to the seven-argument overload.
     *
     * @param function0    factory for the serde used for keys and for values
     * @param function02   factory for the target {@link LogStorage}
     * @param rdd          key/value pairs to append
     * @param classTag     class tag of the key type
     * @param classTag2    class tag of the value type
     * @param sparkContext Spark context used to run the job
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <K, V> void append(Function0<AbstractSerde<Object>> function0, Function0<LogStorage<?>> function02, RDD<Tuple2<K, V>> rdd, ClassTag<K> classTag, ClassTag<V> classTag2, SparkContext sparkContext) {
        // Function0 is covariant in Scala but Java generics are invariant, so
        // Function0<AbstractSerde<Object>> is not a Function0<AbstractSerde<? super K>>.
        // Going through the raw type keeps the delegation compilable without changing
        // either signature.
        Function0 sharedSerdeFactory = function0;
        append(sharedSerdeFactory, sharedSerdeFactory, function02, rdd, classTag, classTag2, sparkContext);
    }

    /**
     * Appends the pairs of {@code rdd} to a log storage by running a Spark job over it.
     *
     * <p>A fresh {@link LongAccumulator} is registered with {@code sparkContext} and handed to
     * the per-partition closure together with the three factories.
     *
     * @param function0    factory for the key serde
     * @param function02   factory for the value serde
     * @param function03   factory for the target {@link LogStorage}
     * @param rdd          key/value pairs to append
     * @param classTag     class tag of the key type (not used by this method body)
     * @param classTag2    class tag of the value type (not used by this method body)
     * @param sparkContext Spark context used to register the accumulator and run the job
     */
    public <K, V> void append(Function0<AbstractSerde<? super K>> function0, Function0<AbstractSerde<? super V>> function02, Function0<LogStorage<?>> function03, RDD<Tuple2<K, V>> rdd, ClassTag<K> classTag, ClassTag<V> classTag2, SparkContext sparkContext) {
        LongAccumulator longAccumulator = new LongAccumulator();
        sparkContext.register(longAccumulator);
        sparkContext.runJob(rdd, new LogRDD$$anonfun$append$1(function0, function02, function03, longAccumulator), ClassTag$.MODULE$.Unit());
    }

    /** Serialization hook: a deserialized copy is replaced by {@link #MODULE$}. */
    private Object readResolve() {
        return MODULE$;
    }

    /**
     * Writes one partition to a freshly created log storage.
     *
     * <p>Creates the storage, the key serde and the value serde from their factories (in that
     * order), maps every element of {@code iterator} through {@code LogRDD$$anonfun$5} and
     * feeds the results to the append closure, then flushes the storage.
     *
     * <p>Cleanup always runs, whether or not the work above succeeded: the storage is closed
     * first, then the key serde, then the value serde. Each close is attempted even if an
     * earlier one throws. Previously the storage was closed only on success and the value
     * serde was never closed.
     *
     * @param taskContext     Spark task context (not used by this method body)
     * @param iterator        elements of the partition
     * @param function0       factory for the key serde
     * @param function02      factory for the value serde
     * @param function03      factory for the target {@link LogStorage}
     * @param longAccumulator accumulator passed to the append closure
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public final void io$amient$affinity$spark$LogRDD$$updatePartition$1(TaskContext taskContext, Iterator iterator, Function0 function0, Function0 function02, Function0 function03, LongAccumulator longAccumulator) {
        LogStorage logStorage = (LogStorage) function03.apply();
        AbstractSerde keySerde = null;
        AbstractSerde valueSerde = null;
        try {
            // Serdes are created inside the try so that a failing factory still closes the storage.
            keySerde = (AbstractSerde) function0.apply();
            valueSerde = (AbstractSerde) function02.apply();
            iterator.map(new LogRDD$$anonfun$5(keySerde, valueSerde)).foreach(new LogRDD$$anonfun$io$amient$affinity$spark$LogRDD$$updatePartition$1$1(longAccumulator, logStorage));
            logStorage.flush();
        } finally {
            // Nested finally blocks: one failing close() must not skip the remaining ones.
            try {
                logStorage.close();
            } finally {
                try {
                    if (keySerde != null) {
                        keySerde.close();
                    }
                } finally {
                    if (valueSerde != null) {
                        valueSerde.close();
                    }
                }
            }
        }
    }
}
