/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bahir.sql.streaming.mqtt;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bahir.sql.streaming.mqtt.MQTTUtils$;
import org.apache.bahir.sql.streaming.mqtt.Retry$;
import org.apache.bahir.utils.Logging;
import org.apache.bahir.utils.Logging$class;
import org.apache.spark.SparkEnv$;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple4;
import scala.Tuple6;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.math.Ordering;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/**
 * Singleton holder (decompiled Scala {@code object}) for a cache of MQTT clients.
 *
 * <p>Clients are keyed by their configuration, normalised to a key-sorted sequence of
 * {@code (name, value)} pairs, and are stored together with the persistence store they were
 * created with. Entries expire after a period without access; every removal (expiry or explicit
 * invalidation) is routed through a removal listener that disconnects and closes the client.
 *
 * <p>The lazily computed settings are read from the Spark configuration obtained through
 * {@code SparkEnv$.MODULE$.get().conf()} on first use. {@code bitmap$0} records which lazy values
 * have been initialised (bit 1: cache timeout, bit 2: connect attempts, bit 4: connect backoff,
 * bit 8: the cache itself).
 *
 * <p>NOTE(review): this source is CFR output. A few constructs (assignments to {@code final}
 * fields outside the constructor, anonymous {@code Serializable} closures with constructor
 * arguments, rethrowing {@code Throwable} without a {@code throws} clause) mirror the original
 * bytecode rather than valid hand-written Java and are intentionally left as decompiled.
 */
