package org.apache.inlong.agent.plugin.sources.reader;

import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/MqttReader.class */
/**
 * Reader that subscribes to one MQTT topic through an Eclipse Paho {@link MqttClient} and
 * buffers every received message in a bounded in-memory queue until {@link #read()} drains it.
 *
 * <p>Connection parameters are taken from the {@link InstanceProfile} handed to
 * {@link #init(InstanceProfile)} using the {@code job.mqttJob.*} keys declared below.
 */
public class MqttReader extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttReader.class);
    public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
    public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
    public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
    public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
    public static final String JOB_MQTT_CONNECTION_TIMEOUT = "job.mqttJob.connectionTimeOut";
    public static final String JOB_MQTT_KEEPALIVE_INTERVAL = "job.mqttJob.keepAliveInterval";
    public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
    public static final String JOB_MQTT_CLEAN_SESSION = "job.mqttJob.cleanSession";
    public static final String JOB_MQTT_CLIENT_ID_PREFIX = "job.mqttJob.clientIdPrefix";
    public static final String JOB_MQTT_QUEUE_SIZE = "job.mqttJob.queueSize";
    public static final String JOB_MQTT_AUTOMATIC_RECONNECT = "job.mqttJob.automaticReconnect";
    public static final String JOB_MQTT_VERSION = "job.mqttJob.mqttVersion";
    private MqttClient client;
    private MqttConnectOptions options;
    private String serverURI;
    private String userName;
    private String password;
    private String topic;
    private int qos;
    private InstanceProfile jobProfile;
    private String instanceId;
    private String clientId;
    private LinkedBlockingQueue<DefaultMessage> mqttMessagesQueue;
    private boolean finished = false;
    private boolean destroyed = false;
    private boolean cleanSession = false;
    private boolean automaticReconnect = true;
    private int mqttVersion = 0;

    /**
     * Creates a reader for the given topic.
     *
     * @param topic topic to subscribe to; used as the fallback when the profile passed to
     *              {@link #init(InstanceProfile)} does not define {@link #JOB_MQTT_TOPIC}
     */
    public MqttReader(String topic) {
        this.topic = topic;
    }

    /**
     * Copies all MQTT settings from the profile into fields and builds the connect options.
     *
     * @param instanceProfile profile holding the {@code job.mqttJob.*} settings
     */
    private void setGlobalParamsValue(InstanceProfile instanceProfile) {
        // NOTE(review): the default capacity reuses LogFileCollectTask.CORE_THREAD_SLEEP_TIME, which looks
        // like a decompiler constant substitution rather than an intentional dependency - confirm.
        this.mqttMessagesQueue = new LinkedBlockingQueue<>(
                instanceProfile.getInt(JOB_MQTT_QUEUE_SIZE, LogFileCollectTask.CORE_THREAD_SLEEP_TIME));
        this.instanceId = instanceProfile.getInstanceId();
        this.userName = instanceProfile.get(JOB_MQTT_USERNAME);
        this.password = instanceProfile.get(JOB_MQTT_PASSWORD);
        this.serverURI = instanceProfile.get(JOB_MQTT_SERVER_URI);
        // Keep the constructor-supplied topic when the profile does not override it.
        this.topic = instanceProfile.get(JOB_MQTT_TOPIC, this.topic);
        this.clientId = instanceProfile.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID();
        this.cleanSession = instanceProfile.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
        this.automaticReconnect = instanceProfile.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT, true);
        this.qos = instanceProfile.getInt(JOB_MQTT_QOS, 1);
        this.mqttVersion = instanceProfile.getInt(JOB_MQTT_VERSION, 0);

        this.options = new MqttConnectOptions();
        this.options.setCleanSession(this.cleanSession);
        this.options.setConnectionTimeout(instanceProfile.getInt(JOB_MQTT_CONNECTION_TIMEOUT, 10));
        this.options.setKeepAliveInterval(instanceProfile.getInt(JOB_MQTT_KEEPALIVE_INTERVAL, 20));
        // Credentials are optional: only apply them when configured so that a missing
        // password does not fail with a NullPointerException.
        if (this.userName != null) {
            this.options.setUserName(this.userName);
        }
        if (this.password != null) {
            this.options.setPassword(this.password.toCharArray());
        }
        this.options.setAutomaticReconnect(this.automaticReconnect);
        this.options.setMqttVersion(this.mqttVersion);
    }

    /**
     * Creates the client, installs the message callback, connects and subscribes to the topic.
     * Any failure is logged and leaves the reader without a working connection.
     */
    private void connect() {
        try {
            synchronized (MqttReader.class) {
                this.client = new MqttClient(this.serverURI, this.clientId, new MemoryPersistence());
                this.client.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                        MqttReader.LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection lost, {} ",
                                MqttReader.this.instanceId, MqttReader.this.serverURI, cause);
                        MqttReader.this.reconnect();
                    }

                    @Override
                    public void messageArrived(String messageTopic, MqttMessage mqttMessage) throws Exception {
                        HashMap<String, String> headers = new HashMap<>();
                        headers.put("record.topic", messageTopic);
                        headers.put("record.messageId", String.valueOf(mqttMessage.getId()));
                        headers.put("record.qos", String.valueOf(mqttMessage.getQos()));
                        byte[] payload = mqttMessage.getPayload();
                        // put() blocks while the queue is full, so a slow consumer stalls this callback.
                        MqttReader.this.mqttMessagesQueue.put(new DefaultMessage(payload, headers));
                        // NOTE(review): 3 is presumably the agent "read success" audit id - confirm.
                        AuditUtils.add(3, MqttReader.this.inlongGroupId, MqttReader.this.inlongStreamId,
                                System.currentTimeMillis(), 1, payload.length);
                        MqttReader.this.readerMetric.pluginReadSuccessCount.incrementAndGet();
                        MqttReader.this.readerMetric.pluginReadCount.incrementAndGet();
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        // This reader only consumes; nothing to do for outbound deliveries.
                    }
                });
                this.client.connect(this.options);
                this.client.subscribe(this.topic, this.qos);
            }
            LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]", this.topic, this.qos);
        } catch (Exception e) {
            // The trailing throwable makes SLF4J log the stack trace in addition to the message.
            LOGGER.error("init mqtt client error {}. jobId:{},serverURI:{},clientId:{}",
                    e.toString(), this.instanceId, this.serverURI, this.clientId, e);
        }
    }

    /**
     * Initializes the reader from the profile and opens the MQTT connection.
     *
     * @param instanceProfile profile holding the {@code job.mqttJob.*} settings
     */
    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        this.jobProfile = instanceProfile;
        LOGGER.info("init mqtt reader with jobConf {}", instanceProfile.toJsonStr());
        setGlobalParamsValue(instanceProfile);
        connect();
    }

    /**
     * Re-establishes the connection after it was lost and restores the topic subscription.
     * Does nothing when no client exists or the client is already connected.
     */
    public void reconnect() {
        MqttClient current = this.client;
        if (current == null || current.isConnected()) {
            return;
        }
        try {
            current.connect(this.options);
            // A clean session drops broker-side subscriptions, so subscribe again after reconnecting.
            current.subscribe(this.topic, this.qos);
            LOGGER.info("the mqtt client reconnect success. jobId:{}, serverURI:{}, clientId:{}",
                    this.instanceId, this.serverURI, this.clientId);
        } catch (Exception e) {
            LOGGER.error("reconnect mqtt client error {}. jobId:{}, serverURI:{}, clientId:{}",
                    e.toString(), this.instanceId, this.serverURI, this.clientId, e);
        }
    }

    /**
     * Returns the next buffered message without blocking.
     *
     * @return the oldest queued message, or {@code null} when the queue is empty
     */
    public Message read() {
        return getMqttMessage();
    }

    /** Removes and returns the head of the queue, or {@code null} when it is empty. */
    private DefaultMessage getMqttMessage() {
        return this.mqttMessagesQueue.poll();
    }

    /** @return {@code true} once {@link #finishRead()} has been called */
    public boolean isFinished() {
        return this.finished;
    }

    /** @return the instance id this reader reports as its source */
    public String getReadSource() {
        return this.instanceId;
    }

    /** No-op: this reader has no read timeout. */
    public void setReadTimeout(long j) {
    }

    /** No-op: this reader has no wait interval. */
    public void setWaitMillisecond(long j) {
    }

    /** @return always an empty string; this reader keeps no snapshot */
    public String getSnapshot() {
        return "";
    }

    /** Marks the reader as finished. */
    public void finishRead() {
        this.finished = true;
    }

    /** @return always {@code true} */
    public boolean isSourceExist() {
        return true;
    }

    /**
     * Disconnects and closes the client, logging instead of throwing on failure.
     * Safe to call when the client was never created.
     */
    private void disconnect() {
        MqttClient current = this.client;
        if (current == null) {
            // connect() failed before the client could be constructed; nothing to release.
            return;
        }
        try {
            if (current.isConnected()) {
                current.disconnect();
            }
        } catch (MqttException e) {
            LOGGER.error("disconnect mqtt client error {}. jobId:{},serverURI:{},clientId:{}",
                    e.toString(), this.instanceId, this.serverURI, this.clientId, e);
        }
        try {
            // Release the resources held by the client; it is not reused after destroy().
            current.close();
        } catch (MqttException e) {
            LOGGER.error("disconnect mqtt client error {}. jobId:{},serverURI:{},clientId:{}",
                    e.toString(), this.instanceId, this.serverURI, this.clientId, e);
        }
    }

    /**
     * Overrides the value reported by {@link #getReadSource()}.
     *
     * @param str new instance id
     */
    public void setReadSource(String str) {
        this.instanceId = str;
    }

    /** Disconnects the client once; subsequent calls are ignored. */
    public void destroy() {
        synchronized (this) {
            if (this.destroyed) {
                return;
            }
            disconnect();
            this.destroyed = true;
        }
    }
}
