/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.pulsar;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
import org.apache.pulsar.shade.com.google.common.cache.CacheLoader;
import org.apache.pulsar.shade.com.google.common.cache.LoadingCache;
import org.apache.pulsar.shade.com.google.common.cache.RemovalListener;
import org.apache.pulsar.shade.com.google.common.cache.RemovalNotification;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple2;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/**
 * Process-wide cache of Pulsar consumers keyed by {@code (topic, subscription)}.
 *
 * <p>This is decompiler output; the {@code MODULE$} singleton, the
 * {@code bitmap$0} lazy-initialisation flags and the {@code scala.*} runtime
 * types suggest it was compiled from a Scala {@code object} (not confirmed by
 * this file alone).
 *
 * <p>Consumers are held in a Guava {@link LoadingCache}. An entry is created on
 * first lookup by subscribing through the most recently supplied
 * {@link PulsarClient}, expires after {@link #cacheExpireTimeout()} milliseconds
 * without access, and is closed by the removal listener when it leaves the
 * cache.
 *
 * <p>NOTE(review): {@link #getOrCreate} stores its {@code client} argument in a
 * shared, non-volatile field without synchronisation. If callers on different
 * threads pass different clients, the loader may subscribe with a client other
 * than the one the caller supplied -- confirm that a single client is used per
 * JVM.
 */
