package org.apache.camel.component.mqtt;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.Promise;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/mqtt/MQTTEndpoint.class */
/**
 * Camel endpoint that owns one MQTT {@link CallbackConnection} obtained from
 * its {@link MQTTConfiguration}.
 *
 * <p>Messages received on the connection are wrapped in an {@link Exchange}
 * and handed to every registered {@link MQTTConsumer}; outgoing messages are
 * sent through {@link #publish(String, byte[], QoS, boolean, Callback)}.
 *
 * <p>The {@code connected} flag is written from connection callbacks and read
 * from publishing threads, which is why it is {@code volatile}.
 */
public class MQTTEndpoint extends DefaultEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class);

    /** Maximum number of reconnect attempts {@code publish} makes before reporting a failure. */
    private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3;

    private CallbackConnection connection;
    private final MQTTConfiguration configuration;
    private volatile boolean connected;
    private final List<MQTTConsumer> consumers;

    /**
     * Creates the endpoint; no connection is created until {@link #doStart()} runs.
     *
     * @param uri           endpoint URI, passed to the {@link DefaultEndpoint} constructor
     * @param component     owning component, passed to the {@link DefaultEndpoint} constructor
     * @param configuration configuration used to create connections and read host, topic and timeouts
     */
    public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration configuration) {
        super(uri, component);
        this.consumers = new CopyOnWriteArrayList<MQTTConsumer>();
        this.configuration = configuration;
    }

    /**
     * Creates and configures a new {@link MQTTConsumer} for this endpoint.
     *
     * @param processor processor handed to the new consumer
     * @return the configured consumer
     * @throws Exception if {@code configureConsumer} fails
     */
    public Consumer createConsumer(Processor processor) throws Exception {
        MQTTConsumer consumer = new MQTTConsumer(this, processor);
        configureConsumer(consumer);
        return consumer;
    }

    /**
     * Creates a new {@link MQTTProducer} bound to this endpoint.
     *
     * @return the new producer
     */
    public Producer createProducer() throws Exception {
        return new MQTTProducer(this);
    }

    /**
     * @return the configuration this endpoint was constructed with
     */
    public MQTTConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Starts the endpoint and creates the connection object. The connection is
     * only created here; {@link #connect()} is what actually connects it.
     */
    protected void doStart() throws Exception {
        super.doStart();
        createConnection();
    }

    /**
     * Replaces {@code connection} with a fresh one from the configuration and
     * installs the listener that tracks connection state and dispatches
     * incoming messages to the registered consumers.
     */
    protected void createConnection() {
        connection = configuration.callbackConnection();
        connection.listener(new Listener() {
            public void onConnected() {
                connected = true;
                LOG.info("MQTT Connection connected to {}", configuration.getHost());
            }

            public void onDisconnected() {
                connected = false;
                LOG.debug("MQTT Connection disconnected from {}", configuration.getHost());
            }

            public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
                if (!consumers.isEmpty()) {
                    // One exchange is built per incoming message and the same
                    // instance is passed to every registered consumer.
                    Exchange exchange = createExchange();
                    exchange.getIn().setBody(body.toByteArray());
                    exchange.setProperty(configuration.getMqttTopicPropertyName(), topic.toString());
                    for (MQTTConsumer consumer : consumers) {
                        consumer.processExchange(exchange);
                    }
                }
                // Run the client-supplied completion hook after dispatch, even when nobody consumed the message.
                if (ack != null) {
                    ack.run();
                }
            }

            public void onFailure(Throwable cause) {
                connected = false;
                LOG.warn("Connection to {} failure due {}. Forcing a disconnect to re-connect on next attempt.",
                        configuration.getHost(), cause.getMessage());
                connection.disconnect(new Callback<Void>() {
                    public void onSuccess(Void value) {
                        // nothing to do: the disconnect was only a clean-up step
                    }

                    public void onFailure(Throwable disconnectCause) {
                        // Best effort only; the original failure has already been reported above.
                        LOG.debug("Failed to disconnect from " + configuration.getHost() + ". This exception is ignored.", disconnectCause);
                    }
                });
            }
        });
    }

    /**
     * Stops the endpoint and, when a connection exists, requests a disconnect
     * on the connection's dispatch queue and waits for it to complete.
     *
     * @throws Exception whatever {@code Promise.await} raises when the disconnect
     *                   fails or does not complete within the configured
     *                   disconnect wait (seconds)
     */
    protected void doStop() throws Exception {
        super.doStop();
        if (connection != null) {
            final Promise<Void> disconnected = new Promise<Void>();
            connection.getDispatchQueue().execute(new Task() {
                public void run() {
                    connection.disconnect(new Callback<Void>() {
                        public void onSuccess(Void value) {
                            disconnected.onSuccess(value);
                        }

                        public void onFailure(Throwable cause) {
                            disconnected.onFailure(cause);
                        }
                    });
                }
            });
            disconnected.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Connects the current connection and, when a non-blank subscribe topic is
     * configured, subscribes to it. Blocks until both steps have completed,
     * one of them has failed, or the configured connect wait has elapsed.
     *
     * <p>The {@code connected} flag is always updated <em>before</em> the
     * promise is completed, so a caller that wakes up from the wait and
     * immediately calls {@link #isConnected()} observes the final state.
     *
     * @throws Exception whatever {@code Promise.await} raises: the connect or
     *                   subscribe failure, or a timeout
     */
    public void connect() throws Exception {
        final Promise<Object> promise = new Promise<Object>();
        connection.connect(new Callback<Void>() {
            public void onSuccess(Void value) {
                LOG.debug("Connected to {}", configuration.getHost());
                String configuredTopic = configuration.getSubscribeTopicName();
                final String topicName = configuredTopic != null ? configuredTopic.trim() : null;
                if (topicName != null && !topicName.isEmpty()) {
                    Topic[] topics = {new Topic(topicName, configuration.getQoS())};
                    connection.subscribe(topics, new Callback<byte[]>() {
                        public void onSuccess(byte[] grantedQos) {
                            // Flag first, then release the waiter (see method Javadoc).
                            connected = true;
                            promise.onSuccess(grantedQos);
                        }

                        public void onFailure(Throwable cause) {
                            connected = false;
                            promise.onFailure(cause);
                            connection.disconnect((Callback<Void>) null);
                        }
                    });
                } else {
                    // No topic to subscribe to: being connected is enough.
                    connected = true;
                    promise.onSuccess(value);
                }
            }

            public void onFailure(Throwable cause) {
                LOG.warn("Failed to connect to {} due {}", configuration.getHost(), cause.getMessage());
                connected = false;
                promise.onFailure(cause);
                connection.disconnect((Callback<Void>) null);
            }
        });
        LOG.info("Connecting to {} using {} seconds timeout", configuration.getHost(), configuration.getConnectWaitInSeconds());
        promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
    }

    /**
     * @return the last connection state recorded by the connection callbacks
     */
    public boolean isConnected() {
        return connected;
    }

    /**
     * Publishes a message, first re-creating and re-connecting the connection
     * when the endpoint is not currently connected.
     *
     * <p>At most {@code PUBLISH_MAX_RECONNECT_ATTEMPTS} reconnect attempts are
     * made. A timed-out attempt is retried; any other error aborts immediately
     * and is reported through {@code callback}. When every attempt has been
     * used and the endpoint is still disconnected, {@code callback.onFailure}
     * receives the last timeout, or an {@link IllegalStateException} when no
     * attempt timed out, so the callback never receives {@code null}.
     *
     * @param topic    topic passed to the connection's {@code publish}
     * @param payload  message bytes
     * @param qoS      quality of service passed to the connection's {@code publish}
     * @param retain   boolean flag passed to the connection's {@code publish}
     *                 (the retain flag in the fusesource client API)
     * @param callback notified of the publish outcome or of a reconnect failure
     */
    public void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception {
        boolean ready = isConnected();
        int attempt = 0;
        TimeoutException lastTimeout = null;
        while (!ready && attempt < PUBLISH_MAX_RECONNECT_ATTEMPTS) {
            attempt++;
            try {
                LOG.warn("#{} attempt to re-create connection to {} before publishing", attempt, configuration.getHost());
                createConnection();
                connect();
            } catch (TimeoutException e) {
                // Remember the timeout and try again while attempts remain.
                lastTimeout = e;
                LOG.debug("Timed out after {} seconds after {} attempt to re-create connection to {}",
                        new Object[]{configuration.getConnectWaitInSeconds(), attempt, configuration.getHost()});
            } catch (Throwable e) {
                // Any other kind of error is not retried.
                callback.onFailure(e);
                return;
            }
            ready = isConnected();
        }

        if (!ready) {
            LOG.warn("Cannot re-connect to {} after {} attempts", configuration.getHost(), attempt);
            Throwable cause = lastTimeout;
            if (cause == null) {
                // connect() returned normally but the connection dropped again before we could use it.
                cause = new IllegalStateException("Cannot re-connect to " + configuration.getHost() + " after " + attempt + " attempts");
            }
            callback.onFailure(cause);
            return;
        }

        connection.getDispatchQueue().execute(new Task() {
            public void run() {
                LOG.debug("Publishing to {}", configuration.getHost());
                connection.publish(topic, payload, qoS, retain, callback);
            }
        });
    }

    /**
     * Registers a consumer so that it receives subsequently arriving messages.
     *
     * @param consumer consumer to add
     */
    public void addConsumer(MQTTConsumer consumer) {
        consumers.add(consumer);
    }

    /**
     * Unregisters a previously added consumer.
     *
     * @param consumer consumer to remove
     */
    public void removeConsumer(MQTTConsumer consumer) {
        consumers.remove(consumer);
    }

    /**
     * @return always {@code true}
     */
    public boolean isSingleton() {
        return true;
    }
}