public final class CachedMQTTClient$
implements Logging {
    /** The single instance; assigned by the private constructor. */
    public static final CachedMQTTClient$ MODULE$;
    /** Lazy: idle time in milliseconds after which a cached client is evicted. */
    private long cacheExpireTimeout;
    /** Lazy: value of {@code spark.mqtt.client.connect.attempts}, passed to {@code Retry$}. */
    private int connectAttempts;
    /** Lazy: value of {@code spark.mqtt.client.connect.backoff} in milliseconds, passed to {@code Retry$}. */
    private long connectBackoff;
    /** Loader that builds a connected client for a cache key that is not present yet. */
    private final CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cacheLoader;
    /** Listener that closes a client when its entry leaves the cache. */
    private final Object removalListener;
    /** Lazy: the client cache itself. */
    private LoadingCache<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cache;
    private final Logger log;
    /** Initialisation flags for the lazy values above. */
    private volatile byte bitmap$0;

    static {
        // The constructor publishes the instance through MODULE$.
        new CachedMQTTClient$();
    }

    /** Initialises {@code cacheExpireTimeout} from {@code spark.mqtt.connection.cache.timeout} (default {@code "10m"}). */
    private long cacheExpireTimeout$lzycompute() {
        CachedMQTTClient$ cachedMQTTClient$ = this;
        synchronized (cachedMQTTClient$) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.cacheExpireTimeout = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.mqtt.connection.cache.timeout", "10m");
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
            return this.cacheExpireTimeout;
        }
    }

    /** Initialises {@code connectAttempts} from {@code spark.mqtt.client.connect.attempts} (default {@code -1}). */
    private int connectAttempts$lzycompute() {
        CachedMQTTClient$ cachedMQTTClient$ = this;
        synchronized (cachedMQTTClient$) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.connectAttempts = SparkEnv$.MODULE$.get().conf().getInt("spark.mqtt.client.connect.attempts", -1);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
            return this.connectAttempts;
        }
    }

    /** Initialises {@code connectBackoff} from {@code spark.mqtt.client.connect.backoff} (default {@code "5s"}). */
    private long connectBackoff$lzycompute() {
        CachedMQTTClient$ cachedMQTTClient$ = this;
        synchronized (cachedMQTTClient$) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                this.connectBackoff = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.mqtt.client.connect.backoff", "5s");
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
            return this.connectBackoff;
        }
    }

    /**
     * Builds the cache on first use: entries expire {@code cacheExpireTimeout()} milliseconds
     * after their last access, removals go to the removal listener and misses go to the loader.
     */
    private LoadingCache cache$lzycompute() {
        CachedMQTTClient$ cachedMQTTClient$ = this;
        synchronized (cachedMQTTClient$) {
            if ((byte)(this.bitmap$0 & 8) == 0) {
                this.cache = CacheBuilder.newBuilder().expireAfterAccess(this.cacheExpireTimeout(), TimeUnit.MILLISECONDS).removalListener(this.removalListener()).build(this.cacheLoader());
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
            // MONITOREXIT @DISABLED, blocks:[0, 1] lbl8 : MonitorExitStatement: MONITOREXIT : var1_1
            // Once the cache is built it holds the loader and listener itself, so the
            // fields are cleared (kept exactly as emitted by the decompiler).
            this.cacheLoader = null;
            this.removalListener = null;
            return this.cache;
        }
    }

    /** Returns the logger installed through the {@code Logging} trait. */
    @Override
    public final Logger log() {
        return this.log;
    }

    /** Trait setter used by {@code Logging$class.$init$} to install the logger. */
    @Override
    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger x$1) {
        this.log = x$1;
    }

    /** Lazy accessor: cache idle timeout in milliseconds. */
    private long cacheExpireTimeout() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.cacheExpireTimeout$lzycompute() : this.cacheExpireTimeout;
    }

    /** Lazy accessor: configured connect attempts. */
    private int connectAttempts() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.connectAttempts$lzycompute() : this.connectAttempts;
    }

    /** Lazy accessor: configured connect backoff in milliseconds. */
    private long connectBackoff() {
        return (byte)(this.bitmap$0 & 4) == 0 ? this.connectBackoff$lzycompute() : this.connectBackoff;
    }

    private CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cacheLoader() {
        return this.cacheLoader;
    }

    private Object removalListener() {
        return this.removalListener;
    }

    /** Lazy accessor: the client cache. */
    private LoadingCache<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> cache() {
        return (byte)(this.bitmap$0 & 8) == 0 ? this.cache$lzycompute() : this.cache;
    }

    /**
     * Creates an MQTT client from {@code config}, installs a logging-only callback and connects it
     * through {@code Retry$} using the configured attempts and backoff, retrying on
     * {@code MqttException}.
     *
     * <p>If connecting ultimately fails, the freshly created client and its persistence store are
     * closed before the failure is rethrown: the pair never reaches the cache, so the removal
     * listener would otherwise never release them.
     *
     * @param config client parameters understood by {@code MQTTUtils$.parseConfigParams}
     * @return the connected client paired with its persistence store
     * @throws MatchError if {@code parseConfigParams} returns {@code null}
     */
    public Tuple2<MqttClient, MqttClientPersistence> org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$createMqttClient(Map<String, String> config) {
        Tuple6<String, String, String, MqttClientPersistence, MqttConnectOptions, Object> tuple6 = MQTTUtils$.MODULE$.parseConfigParams(config);
        if (tuple6 != null) {
            // Only elements 1, 2, 4 and 5 of the parsed tuple are needed here.
            String brokerUrl = (String)tuple6._1();
            String clientId = (String)tuple6._2();
            MqttClientPersistence persistence = (MqttClientPersistence)tuple6._4();
            MqttConnectOptions mqttConnectOptions = (MqttConnectOptions)tuple6._5();
            MqttClient client = new MqttClient(brokerUrl, clientId, persistence);
            MqttCallbackExtended callback = new MqttCallbackExtended(){

                public synchronized void messageArrived(String topic, MqttMessage message) {
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                public void connectionLost(Throwable cause) {
                    CachedMQTTClient$.MODULE$.log().warn("Connection to mqtt server lost.", cause);
                }

                public void connectComplete(boolean reconnect, String serverURI) {
                    CachedMQTTClient$.MODULE$.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Connect complete ", ". Is it a reconnect?: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{serverURI, BoxesRunTime.boxToBoolean((boolean)reconnect)})));
                }
            };
            client.setCallback((MqttCallback)callback);
            try {
                Retry$.MODULE$.apply(this.connectAttempts(), this.connectBackoff(), (Seq<Class<?>>)Predef$.MODULE$.wrapRefArray((Object[])new Class[]{MqttException.class}), new Serializable(mqttConnectOptions, client){
                    public static final long serialVersionUID = 0L;
                    private final MqttConnectOptions mqttConnectOptions$1;
                    private final MqttClient client$1;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        this.client$1.connect(this.mqttConnectOptions$1);
                    }
                    {
                        this.mqttConnectOptions$1 = mqttConnectOptions$1;
                        this.client$1 = client$1;
                    }
                });
            }
            catch (Throwable throwable) {
                // Nothing else references the client yet; release it here or it is never released.
                this.releaseClientResources(client, persistence);
                throw throwable;
            }
            return new Tuple2((Object)client, (Object)persistence);
        }
        throw new MatchError(tuple6);
    }

    /**
     * Disconnects {@code client} and then closes {@code persistence} and {@code client}.
     *
     * <p>Each step is attempted independently: a non-fatal failure of one step (as classified by
     * {@code NonFatal$}) is logged as a warning and the remaining steps still run, so a failing
     * {@code disconnect()} no longer leaves the persistence store and the client open. Fatal
     * throwables are rethrown immediately.
     *
     * @param params      cache key of the client; not used by this method
     * @param client      client to disconnect and close
     * @param persistence persistence store to close
     */
    public void org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$closeMqttClient(Seq<Tuple2<String, String>> params, MqttClient client, MqttClientPersistence persistence) {
        try {
            client.disconnect();
        }
        catch (Throwable throwable) {
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            this.warnCloseFailure(throwable);
        }
        this.releaseClientResources(client, persistence);
    }

    /**
     * Closes the persistence store and then the client, each in isolation, logging non-fatal
     * failures and rethrowing fatal ones.
     */
    private void releaseClientResources(MqttClient client, MqttClientPersistence persistence) {
        try {
            persistence.close();
        }
        catch (Throwable throwable) {
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            this.warnCloseFailure(throwable);
        }
        try {
            client.close();
        }
        catch (Throwable throwable) {
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            this.warnCloseFailure(throwable);
        }
    }

    /** Logs a non-fatal failure raised while shutting a client down. */
    private void warnCloseFailure(Throwable e) {
        this.log().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error while closing MQTT client ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{e.getMessage()})), e);
    }

    /**
     * Returns the cached client for {@code parameters}, creating and connecting one on a miss.
     *
     * <p>When the cache lookup fails with {@code ExecutionException},
     * {@code UncheckedExecutionException} or {@code ExecutionError} that carries a cause, the
     * cause is thrown instead of the wrapper; any other throwable propagates unchanged.
     *
     * @param parameters client configuration; also the cache key after sorting by name
     * @return the client stored for these parameters
     */
    public MqttClient getOrCreate(Map<String, String> parameters) {
        try {
            return (MqttClient)((Tuple2)this.cache().get(this.mapToSeq(parameters)))._1();
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            boolean bl = throwable2 instanceof ExecutionException ? true : (throwable2 instanceof UncheckedExecutionException ? true : throwable2 instanceof ExecutionError);
            if (bl && throwable2.getCause() != null) {
                throw throwable2.getCause();
            }
            throw throwable;
        }
    }

    /**
     * Invalidates the cache entry for {@code parameters}; the removal listener closes the client.
     *
     * @param parameters configuration the client was obtained with
     */
    public void close(Map<String, String> parameters) {
        this.cache().invalidate(this.mapToSeq(parameters));
    }

    /** Invalidates every cache entry; the removal listener closes each client. */
    public void clear() {
        this.log().debug("Cleaning MQTT client cache");
        this.cache().invalidateAll();
    }

    /**
     * Converts a parameter map into the cache key: its entries sorted by parameter name, so that
     * equal maps always produce the same sequence.
     */
    private Seq<Tuple2<String, String>> mapToSeq(Map<String, String> parameters) {
        return (Seq)parameters.toSeq().sortBy((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(Tuple2<String, String> x) {
                return (String)x._1();
            }
        }, (Ordering)Ordering.String$.MODULE$);
    }

    /** Publishes the singleton, initialises logging and creates the cache loader and removal listener. */
    private CachedMQTTClient$() {
        MODULE$ = this;
        Logging$class.$init$(this);
        this.cacheLoader = new CacheLoader<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>>(){

            // Rebuilds a Map from the sorted key sequence and creates a connected client for it.
            public Tuple2<MqttClient, MqttClientPersistence> load(Seq<Tuple2<String, String>> config) {
                CachedMQTTClient$.MODULE$.log().debug(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Creating new MQTT client with params: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{config})));
                return CachedMQTTClient$.MODULE$.org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$createMqttClient((Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)config.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<String, String> apply(Tuple2<String, String> s) {
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s._1()), s._2());
                    }
                }, Seq$.MODULE$.canBuildFrom()))));
            }
        };
        this.removalListener = new RemovalListener<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>>(){

            // Logs the removal cause and shuts the removed client down.
            public void onRemoval(RemovalNotification<Seq<Tuple2<String, String>>, Tuple2<MqttClient, MqttClientPersistence>> notification) {
                Seq params = (Seq)notification.getKey();
                MqttClient client = (MqttClient)((Tuple2)notification.getValue())._1();
                MqttClientPersistence persistence = (MqttClientPersistence)((Tuple2)notification.getValue())._2();
                CachedMQTTClient$.MODULE$.log().debug(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Evicting MQTT client ", " params: ", ", due to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{client, params, notification.getCause()})));
                CachedMQTTClient$.MODULE$.org$apache$bahir$sql$streaming$mqtt$CachedMQTTClient$$closeMqttClient((Seq<Tuple2<String, String>>)params, client, persistence);
            }
        };
    }
}

