package org.apache.s2graph.counter.loader.stream;

import com.typesafe.config.Config;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringDecoder;
import org.apache.s2graph.counter.config.S2CounterConfig;
import org.apache.s2graph.counter.loader.config.StreamingConfig$;
import org.apache.s2graph.spark.config.S2ConfigFactory$;
import org.apache.s2graph.spark.spark.HashMapParam$;
import org.apache.s2graph.spark.spark.SparkApp;
import org.apache.s2graph.spark.spark.WithKafka;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.kafka.StreamHelper;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashMap$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ExactCounterStreaming.scala */
/* loaded from: input_file:org/apache/s2graph/counter/loader/stream/ExactCounterStreaming$.class */
/**
 * Decompiled module class of the Scala object {@code ExactCounterStreaming}
 * (see the "compiled from" header of this file).
 *
 * <p>The single instance is created by the static initializer and published
 * through {@link #MODULE$} as the first statement of the private constructor.
 * {@link #run()} is the job body: it reads two arguments ("interval" and
 * "clear"), builds a streaming context, attaches a Kafka stream for
 * {@link #inputTopics()} and blocks until the context terminates.
 *
 * <p>NOTE(review): this text is decompiler output, not hand-written Java.
 * Calls printed as {@code WithKafka.class.x(this, ...)},
 * {@code SparkApp.class.x(this, ...)} and {@code Logging.class.x(this, ...)}
 * are presumably calls into the Scala-generated trait implementation classes;
 * confirm against the bytecode before editing them. Likewise
 * {@code MODULE$ = null} combined with the assignment in the constructor is
 * how the decompiler printed the field and is not legal Java as written.
 */
public final class ExactCounterStreaming$ implements SparkApp, WithKafka {
    /** Singleton instance; assigned inside the private constructor. */
    public static final ExactCounterStreaming$ MODULE$ = null;
    // Lazily initialised fields. Each one is guarded by one bit of bitmap$0:
    // config -> 1, s2Config -> 2, className -> 4, producer -> 8.
    private Config config;
    private S2CounterConfig s2Config;
    private String className;
    private Producer<String, String> producer;
    // Eagerly initialised in the constructor, in this order; each value is
    // derived from the one before it.
    private final Set<String> inputTopics;
    private final String strInputTopics;
    private final String groupId;
    private final Map<String, String> kafkaParam;
    private final StreamHelper streamHelper;
    // Fields with name-mangled identifiers; presumably the backing storage for
    // members declared in the SparkApp and Logging traits (verify in those traits).
    private String[] org$apache$s2graph$spark$spark$SparkApp$$_args;
    private StreamHelper org$apache$s2graph$spark$spark$SparkApp$$streamHelper;
    private transient Logger org$apache$spark$Logging$$log_;
    /** Bit set recording which of the lazy fields above have been computed. */
    private volatile byte bitmap$0;

    // Creating the instance here runs the constructor, which stores it in MODULE$.
    static {
        new ExactCounterStreaming$();
    }

