package org.apache.bahir.sql.streaming.mqtt;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bahir.utils.Logging;
import org.apache.spark.SparkEnv$;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple4;
import scala.Tuple6;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$String$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: CachedMQTTClient.scala */
/* loaded from: input_file:org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient$.class */
public final class CachedMQTTClient$ implements Logging {
    public static final CachedMQTTClient$ MODULE$ = null;
    private long cacheExpireTimeout;
    private int connectAttempts;
    private long connectBackoff;
    private final CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cacheLoader;
    private final Object removalListener;
    private LoadingCache<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cache;
    private final Logger log;
    private volatile byte bitmap$0;

    static {
        new CachedMQTTClient$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private long cacheExpireTimeout$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.cacheExpireTimeout = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.mqtt.connection.cache.timeout", "10m");
                this.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.cacheExpireTimeout;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private int connectAttempts$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.connectAttempts = SparkEnv$.MODULE$.get().conf().getInt("spark.mqtt.client.connect.attempts", -1);
                this.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.connectAttempts;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private long connectBackoff$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.connectBackoff = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.mqtt.client.connect.backoff", "5s");
                this.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.connectBackoff;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private LoadingCache cache$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 8)) == 0) {
                this.cache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout(), TimeUnit.MILLISECONDS).removalListener(removalListener()).build(cacheLoader());
                this.bitmap$0 = (byte) (this.bitmap$0 | 8);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            this.cacheLoader = null;
            this.removalListener = null;
            return this.cache;
        }
    }

    @Override // org.apache.bahir.utils.Logging
    public final Logger log() {
        return this.log;
    }

    @Override // org.apache.bahir.utils.Logging
    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    private long cacheExpireTimeout() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? cacheExpireTimeout$lzycompute() : this.cacheExpireTimeout;
    }

    private int connectAttempts() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? connectAttempts$lzycompute() : this.connectAttempts;
    }

    private long connectBackoff() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? connectBackoff$lzycompute() : this.connectBackoff;
    }

    private CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cacheLoader() {
        return this.cacheLoader;
    }

    private Object removalListener() {
        return this.removalListener;
    }

    private LoadingCache<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cache() {
        return ((byte) (this.bitmap$0 & 8)) == 0 ? cache$lzycompute() : this.cache;
    }

    public Tuple2<MqttClient, MqttClientPersistence> org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$createMqttClient(Map<String, String> map) {
        Tuple6<String, String, String, MqttClientPersistence, MqttConnectOptions, Object> parseConfigParams = MQTTUtils$.MODULE$.parseConfigParams(map);
        if (parseConfigParams == null) {
            throw new MatchError(parseConfigParams);
        }
        Tuple4 tuple4 = new Tuple4((String) parseConfigParams._1(), (String) parseConfigParams._2(), (MqttClientPersistence) parseConfigParams._4(), (MqttConnectOptions) parseConfigParams._5());
        String str = (String) tuple4._1();
        String str2 = (String) tuple4._2();
        MqttClientPersistence mqttClientPersistence = (MqttClientPersistence) tuple4._3();
        MqttConnectOptions mqttConnectOptions = (MqttConnectOptions) tuple4._4();
        MqttClient mqttClient = new MqttClient(str, str2, mqttClientPersistence);
        mqttClient.setCallback(new MqttCallbackExtended() { // from class: org.apache.bahir.sql.streaming.mqtt.CachedMQTTClient$$anon$1
            public synchronized void messageArrived(String str3, MqttMessage mqttMessage) {
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }

            public void connectionLost(Throwable th) {
                CachedMQTTClient$.MODULE$.log().warn("Connection to mqtt server lost.", th);
            }

            public void connectComplete(boolean z, String str3) {
                CachedMQTTClient$.MODULE$.log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Connect complete ", ". Is it a reconnect?: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str3, BoxesRunTime.boxToBoolean(z)})));
            }
        });
        Retry$.MODULE$.apply(connectAttempts(), connectBackoff(), Predef$.MODULE$.wrapRefArray(new Class[]{MqttException.class}), new CachedMQTTClient$$anonfun$org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$createMqttClient$1(mqttConnectOptions, mqttClient));
        return new Tuple2<>(mqttClient, mqttClientPersistence);
    }

    public void org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$closeMqttClient(Seq<Tuple2<String, String>> seq, MqttClient mqttClient, MqttClientPersistence mqttClientPersistence) {
        try {
            mqttClient.disconnect();
            mqttClientPersistence.close();
            mqttClient.close();
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable th2 = (Throwable) unapply.get();
            log().warn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error while closing MQTT client ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{th2.getMessage()})), th2);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    public MqttClient getOrCreate(Map<String, String> map) {
        try {
            return (MqttClient) ((Tuple2) cache().get(mapToSeq(map)))._1();
        } catch (Throwable th) {
            if (!(th instanceof ExecutionException ? true : th instanceof UncheckedExecutionException ? true : th instanceof ExecutionError) || th.getCause() == null) {
                throw th;
            }
            throw th.getCause();
        }
    }

    public void close(Map<String, String> map) {
        cache().invalidate(mapToSeq(map));
    }

    public void clear() {
        log().debug("Cleaning MQTT client cache");
        cache().invalidateAll();
    }

    private Seq<Tuple2<String, String>> mapToSeq(Map<String, String> map) {
        return (Seq) map.toSeq().sortBy(new CachedMQTTClient$$anonfun$mapToSeq$1(), Ordering$String$.MODULE$);
    }

    private CachedMQTTClient$() {
        MODULE$ = this;
        org$apache$bahir$utils$Logging$_setter_$log_$eq(LoggerFactory.getLogger(new StringOps(Predef$.MODULE$.augmentString(getClass().getName())).stripSuffix("$")));
        this.cacheLoader = new CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>>() { // from class: org.apache.bahir.sql.streaming.mqtt.CachedMQTTClient$$anon$2
            public Tuple2<MqttClient, MqttClientPersistence> load(Seq<Tuple2<String, String>> seq) {
                CachedMQTTClient$.MODULE$.log().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Creating new MQTT client with params: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{seq})));
                return CachedMQTTClient$.MODULE$.org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$createMqttClient((Map) Predef$.MODULE$.Map().apply((Seq) seq.map(new CachedMQTTClient$$anon$2$$anonfun$load$1(this), Seq$.MODULE$.canBuildFrom())));
            }
        };
        this.removalListener = new RemovalListener<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>>() { // from class: org.apache.bahir.sql.streaming.mqtt.CachedMQTTClient$$anon$3
            public void onRemoval(RemovalNotification<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> removalNotification) {
                Seq<Tuple2<String, String>> seq = (Seq) removalNotification.getKey();
                MqttClient mqttClient = (MqttClient) ((Tuple2) removalNotification.getValue())._1();
                MqttClientPersistence mqttClientPersistence = (MqttClientPersistence) ((Tuple2) removalNotification.getValue())._2();
                CachedMQTTClient$.MODULE$.log().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Evicting MQTT client ", " params: ", ", due to ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{mqttClient, seq, removalNotification.getCause()})));
                CachedMQTTClient$.MODULE$.org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$closeMqttClient(seq, mqttClient, mqttClientPersistence);
            }
        };
    }
}
