package net.solarnetwork.node.upload.mqtt;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeCreator;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.solarnetwork.codec.JsonUtils;
import net.solarnetwork.common.mqtt.BaseMqttConnectionService;
import net.solarnetwork.common.mqtt.BasicMqttMessage;
import net.solarnetwork.common.mqtt.BasicMqttProperty;
import net.solarnetwork.common.mqtt.MqttBasicCount;
import net.solarnetwork.common.mqtt.MqttConnection;
import net.solarnetwork.common.mqtt.MqttConnectionFactory;
import net.solarnetwork.common.mqtt.MqttConnectionObserver;
import net.solarnetwork.common.mqtt.MqttMessage;
import net.solarnetwork.common.mqtt.MqttMessageHandler;
import net.solarnetwork.common.mqtt.MqttPropertyType;
import net.solarnetwork.common.mqtt.MqttQos;
import net.solarnetwork.common.mqtt.MqttVersion;
import net.solarnetwork.domain.InstructionStatus;
import net.solarnetwork.domain.datum.BasicStreamDatum;
import net.solarnetwork.domain.datum.Datum;
import net.solarnetwork.domain.datum.DatumProperties;
import net.solarnetwork.domain.datum.ObjectDatumKind;
import net.solarnetwork.domain.datum.ObjectDatumStreamMetadata;
import net.solarnetwork.node.domain.datum.NodeDatum;
import net.solarnetwork.node.reactor.BasicInstruction;
import net.solarnetwork.node.reactor.BasicInstructionStatus;
import net.solarnetwork.node.reactor.Instruction;
import net.solarnetwork.node.reactor.InstructionExecutionService;
import net.solarnetwork.node.reactor.InstructionStatus;
import net.solarnetwork.node.reactor.ReactorService;
import net.solarnetwork.node.service.DatumEvents;
import net.solarnetwork.node.service.DatumMetadataService;
import net.solarnetwork.node.service.IdentityService;
import net.solarnetwork.node.service.UploadService;
import net.solarnetwork.service.OptionalService;
import net.solarnetwork.service.PingTest;
import net.solarnetwork.service.PingTestResult;
import net.solarnetwork.settings.SettingSpecifier;
import net.solarnetwork.settings.SettingSpecifierProvider;
import net.solarnetwork.settings.support.BasicTitleSettingSpecifier;
import net.solarnetwork.util.StatTracker;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.LoggerFactory;
import org.springframework.context.MessageSource;
import org.springframework.util.DigestUtils;

/* loaded from: input_file:net/solarnetwork/node/upload/mqtt/MqttUploadService.class */
public class MqttUploadService extends BaseMqttConnectionService implements UploadService, MqttMessageHandler, MqttConnectionObserver, SettingSpecifierProvider {
    public static final String JSON_MIME_TYPE = "application/json";
    public static final String NODE_INSTRUCTION_TOPIC_TEMPLATE = "node/%s/instr";
    public static final String NODE_DATUM_TOPIC_TEMPLATE = "node/%s/datum";
    public static final boolean DEFAULT_INCLUDE_VERSION_TAG = true;
    public static final String TAG_VERSION_2 = "_v2";
    public static final MqttVersion DEFAULT_MQTT_VERSION = MqttVersion.Mqtt5;
    public static final String LOG_SOURCE_ID = "log";
    public static final String LOG_SOURCE_ID_PREFIX = "log/";
    private final ObjectMapper objectMapper;
    private final IdentityService identityService;
    private final OptionalService<ReactorService> reactorServiceOpt;
    private final OptionalService<InstructionExecutionService> instructionExecutionServiceOpt;
    private final OptionalService<EventAdmin> eventAdminOpt;
    private final OptionalService<DatumMetadataService> datumMetadataServiceOpt;
    private Executor executor;
    private boolean includeVersionTag;
    private CompletableFuture<?> startupFuture;

    public MqttUploadService(MqttConnectionFactory mqttConnectionFactory, ObjectMapper objectMapper, IdentityService identityService, OptionalService<ReactorService> optionalService, OptionalService<InstructionExecutionService> optionalService2, OptionalService<EventAdmin> optionalService3, OptionalService<DatumMetadataService> optionalService4) {
        super(mqttConnectionFactory, new StatTracker("SolarIn/MQTT", (String) null, LoggerFactory.getLogger(MqttUploadService.class), 100));
        this.includeVersionTag = true;
        this.objectMapper = objectMapper;
        this.identityService = identityService;
        this.reactorServiceOpt = optionalService;
        this.instructionExecutionServiceOpt = optionalService2;
        this.eventAdminOpt = optionalService3;
        this.datumMetadataServiceOpt = optionalService4;
        setPublishQos(MqttQos.AtLeastOnce);
        getMqttConfig().setUid("SolarIn/MQTT");
        getMqttConfig().setVersion(DEFAULT_MQTT_VERSION);
        getMqttConfig().getProperties().addProperty(new BasicMqttProperty(MqttPropertyType.TOPIC_ALIAS_MAXIMUM, 7));
        setDisplayName("SolarIn/MQTT");
    }