public final class CachedConsumer$
implements Logging {
    /** The singleton instance; assigned as the first statement of the private constructor. */
    public static CachedConsumer$ MODULE$;
    /** Lazily computed expiry in milliseconds; valid once bit 0 of {@code bitmap$0} is set. */
    private long cacheExpireTimeout;
    /** Lazily built consumer cache; valid once bit 1 of {@code bitmap$0} is set. */
    private LoadingCache<Tuple2<String, String>, Consumer<GenericRecord>> guavaCache;
    /** Client used by the cache loader; overwritten on every {@code getOrCreate} call. */
    private PulsarClient org$apache$spark$sql$pulsar$CachedConsumer$$client;
    /** Fallback expiry (10 minutes, in milliseconds) used when no SparkEnv is available. */
    private final long defaultCacheExpireTimeout;
    /** Loader handed to the cache builder; nulled out after the cache has been built. */
    private CacheLoader<Tuple2<String, String>, Consumer<GenericRecord>> cacheLoader;
    /** Removal listener handed to the cache builder; nulled out after the cache has been built. */
    private RemovalListener<Tuple2<String, String>, Consumer<GenericRecord>> removalListener;
    /** Backing field for the {@link Logging} accessor pair below. */
    private transient Logger org$apache$spark$internal$Logging$$log_;
    /** Lazy-init flags: bit 0 = {@code cacheExpireTimeout}, bit 1 = {@code guavaCache}. */
    private volatile byte bitmap$0;

    // Instantiating the class here is what populates MODULE$ (see the constructor).
    static {
        new CachedConsumer$();
    }

    // ---------------------------------------------------------------------
    // Logging members: each one forwards to the matching static helper on
    // the Logging interface, passing this instance.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Getter for the logger field used by the {@link Logging} helpers. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field used by the {@link Logging} helpers. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------
    // Cache state accessors
    // ---------------------------------------------------------------------

    /** Returns the client last stored by {@link #getOrCreate}; {@code null} until then. */
    public PulsarClient org$apache$spark$sql$pulsar$CachedConsumer$$client() {
        return this.org$apache$spark$sql$pulsar$CachedConsumer$$client;
    }

    /** Replaces the client the cache loader will use for subsequent subscriptions. */
    private void org$apache$spark$sql$pulsar$CachedConsumer$$client_$eq(PulsarClient x$1) {
        this.org$apache$spark$sql$pulsar$CachedConsumer$$client = x$1;
    }

    /** Returns the built-in default expiry in milliseconds. */
    private long defaultCacheExpireTimeout() {
        return this.defaultCacheExpireTimeout;
    }

    /**
     * Slow path of {@link #cacheExpireTimeout()}: computes the value once while
     * holding this instance's monitor and then sets bit 0 of {@code bitmap$0}.
     *
     * <p>The value is read from the Spark configuration when
     * {@code SparkEnv.get()} is non-null, otherwise the default is used.
     */
    private long cacheExpireTimeout$lzycompute() {
        CachedConsumer$ cachedConsumer$ = this;
        synchronized (cachedConsumer$) {
            // Re-check under the lock so the value is computed at most once.
            if ((byte)(this.bitmap$0 & 1) == 0) {
                long l;
                // Option.apply yields None for a null SparkEnv, Some(env) otherwise.
                Option option = Option$.MODULE$.apply((Object)SparkEnv$.MODULE$.get()).map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)CachedConsumer$.$anonfun$cacheExpireTimeout$1(x$1)));
                if (option instanceof Some) {
                    long timeout;
                    Some some = (Some)option;
                    l = timeout = BoxesRunTime.unboxToLong((Object)some.value());
                } else if (None$.MODULE$.equals(option)) {
                    l = this.defaultCacheExpireTimeout();
                } else {
                    throw new MatchError((Object)option);
                }
                this.cacheExpireTimeout = l;
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.cacheExpireTimeout;
    }

    /** Returns the cache expiry in milliseconds, computing it on first use. */
    private long cacheExpireTimeout() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.cacheExpireTimeout$lzycompute();
        }
        return this.cacheExpireTimeout;
    }

    /** Returns the loader field; {@code null} once the cache has been built. */
    private CacheLoader<Tuple2<String, String>, Consumer<GenericRecord>> cacheLoader() {
        return this.cacheLoader;
    }

    /** Returns the removal-listener field; {@code null} once the cache has been built. */
    private RemovalListener<Tuple2<String, String>, Consumer<GenericRecord>> removalListener() {
        return this.removalListener;
    }

    /**
     * Slow path of {@link #guavaCache()}: builds the cache once while holding
     * this instance's monitor and then sets bit 1 of {@code bitmap$0}.
     *
     * <p>The cache expires entries {@link #cacheExpireTimeout()} milliseconds
     * after their last access and reports removals to the removal listener.
     */
    private LoadingCache<Tuple2<String, String>, Consumer<GenericRecord>> guavaCache$lzycompute() {
        CachedConsumer$ cachedConsumer$ = this;
        synchronized (cachedConsumer$) {
            // Re-check under the lock so the cache is built at most once.
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.guavaCache = CacheBuilder.newBuilder().expireAfterAccess(this.cacheExpireTimeout(), TimeUnit.MILLISECONDS).removalListener(this.removalListener()).build(this.cacheLoader());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        // The build call above is the only reader of these two fields in this
        // class, so the references are dropped once the cache exists.
        this.cacheLoader = null;
        this.removalListener = null;
        return this.guavaCache;
    }

    /** Returns the consumer cache, building it on first use. */
    private LoadingCache<Tuple2<String, String>, Consumer<GenericRecord>> guavaCache() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.guavaCache$lzycompute();
        }
        return this.guavaCache;
    }

    // ---------------------------------------------------------------------
    // Public API
    // ---------------------------------------------------------------------

    /**
     * Returns the cached consumer for {@code (topic, subscription)}, creating
     * it through the cache loader if no entry exists.
     *
     * <p>The supplied {@code client} is stored in a shared field before the
     * lookup, and the loader reads that field when it has to subscribe.
     *
     * <p>If the lookup fails, the failure is logged at error level together
     * with the causing throwable and then rethrown unchanged.
     *
     * @param topic        topic name, first component of the cache key
     * @param subscription subscription name, second component of the cache key
     * @param client       client the loader should use for new subscriptions
     * @return the cached or newly created consumer
     */
    public Consumer<GenericRecord> getOrCreate(String topic, String subscription, PulsarClient client) {
        this.org$apache$spark$sql$pulsar$CachedConsumer$$client_$eq(client);
        Try try_ = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> MODULE$.guavaCache().get((Tuple2<String, String>)new Tuple2((Object)topic, (Object)subscription)));
        if (try_ instanceof Success) {
            Success success = (Success)try_;
            Consumer consumer = (Consumer)success.value();
            return consumer;
        }
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable exception = failure.exception();
            // Pass the throwable so the log records why the lookup failed,
            // matching how the removal listener reports close failures.
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Failed to create consumer to topic ").append(topic).append(" with subscription ").append(subscription).toString(), exception);
            throw exception;
        }
        throw new MatchError((Object)try_);
    }

    /**
     * Invalidates the cache entry for {@code (topic, subscription)}.
     *
     * @param topic        topic name, first component of the cache key
     * @param subscription subscription name, second component of the cache key
     */
    public void close(String topic, String subscription) {
        this.guavaCache().invalidate(new Tuple2((Object)topic, (Object)subscription));
    }

    /** Logs an info message and invalidates every entry in the cache. */
    public void clear() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Cleaning up Consumer Cache.");
        this.guavaCache().invalidateAll();
    }

    /**
     * Reads {@code spark.pulsar.client.cache.timeout} from the given
     * environment's configuration as milliseconds, using the built-in default
     * (rendered as {@code "<millis>ms"}) when the key is not set.
     */
    public static final /* synthetic */ long $anonfun$cacheExpireTimeout$1(SparkEnv x$1) {
        return x$1.conf().getTimeAsMs("spark.pulsar.client.cache.timeout", new StringBuilder(2).append(MODULE$.defaultCacheExpireTimeout()).append("ms").toString());
    }

    /**
     * Initialises the singleton: publishes {@code MODULE$}, initialises the
     * {@link Logging} state, and creates the cache loader and removal listener
     * that the lazily built cache will later be configured with.
     */
    private CachedConsumer$() {
        // MODULE$ is published first; the anonymous classes below refer to it.
        MODULE$ = this;
        Logging.$init$((Logging)this);
        this.org$apache$spark$sql$pulsar$CachedConsumer$$client = null;
        this.defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10L);
        this.cacheLoader = new CacheLoader<Tuple2<String, String>, Consumer<GenericRecord>>(){

            /**
             * Subscribes a new consumer for the {@code (topic, subscription)}
             * key using the client currently stored on the singleton, with an
             * {@link AutoConsumeSchema} and initial position {@code Earliest}.
             * A failure is logged with its cause and rethrown.
             */
            public Consumer<GenericRecord> load(Tuple2<String, String> k) {
                Tuple2<String, String> tuple2 = k;
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                String topic = (String)tuple2._1();
                String subscription = (String)tuple2._2();
                Tuple2 tuple22 = new Tuple2((Object)topic, (Object)subscription);
                String topic2 = (String)tuple22._1();
                String subscription2 = (String)tuple22._2();
                Try try_ = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> CachedConsumer$.MODULE$.org$apache$spark$sql$pulsar$CachedConsumer$$client().newConsumer(new AutoConsumeSchema()).topic(topic2).subscriptionName(subscription2).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe());
                if (try_ instanceof Success) {
                    Success success = (Success)try_;
                    Consumer consumer = (Consumer)success.value();
                    return consumer;
                }
                if (try_ instanceof Failure) {
                    Failure failure = (Failure)try_;
                    Throwable exception = failure.exception();
                    // Include the throwable so the subscribe failure is
                    // diagnosable from the log alone.
                    CachedConsumer$.MODULE$.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Failed to create consumer to topic ").append(topic2).append(" with subscription ").append(subscription2).toString(), exception);
                    throw exception;
                }
                throw new MatchError((Object)try_);
            }

            // Synthetic lambda-deserialisation hook. The array contents are the
            // decompiler's rendering of method-handle constants and are not
            // valid Java source as written.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$load$1(java.lang.String java.lang.String ), $anonfun$load$2(java.lang.String java.lang.String )}, serializedLambda);
            }
        };
        this.removalListener = new RemovalListener<Tuple2<String, String>, Consumer<GenericRecord>>(){

            /**
             * Closes the consumer carried by the removal notification. Success
             * is logged at info level; a failure is logged as a warning with
             * its cause and is not rethrown.
             */
            public void onRemoval(RemovalNotification<Tuple2<String, String>, Consumer<GenericRecord>> notification) {
                Try try_ = Try$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> ((Consumer)notification.getValue()).close());
                if (try_ instanceof Success) {
                    CachedConsumer$.MODULE$.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(20).append("Closed consumer for ").append(notification.getKey()).toString());
                    return;
                }
                if (try_ instanceof Failure) {
                    Failure failure = (Failure)try_;
                    Throwable exception = failure.exception();
                    CachedConsumer$.MODULE$.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Failed to close consumer for ").append(notification.getKey()).toString(), exception);
                    return;
                }
                throw new MatchError((Object)try_);
            }

            // Synthetic lambda-deserialisation hook. The array contents are the
            // decompiler's rendering of method-handle constants and are not
            // valid Java source as written.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$onRemoval$1(org.apache.pulsar.shade.com.google.common.cache.RemovalNotification ), $anonfun$onRemoval$2(org.apache.pulsar.shade.com.google.common.cache.RemovalNotification ), $anonfun$onRemoval$3(org.apache.pulsar.shade.com.google.common.cache.RemovalNotification )}, serializedLambda);
            }
        };
    }
}