    /**
     * Slow path of {@link #config()}: under the monitor of {@code this},
     * re-checks bit 1 of {@code bitmap$0} and, if still clear, loads the
     * configuration from {@code S2ConfigFactory$} and sets the bit.
     *
     * <p>The {@code ?? r0} declaration and {@code r0 = r0} are decompiler
     * residue (see the JADX warnings below), not meaningful logic.
     *
     * @return the initialised {@code config} field
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private Config config$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.config = S2ConfigFactory$.MODULE$.config();
                this.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.config;
        }
    }

    /**
     * Slow path of {@link #s2Config()}: under the monitor of {@code this},
     * re-checks bit 2 of {@code bitmap$0} and, if still clear, wraps
     * {@link #config()} in a new {@code S2CounterConfig} and sets the bit.
     *
     * @return the initialised {@code s2Config} field
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private S2CounterConfig s2Config$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.s2Config = new S2CounterConfig(config());
                this.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.s2Config;
        }
    }

    /**
     * Slow path of {@link #className()}: under the monitor of {@code this},
     * re-checks bit 4 of {@code bitmap$0} and, if still clear, stores the
     * runtime class name with a trailing "$" stripped and sets the bit.
     *
     * @return the initialised {@code className} field
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private String className$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.className = new StringOps(Predef$.MODULE$.augmentString(getClass().getName())).stripSuffix("$");
                this.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.className;
        }
    }

    /**
     * Slow path of {@link #producer()}: under the monitor of {@code this},
     * re-checks bit 8 of {@code bitmap$0} and, if still clear, obtains a
     * producer via {@link #getProducer(String)} using
     * {@code StreamingConfig$.MODULE$.KAFKA_BROKERS()} and sets the bit.
     *
     * @return the initialised {@code producer} field (raw type as decompiled)
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private Producer producer$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 8)) == 0) {
                this.producer = getProducer(StreamingConfig$.MODULE$.KAFKA_BROKERS());
                this.bitmap$0 = (byte) (this.bitmap$0 | 8);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.producer;
        }
    }

    // ---------------------------------------------------------------------
    // WithKafka forwarders. Each method only passes `this` and its arguments
    // on to the same-named WithKafka method; no logic lives here.
    // ---------------------------------------------------------------------

    public ProducerConfig kafkaConf(String str) {
        return WithKafka.class.kafkaConf(this, str);
    }

    public ProducerConfig producerConfig(String str, String str2, String str3) {
        return WithKafka.class.producerConfig(this, str, str2, str3);
    }

    public <K, V> Producer<K, V> getProducer(ProducerConfig producerConfig) {
        return WithKafka.class.getProducer(this, producerConfig);
    }

    public <K, V> Producer<K, V> getProducer(String str) {
        return WithKafka.class.getProducer(this, str);
    }

    public int getPartKey(Object obj, int i) {
        return WithKafka.class.getPartKey(this, obj, i);
    }

    public String makeKafkaGroupId(String str, String str2) {
        return WithKafka.class.makeKafkaGroupId(this, str, str2);
    }

    // Suppliers for the default values of producerConfig's 2nd and 3rd
    // parameters (Scala default-argument naming); the values themselves are
    // defined in WithKafka, not here.
    public String producerConfig$default$2() {
        return WithKafka.class.producerConfig$default$2(this);
    }

    public String producerConfig$default$3() {
        return WithKafka.class.producerConfig$default$3(this);
    }

    // ---------------------------------------------------------------------
    // Plain getters/setters for the name-mangled SparkApp fields declared above.
    // ---------------------------------------------------------------------

    public String[] org$apache$s2graph$spark$spark$SparkApp$$_args() {
        return this.org$apache$s2graph$spark$spark$SparkApp$$_args;
    }

    public void org$apache$s2graph$spark$spark$SparkApp$$_args_$eq(String[] strArr) {
        this.org$apache$s2graph$spark$spark$SparkApp$$_args = strArr;
    }

    public StreamHelper org$apache$s2graph$spark$spark$SparkApp$$streamHelper() {
        return this.org$apache$s2graph$spark$spark$SparkApp$$streamHelper;
    }

    public void org$apache$s2graph$spark$spark$SparkApp$$streamHelper_$eq(StreamHelper streamHelper) {
        this.org$apache$s2graph$spark$spark$SparkApp$$streamHelper = streamHelper;
    }

    // ---------------------------------------------------------------------
    // SparkApp forwarders. Each method only passes `this` and its arguments
    // on to the same-named SparkApp method.
    // ---------------------------------------------------------------------

    public String[] args() {
        return SparkApp.class.args(this);
    }

    public String getArgs(int i) {
        return SparkApp.class.getArgs(this, i);
    }

    // Instance-level entry point delegated to SparkApp; presumably it stores the
    // arguments and then invokes run() — confirm in SparkApp.
    public void main(String[] strArr) {
        SparkApp.class.main(this, strArr);
    }

    public void validateArgument(Seq<String> seq) {
        SparkApp.class.validateArgument(this, seq);
    }

    public String buildKafkaGroupId(String str, String str2) {
        return SparkApp.class.buildKafkaGroupId(this, str, str2);
    }

    public StreamHelper getStreamHelper(Map<String, String> map) {
        return SparkApp.class.getStreamHelper(this, map);
    }

    public SparkConf sparkConf(String str) {
        return SparkApp.class.sparkConf(this, str);
    }

    public StreamingContext streamingContext(SparkConf sparkConf, Duration duration, Option<String> option) {
        return SparkApp.class.streamingContext(this, sparkConf, duration, option);
    }

    public DStream<Tuple2<String, String>> createKafkaPairStream(StreamingContext streamingContext, Map<String, String> map, String str, Option<Object> option) {
        return SparkApp.class.createKafkaPairStream(this, streamingContext, map, str, option);
    }

    public DStream<String> createKafkaValueStream(StreamingContext streamingContext, Map<String, String> map, String str, Option<Object> option) {
        return SparkApp.class.createKafkaValueStream(this, streamingContext, map, str, option);
    }

    public DStream<Tuple2<String, String>> createKafkaPairStreamMulti(StreamingContext streamingContext, Map<String, String> map, String str, int i, Option<Object> option) {
        return SparkApp.class.createKafkaPairStreamMulti(this, streamingContext, map, str, i, option);
    }

    public DStream<String> createKafkaValueStreamMulti(StreamingContext streamingContext, Map<String, String> map, String str, int i, Option<Object> option) {
        return SparkApp.class.createKafkaValueStreamMulti(this, streamingContext, map, str, i, option);
    }

    // Suppliers for default argument values of the SparkApp methods named in
    // each identifier (method name, then 1-based parameter position).
    public Option<String> streamingContext$default$3() {
        return SparkApp.class.streamingContext$default$3(this);
    }

    public Option<Object> createKafkaPairStream$default$4() {
        return SparkApp.class.createKafkaPairStream$default$4(this);
    }

    public Option<Object> createKafkaValueStream$default$4() {
        return SparkApp.class.createKafkaValueStream$default$4(this);
    }

    public Option<Object> createKafkaPairStreamMulti$default$5() {
        return SparkApp.class.createKafkaPairStreamMulti$default$5(this);
    }

    public Option<Object> createKafkaValueStreamMulti$default$5() {
        return SparkApp.class.createKafkaValueStreamMulti$default$5(this);
    }

    // ---------------------------------------------------------------------
    // Logging: accessor pair for the transient logger field, followed by
    // forwarders to the same-named org.apache.spark.Logging methods.
    // ---------------------------------------------------------------------

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    // ---------------------------------------------------------------------
    // Lazy accessors: read bitmap$0 without taking the lock; if the guarding
    // bit is clear, fall through to the synchronized $lzycompute method,
    // otherwise return the cached field directly.
    // ---------------------------------------------------------------------

    /** @return the configuration obtained from {@code S2ConfigFactory$}, computed on first use */
    public Config config() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? config$lzycompute() : this.config;
    }

    /** @return the {@code S2CounterConfig} built from {@link #config()}, computed on first use */
    public S2CounterConfig s2Config() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? s2Config$lzycompute() : this.s2Config;
    }

    /** @return this object's runtime class name without a trailing "$", computed on first use */
    public String className() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? className$lzycompute() : this.className;
    }

    /** @return the producer created for {@code KAFKA_BROKERS}, computed on first use */
    public Producer<String, String> producer() {
        return ((byte) (this.bitmap$0 & 8)) == 0 ? producer$lzycompute() : this.producer;
    }

    // ---------------------------------------------------------------------
    // Accessors for the values fixed in the constructor.
    // ---------------------------------------------------------------------

    /** @return the set holding the single topic {@code KAFKA_TOPIC_COUNTER} */
    public Set<String> inputTopics() {
        return this.inputTopics;
    }

    /** @return {@link #inputTopics()} joined with "," */
    public String strInputTopics() {
        return this.strInputTopics;
    }

    /** @return the group id built from {@link #strInputTopics()} and the suffix "counter_v2" */
    public String groupId() {
        return this.groupId;
    }

    /** @return the Kafka parameter map assembled in the constructor */
    public Map<String, String> kafkaParam() {
        return this.kafkaParam;
    }

    /** @return the {@code StreamHelper} constructed from {@link #kafkaParam()} */
    public StreamHelper streamHelper() {
        return this.streamHelper;
    }

    /**
     * Job body. Expects two arguments, validated under the names "interval"
     * and "clear":
     * <ul>
     *   <li>{@code args()[0]} is parsed as a long and used as the batch
     *       duration in seconds;</li>
     *   <li>{@code args()[1]} is parsed as a boolean; when true the consumer
     *       group is cleaned up through the Kafka helper before streaming
     *       starts.</li>
     * </ul>
     * It then creates the streaming context, registers a per-RDD handler on
     * the Kafka stream for {@link #inputTopics()}, starts the context and
     * blocks in {@code awaitTermination()}.
     */
    public void run() {
        validateArgument(Predef$.MODULE$.wrapRefArray(new String[]{"interval", "clear"}));
        // (interval as Duration, clear flag) packed into a tuple.
        Tuple2 tuple2 = new Tuple2(Durations$.MODULE$.seconds(new StringOps(Predef$.MODULE$.augmentString(args()[0])).toLong()), BoxesRunTime.boxToBoolean(new StringOps(Predef$.MODULE$.augmentString(args()[1])).toBoolean()));
        // tuple2 was just constructed, so this branch cannot be taken; presumably
        // left over from a Scala tuple pattern match in the original source.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        // Re-packs the same two values; only the unpacked pieces are used below.
        Tuple2 tuple22 = new Tuple2((Duration) tuple2._1(), BoxesRunTime.boxToBoolean(tuple2._2$mcZ$sp()));
        Duration duration = (Duration) tuple22._1();
        if (tuple22._2$mcZ$sp()) {
            streamHelper().kafkaHelper().consumerGroupCleanup();
        }
        // The string passed to sparkConf is "<strInputTopics>: <className>".
        StreamingContext streamingContext = streamingContext(sparkConf(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ": ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{strInputTopics(), className()}))), duration, streamingContext$default$3());
        // String keys and values decoded with StringDecoder. Each RDD is handed to
        // the ExactCounterStreaming$$anonfun$run$1 closure together with an
        // accumulable named "Throughput" that starts as an empty mutable HashMap.
        // The function given to HashMapParam is presumably the per-key merge
        // operation — confirm in HashMapParam.
        streamHelper().createStream(streamingContext, inputTopics(), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class)).foreachRDD(new ExactCounterStreaming$$anonfun$run$1(streamingContext.sparkContext().accumulable(HashMap$.MODULE$.empty(), "Throughput", HashMapParam$.MODULE$.apply(new ExactCounterStreaming$$anonfun$1()))));
        streamingContext.start();
        streamingContext.awaitTermination();
    }

    /**
     * Builds the singleton. Statement order matters: {@code MODULE$} is
     * published first, the three {@code $init$} calls follow, and each field
     * after that is computed from the previously assigned one
     * (topics, then joined topics, then group id, then Kafka params, then stream helper).
     */
    private ExactCounterStreaming$() {
        MODULE$ = this;
        Logging.class.$init$(this);
        SparkApp.class.$init$(this);
        WithKafka.class.$init$(this);
        this.inputTopics = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{StreamingConfig$.MODULE$.KAFKA_TOPIC_COUNTER()}));
        this.strInputTopics = inputTopics().mkString(",");
        this.groupId = buildKafkaGroupId(strInputTopics(), "counter_v2");
        // Keys set here: group.id, metadata.broker.list, zookeeper.connect and
        // zookeeper.connection.timeout.ms (fixed at "10000").
        this.kafkaParam = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), groupId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.broker.list"), StreamingConfig$.MODULE$.KAFKA_BROKERS()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("zookeeper.connect"), StreamingConfig$.MODULE$.KAFKA_ZOOKEEPER()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("zookeeper.connection.timeout.ms"), "10000")}));
        this.streamHelper = new StreamHelper(kafkaParam());
    }
}