    public String getDisplayName() {
        URI serverUri = getMqttConfig().getServerUri();
        return serverUri == null ? super.getDisplayName() : String.format("%s @ %s", super.getDisplayName(), serverUri);
    }

    public synchronized Future<?> startup() {
        if (this.startupFuture != null) {
            return this.startupFuture;
        }
        final Executor executor = this.executor;
        final CompletableFuture<?> completableFuture = new CompletableFuture<>();
        Runnable runnable = new Runnable() { // from class: net.solarnetwork.node.upload.mqtt.MqttUploadService.1
            @Override // java.lang.Runnable
            public void run() {
                String mqttClientId = MqttUploadService.this.getMqttClientId();
                URI mqttUri = MqttUploadService.this.getMqttUri();
                if (mqttClientId == null || mqttUri == null) {
                    MqttUploadService.this.log.info("Node ID or SolarIn/MQTT URI not available yet, waiting to try to connect to SolarIn/MQTT.");
                    try {
                        Thread.sleep(60000L);
                    } catch (InterruptedException e) {
                    }
                    if (executor != null) {
                        executor.execute(this);
                        return;
                    } else {
                        run();
                        return;
                    }
                }
                MqttUploadService.this.getMqttConfig().setClientId(mqttClientId);
                MqttUploadService.this.getMqttConfig().setServerUri(mqttUri);
                try {
                    try {
                        MqttUploadService.super.startup().get();
                        completableFuture.complete(null);
                        synchronized (MqttUploadService.this) {
                            if (MqttUploadService.this.startupFuture == completableFuture) {
                                MqttUploadService.this.startupFuture = null;
                            }
                        }
                    } catch (Throwable th) {
                        synchronized (MqttUploadService.this) {
                            if (MqttUploadService.this.startupFuture == completableFuture) {
                                MqttUploadService.this.startupFuture = null;
                            }
                            throw th;
                        }
                    }
                } catch (Exception e2) {
                    completableFuture.completeExceptionally(e2);
                    synchronized (MqttUploadService.this) {
                        if (MqttUploadService.this.startupFuture == completableFuture) {
                            MqttUploadService.this.startupFuture = null;
                        }
                    }
                }
            }
        };
        if (executor != null) {
            executor.execute(runnable);
        } else {
            runnable.run();
        }
        this.startupFuture = completableFuture;
        return completableFuture;
    }

