package org.apache.griffin.measure.datasource.connector.streaming;

import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.Time;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaStreamingDataConnector.scala */
/* loaded from: input_file:org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector$$anonfun$init$2.class */
/**
 * Two-argument callback invoked with an RDD of pairs and the {@link Time} associated with it.
 *
 * <p>This is the decompiled form of a Scala closure; judging by the synthetic class name it is
 * the second anonymous function inside {@code KafkaStreamingDataConnector.init}. For each
 * invocation it:
 * <ol>
 *   <li>coalesces the RDD when it has more partitions than the context's default parallelism,</li>
 *   <li>runs the enclosing connector's {@code transform} and {@code preProcess} on it,</li>
 *   <li>hands the resulting optional dataset to a nested closure, applied to the connector's
 *       {@code streamingCacheClientOpt()} via {@code foreach}.</li>
 * </ol>
 * Any failure in steps 1-2 is logged through the connector and replaced by an empty result, so
 * step 3 is always attempted.
 */
public final class KafkaStreamingDataConnector$$anonfun$init$2 extends AbstractFunction2<RDD<Tuple2<Object, Object>>, Time, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;

    /** Enclosing connector instance captured by the closure; never {@code null}. */
    private final /* synthetic */ KafkaStreamingDataConnector $outer;

    /**
     * Processes one RDD and forwards the outcome to the nested cache-client closure.
     *
     * @param rdd  the pair RDD to process
     * @param time the time associated with {@code rdd}; its millisecond value is passed to
     *             {@code preProcess} and to the nested closure
     */
    public final void apply(RDD<Tuple2<Object, Object>> rdd, Time time) {
        final long batchMillis = time.milliseconds();
        Option<Dataset<Row>> processed;
        try {
            RDD<Tuple2<Object, Object>> input = capPartitions(rdd);
            processed = this.$outer.preProcess(this.$outer.transform(input), batchMillis);
        } catch (Throwable th) {
            // Deliberate best-effort: a failing batch is logged and treated as "no data" so the
            // nested closure below still runs for this timestamp.
            this.$outer.error("streaming data connector error: " + describe(th));
            processed = None$.MODULE$;
        }
        this.$outer.streamingCacheClientOpt().foreach(new KafkaStreamingDataConnector$$anonfun$init$2$$anonfun$apply$1(this, batchMillis, processed));
    }

    /**
     * Returns {@code rdd} unchanged unless it has more partitions than the Spark context's
     * default parallelism, in which case it is coalesced down to that number using the
     * default values Spark supplies for the remaining {@code coalesce} arguments.
     */
    private static RDD<Tuple2<Object, Object>> capPartitions(RDD<Tuple2<Object, Object>> rdd) {
        int parallelism = rdd.sparkContext().defaultParallelism();
        if (parallelism >= rdd.getNumPartitions()) {
            return rdd;
        }
        // The coalesce$default$N accessors are the Scala-generated getters for the default
        // arguments of RDD.coalesce; they are fetched in declaration order, as scalac does.
        boolean shuffleDefault = rdd.coalesce$default$2();
        Option coalescerDefault = rdd.coalesce$default$3();
        return rdd.coalesce(parallelism, shuffleDefault, coalescerDefault, rdd.coalesce$default$4(parallelism, shuffleDefault, coalescerDefault));
    }

    /**
     * Produces the text appended to the error log line: the throwable's message when it has
     * one, otherwise its {@code toString()} so that message-less failures (for example a bare
     * {@code NullPointerException}) are still identifiable instead of being logged as "null".
     */
    private static String describe(Throwable th) {
        String message = th.getMessage();
        return message != null ? message : th.toString();
    }

    /**
     * Erased bridge for the Scala {@code Function2} interface: casts the arguments, delegates to
     * {@link #apply(RDD, Time)} and returns the Scala unit value.
     */
    @Override // scala.Function2
    /* renamed from: apply */
    public final /* bridge */ /* synthetic */ Object mo2599apply(Object obj, Object obj2) {
        apply((RDD<Tuple2<Object, Object>>) obj, (Time) obj2);
        return BoxedUnit.UNIT;
    }

    /**
     * Creates the closure bound to its enclosing connector.
     *
     * @param kafkaStreamingDataConnector the enclosing connector instance
     * @throws NullPointerException if {@code kafkaStreamingDataConnector} is {@code null}
     */
    public KafkaStreamingDataConnector$$anonfun$init$2(KafkaStreamingDataConnector kafkaStreamingDataConnector) {
        if (kafkaStreamingDataConnector == null) {
            // Same exception type the decompiled `throw null` produced, now explicit and named.
            throw new NullPointerException("kafkaStreamingDataConnector");
        }
        this.$outer = kafkaStreamingDataConnector;
    }
}
