package org.apache.s2graph.loader.subscriber;

import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Array$;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.math.Numeric$LongIsIntegral$;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: WalLogStat.scala */
/* loaded from: input_file:org/apache/s2graph/loader/subscriber/WalLogStat$$anonfun$run$1.class */
public final class WalLogStat$$anonfun$run$1 extends AbstractFunction2<RDD<Tuple2<String, String>>, Time, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    public final String brokerList$1;
    public final String dbUrl$1;
    public final String statTopic$1;
    public final Map kafkaParams$1;
    private final Producer statProducer$1;

    public final void apply(RDD<Tuple2<String, String>> rdd, Time time) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
        long milliseconds = time.milliseconds();
        RDD mapPartitions = rdd.mapPartitions(new WalLogStat$$anonfun$run$1$$anonfun$2(this), rdd.mapPartitions$default$2(), ClassTag$.MODULE$.apply(String.class));
        Tuple2[] tuple2Arr = (Tuple2[]) RDD$.MODULE$.rddToPairRDDFunctions(mapPartitions.map(new WalLogStat$$anonfun$run$1$$anonfun$3(this), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Long(), Ordering$String$.MODULE$).reduceByKey(new WalLogStat$$anonfun$run$1$$anonfun$1(this)).collect();
        this.statProducer$1.send(Predef$.MODULE$.wrapRefArray((KeyedMessage[]) Predef$.MODULE$.refArrayOps(tuple2Arr).map(new WalLogStat$$anonfun$run$1$$anonfun$5(this, milliseconds, BoxesRunTime.unboxToLong(Predef$.MODULE$.longArrayOps((long[]) Predef$.MODULE$.refArrayOps(tuple2Arr).map(new WalLogStat$$anonfun$run$1$$anonfun$4(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Long()))).sum(Numeric$LongIsIntegral$.MODULE$))), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(KeyedMessage.class)))));
        mapPartitions.mapPartitionsWithIndex(new WalLogStat$$anonfun$run$1$$anonfun$apply$2(this, offsetRanges), mapPartitions.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.Nothing()).foreach(new WalLogStat$$anonfun$run$1$$anonfun$apply$3(this));
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj, Object obj2) {
        apply((RDD<Tuple2<String, String>>) obj, (Time) obj2);
        return BoxedUnit.UNIT;
    }

    public WalLogStat$$anonfun$run$1(String str, String str2, String str3, Map map, Producer producer) {
        this.brokerList$1 = str;
        this.dbUrl$1 = str2;
        this.statTopic$1 = str3;
        this.kafkaParams$1 = map;
        this.statProducer$1 = producer;
    }
}