    public synchronized void shutdown() {
        if (this.startupFuture != null) {
            try {
                this.startupFuture.cancel(true);
            } catch (Exception e) {
            }
            this.startupFuture = null;
        }
        super.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getMqttClientId() {
        Long nodeId = this.identityService.getNodeId();
        if (nodeId != null) {
            return nodeId.toString();
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public URI getMqttUri() {
        try {
            return new URI(this.identityService.getSolarInMqttUrl());
        } catch (NullPointerException e) {
            return null;
        } catch (URISyntaxException e2) {
            this.log.error("Invalid MQTT URL: " + this.identityService.getSolarInMqttUrl());
            return null;
        }
    }

    public String getKey() {
        return "MqttUploadService:" + getMqttConfig().getServerUri();
    }

    public String uploadDatum(NodeDatum nodeDatum) {
        MqttConnection connection;
        Throwable th;
        ObjectDatumStreamMetadata datumStreamMetadata;
        Long nodeId = this.identityService.getNodeId();
        DatumMetadataService datumMetadataService = (DatumMetadataService) OptionalService.service(this.datumMetadataServiceOpt);
        if (nodeId == null || (connection = connection()) == null) {
            return null;
        }
        String format = String.format(NODE_DATUM_TOPIC_TEMPLATE, nodeId);
        try {
            byte[] bArr = null;
            ObjectNode valueToTree = this.objectMapper.valueToTree(nodeDatum);
            ObjectDatumKind kind = nodeDatum.getKind();
            Long objectId = kind == ObjectDatumKind.Node ? nodeId : nodeDatum.getObjectId();
            if (datumMetadataService != null && (datumStreamMetadata = datumMetadataService.getDatumStreamMetadata(kind, objectId, nodeDatum.getSourceId())) != null) {
                try {
                    DatumProperties propertiesFrom = DatumProperties.propertiesFrom(nodeDatum, datumStreamMetadata);
                    if (propertiesFrom != null) {
                        bArr = this.objectMapper.writeValueAsBytes(new BasicStreamDatum(datumStreamMetadata.getStreamId(), nodeDatum.getTimestamp(), propertiesFrom));
                    }
                } catch (IllegalArgumentException e) {
                    if (canLogForDatum(nodeDatum)) {
                        this.log.debug("Unable to post datum as stream datum, falling back to general datum: " + e.getMessage());
                    }
                }
            }
            if (bArr == null) {
                if (this.includeVersionTag) {
                    JsonNode path = valueToTree.path("t");
                    JsonNode jsonNode = null;
                    if (path.isArray()) {
                        jsonNode = (ArrayNode) path;
                    } else if (path.isNull() || path.isMissingNode()) {
                        jsonNode = ((JsonNodeCreator) valueToTree).arrayNode(1);
                        valueToTree.set("t", jsonNode);
                    }
                    if (jsonNode != null) {
                        boolean z = false;
                        Iterator it = jsonNode.iterator();
                        while (true) {
                            if (!it.hasNext()) {
                                break;
                            }
                            if (TAG_VERSION_2.equals(((JsonNode) it.next()).textValue())) {
                                z = true;
                                break;
                            }
                        }
                        if (!z) {
                            jsonNode.add(TAG_VERSION_2);
                        }
                    }
                }
                bArr = this.objectMapper.writeValueAsBytes(valueToTree);
            }
            if (bArr != null && bArr.length > 0) {
                connection.publish(new BasicMqttMessage(format, false, getPublishQos(), bArr)).get(getMqttConfig().getConnectTimeoutSeconds(), TimeUnit.SECONDS);
                getMqttStats().increment(kind == ObjectDatumKind.Location ? SolarInCountStat.LocationDatumPosted : SolarInCountStat.NodeDatumPosted);
                postDatumUploadedEvent(nodeDatum, valueToTree);
                if (canLogForDatum(nodeDatum)) {
                    this.log.info("Uploaded datum via MQTT: {}", nodeDatum);
                }
            }
            return DigestUtils.md5DigestAsHex(String.format("%tQ;%s;%d;%s", nodeDatum.getTimestamp(), nodeDatum.getSourceId(), objectId, kind).getBytes());
        } catch (IOException | InterruptedException | ExecutionException | TimeoutException e2) {
            if (!canLogForDatum(nodeDatum)) {
                return null;
            }
            Throwable th2 = e2;
            while (true) {
                th = th2;
                if (th.getCause() == null) {
                    break;
                }
                th2 = th.getCause();
            }
            String message = th instanceof TimeoutException ? "timeout" : th.getMessage();
            if (this.log.isDebugEnabled()) {
                this.log.warn("Error posting datum {} via MQTT @ {}, falling back to batch mode", new Object[]{nodeDatum, getMqttConfig().getServerUri(), e2});
                return null;
            }
            this.log.warn("Error posting datum {} via MQTT @ {}, falling back to batch mode: {}", new Object[]{nodeDatum, getMqttConfig().getServerUri(), message});
            return null;
        }
    }

    private static boolean canLogForDatum(NodeDatum nodeDatum) {
        return (LOG_SOURCE_ID.equalsIgnoreCase(nodeDatum.getSourceId()) || nodeDatum.getSourceId().startsWith(LOG_SOURCE_ID_PREFIX)) ? false : true;
    }

    private void postDatumUploadedEvent(Datum datum, JsonNode jsonNode) {
        postEvent(DatumEvents.datumEvent("net/solarnetwork/node/service/UploadService/DATUM_UPLOADED", datum.getClass(), JsonUtils.getStringMapFromTree(jsonNode)));
    }

    private void postEvent(Event event) {
        EventAdmin eventAdmin = this.eventAdminOpt != null ? (EventAdmin) this.eventAdminOpt.service() : null;
        if (eventAdmin == null || event == null) {
            return;
        }
        eventAdmin.postEvent(event);
    }

    private boolean publishInstructionAck(MqttConnection mqttConnection, Long l, Instruction instruction) {
        Throwable th;
        if (mqttConnection == null || l == null || instruction == null || instruction.getStatus() == null) {
            return false;
        }
        ReactorService reactorService = (ReactorService) OptionalService.service(this.reactorServiceOpt);
        String format = String.format(NODE_DATUM_TOPIC_TEMPLATE, l);
        InstructionStatus status = instruction.getStatus();
        try {
            mqttConnection.publish(new BasicMqttMessage(format, false, getPublishQos(), this.objectMapper.writeValueAsBytes(status))).get(getMqttConfig().getConnectTimeoutSeconds(), TimeUnit.SECONDS);
            getMqttStats().increment(SolarInCountStat.InstructionStatusPosted);
            this.log.info("Posted Instruction {} [{}] acknowledgement status: {}", new Object[]{status.getInstructionId(), instruction.getTopic(), instruction.getInstructionState()});
            return true;
        } catch (Exception e) {
            Throwable th2 = e;
            while (true) {
                th = th2;
                if (th.getCause() == null) {
                    break;
                }
                th2 = th.getCause();
            }
            String message = th instanceof TimeoutException ? "timeout" : e.getMessage();
            if (reactorService != null) {
                this.log.warn("Error posting instruction status {} via MQTT @ {}, will defer acknowledgement: {}", new Object[]{instruction, getMqttConfig().getServerUri(), message});
                return false;
            }
            this.log.error("Error posting instruction status {} via MQTT @ {}: {}", new Object[]{instruction, getMqttConfig().getServerUri(), message});
            return false;
        }
    }

    private void postInstructionAck(ReactorService reactorService, MqttConnection mqttConnection, Long l, Instruction instruction) {
        if (instruction == null) {
            return;
        }
        if (mqttConnection == null || l == null) {
            if (reactorService != null) {
                reactorService.storeInstruction(instruction);
            }
        } else if (!publishInstructionAck(mqttConnection, l, instruction)) {
            if (reactorService != null) {
                reactorService.storeInstruction(instruction);
            }
        } else {
            if (reactorService == null || instruction.getId() == null || instruction.getStatus() == null) {
                return;
            }
            reactorService.storeInstruction(new BasicInstruction(instruction, instruction.getStatus().newCopyWithAcknowledgedState(instruction.getStatus().getInstructionState())));
        }
    }

    public void onMqttMessage(MqttMessage mqttMessage) {
        Executor executor = this.executor;
        if (executor != null) {
            executor.execute(() -> {
                handleMqttMessage(mqttMessage);
            });
        } else {
            handleMqttMessage(mqttMessage);
        }
    }

    private void handleMqttMessage(MqttMessage mqttMessage) {
        net.solarnetwork.domain.Instruction instruction;
        InstructionStatus processInstruction;
        String topic = mqttMessage.getTopic();
        ReactorService reactorService = (ReactorService) OptionalService.service(this.reactorServiceOpt);
        if (reactorService == null) {
            return;
        }
        InstructionExecutionService instructionExecutionService = (InstructionExecutionService) OptionalService.service(this.instructionExecutionServiceOpt);
        String solarInBaseUrl = this.identityService.getSolarInBaseUrl();
        try {
            JsonNode path = this.objectMapper.readTree(mqttMessage.getPayload()).path("instructions");
            if (path == null || !path.isArray()) {
                return;
            }
            MqttConnection connection = connection();
            Long nodeId = this.identityService.getNodeId();
            Iterator it = path.iterator();
            while (it.hasNext()) {
                JsonNode jsonNode = (JsonNode) it.next();
                try {
                    instruction = (net.solarnetwork.domain.Instruction) this.objectMapper.treeToValue(jsonNode, net.solarnetwork.domain.Instruction.class);
                } catch (Exception e) {
                    this.log.warn("Unable to accept instruction JSON [{}]: {}", jsonNode, e.toString());
                }
                if (instruction != null) {
                    Instruction from = BasicInstruction.from(instruction, solarInBaseUrl);
                    if (this.log.isInfoEnabled()) {
                        this.log.info("Instruction {} {} received with parameters: {}", new Object[]{from.getId(), from.getTopic(), from.getParameterMap()});
                    }
                    getMqttStats().increment(SolarInCountStat.InstructionsReceived);
                    Instant executionDate = from.getExecutionDate();
                    boolean z = executionDate != null && executionDate.isAfter(Instant.now());
                    if (instructionExecutionService == null || z) {
                        processInstruction = reactorService.processInstruction(from);
                        if (z) {
                            this.log.info("Deferred instruction {} {} saved as {} state for execution @ {}", new Object[]{from.getId(), from.getTopic(), processInstruction.getInstructionState(), executionDate});
                        }
                    } else {
                        from = new BasicInstruction(from, new BasicInstructionStatus(from.getId(), InstructionStatus.InstructionState.Executing, Instant.now()));
                        reactorService.storeInstruction(from);
                        publishInstructionAck(connection, nodeId, from);
                        processInstruction = instructionExecutionService.executeInstruction(from);
                        if (processInstruction == null) {
                            this.log.info("No handler available for instruction {} {}: deferring to Received state", from.getId(), from.getTopic());
                            processInstruction = new BasicInstructionStatus(from.getId(), InstructionStatus.InstructionState.Received, Instant.now());
                        }
                    }
                    if (processInstruction == null) {
                        processInstruction = new BasicInstructionStatus(from.getId(), InstructionStatus.InstructionState.Declined, Instant.now());
                    }
                    BasicInstruction basicInstruction = new BasicInstruction(from, processInstruction);
                    reactorService.storeInstruction(basicInstruction);
                    postInstructionAck(reactorService, connection, nodeId, basicInstruction);
                }
            }
        } catch (IOException | RuntimeException e2) {
            this.log.error("Error handling MQTT message on topic {}", topic, e2);
        }
    }

    public String getPingTestName() {
        return getDisplayName();
    }

    public PingTest.Result performPingTest() throws Exception {
        PingTest.Result performPingTest = super.performPingTest();
        LinkedHashMap linkedHashMap = new LinkedHashMap(8);
        if (performPingTest.getProperties() != null) {
            linkedHashMap.putAll(performPingTest.getProperties());
        }
        linkedHashMap.putAll(getMqttStats().allCounts());
        return new PingTestResult(performPingTest.isSuccess(), performPingTest.getMessage(), linkedHashMap);
    }

    public void onMqttServerConnectionLost(MqttConnection mqttConnection, boolean z, Throwable th) {
    }

    public void onMqttServerConnectionEstablished(MqttConnection mqttConnection, boolean z) {
        Throwable th;
        Long nodeId = this.identityService.getNodeId();
        if (nodeId == null) {
            return;
        }
        String format = String.format(NODE_INSTRUCTION_TOPIC_TEMPLATE, nodeId);
        try {
            mqttConnection.subscribe(format, getSubscribeQos(), (MqttMessageHandler) null).get(getMqttConfig().getConnectTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            Throwable th2 = e;
            while (true) {
                th = th2;
                if (th.getCause() == null) {
                    break;
                } else {
                    th2 = th.getCause();
                }
            }
            this.log.error("Error subscribing to MQTT topic {} @ {}: {}", new Object[]{format, getMqttConfig().getServerUri(), th instanceof TimeoutException ? "timeout" : th.getMessage()});
        }
    }

    public String getSettingUid() {
        return "net.solarnetwork.node.upload.mqtt";
    }

    public List<SettingSpecifier> getSettingSpecifiers() {
        return Collections.singletonList(new BasicTitleSettingSpecifier("status", getStatusMessage(), true, true));
    }

    private String getStatusMessage() {
        MqttConnection connection = connection();
        boolean isEstablished = connection != null ? connection.isEstablished() : false;
        MessageSource messageSource = getMessageSource();
        Object[] objArr = new Object[1];
        objArr[0] = isEstablished ? "connected" : "disconnected";
        String message = messageSource.getMessage(String.format("status.%s", objArr), (Object[]) null, Locale.getDefault());
        URI mqttUri = getMqttUri();
        StatTracker mqttStats = getMqttStats();
        MessageSource messageSource2 = getMessageSource();
        Object[] objArr2 = new Object[8];
        objArr2[0] = message;
        objArr2[1] = mqttUri != null ? mqttUri : "N/A";
        objArr2[2] = Long.valueOf(mqttStats.get(SolarInCountStat.NodeDatumPosted));
        objArr2[3] = Long.valueOf(mqttStats.get(SolarInCountStat.LocationDatumPosted));
        objArr2[4] = Long.valueOf(mqttStats.get(MqttBasicCount.PayloadBytesDelivered));
        objArr2[5] = Long.valueOf(mqttStats.get(SolarInCountStat.InstructionsReceived));
        objArr2[6] = Long.valueOf(mqttStats.get(SolarInCountStat.InstructionStatusPosted));
        objArr2[7] = Long.valueOf(mqttStats.get(MqttBasicCount.PayloadBytesReceived));
        return messageSource2.getMessage("status.msg", objArr2, Locale.getDefault());
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    public boolean isIncludeVersionTag() {
        return this.includeVersionTag;
    }

    public void setIncludeVersionTag(boolean z) {
        this.includeVersionTag = z;
    }

    public void setMqttVersion(MqttVersion mqttVersion) {
        getMqttConfig().setVersion(mqttVersion != null ? mqttVersion : DEFAULT_MQTT_VERSION);
    }
}
