/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.datapoint;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.sql.Date;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.openremote.agent.protocol.ProtocolDatapointService;
import org.openremote.container.timer.TimerService;
import org.openremote.container.util.MapAccess;
import org.openremote.manager.asset.AssetProcessingException;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.asset.OutdatedAttributeEvent;
import org.openremote.manager.datapoint.AbstractDatapointService;
import org.openremote.manager.datapoint.AssetDatapointResourceImpl;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.web.ManagerWebService;
import org.openremote.model.Container;
import org.openremote.model.asset.Asset;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeRef;
import org.openremote.model.attribute.AttributeWriteFailure;
import org.openremote.model.datapoint.AssetDatapoint;
import org.openremote.model.datapoint.DatapointQueryTooLargeException;
import org.openremote.model.query.AssetQuery;
import org.openremote.model.query.filter.AttributePredicate;
import org.openremote.model.query.filter.NameValuePredicate;
import org.openremote.model.util.Pair;
import org.openremote.model.util.UniqueIdentifierGenerator;
import org.openremote.model.value.MetaHolder;
import org.openremote.model.value.MetaItemType;
import org.openremote.model.value.NameHolder;

/**
 * Datapoint service for {@link AssetDatapoint}s.
 *
 * <p>It stores incoming attribute events as data points, periodically purges data points that are
 * older than the configured maximum age, and exports data points to CSV files in the
 * {@value #EXPORT_STORAGE_DIR_NAME} sub directory of the persistence storage directory.
 */
public class AssetDatapointService extends AbstractDatapointService<AssetDatapoint> implements ProtocolDatapointService {
    /** Config key for the default maximum age (in days) of stored data points. */
    public static final String OR_DATA_POINTS_MAX_AGE_DAYS = "OR_DATA_POINTS_MAX_AGE_DAYS";
    /** Value used when {@link #OR_DATA_POINTS_MAX_AGE_DAYS} is not configured. */
    public static final int OR_DATA_POINTS_MAX_AGE_DAYS_DEFAULT = 31;
    /** Config key for the maximum number of data points a single export may contain. */
    public static final String OR_DATA_POINTS_EXPORT_LIMIT = "OR_DATA_POINTS_EXPORT_LIMIT";
    /** Value used when {@link #OR_DATA_POINTS_EXPORT_LIMIT} is not configured. */
    public static final int OR_DATA_POINTS_EXPORT_LIMIT_DEFAULT = 1000000;
    private static final Logger LOG = Logger.getLogger(AssetDatapointService.class.getName());
    /** Name of the export directory, resolved against the persistence storage directory. */
    protected static final String EXPORT_STORAGE_DIR_NAME = "datapoint";
    /** Default max age in days; values {@code <= 0} disable the scheduled purge. */
    protected int maxDatapointAgeDays;
    /** Limit passed to {@code canQueryDatapoints} before an export is started. */
    protected int datapointExportLimit;
    /** Directory in which export files are looked up and from which obsolete exports are removed. */
    protected Path exportPath;

    /**
     * Registers the datapoint REST resource, reads the purge/export settings from the container
     * config and makes sure the export directory exists.
     *
     * @param container the container providing services and configuration
     * @throws Exception if the super class initialisation or the directory creation fails
     */
    @Override
    public void init(Container container) throws Exception {
        super.init(container);

        container.getService(ManagerWebService.class).addApiSingleton(
            new AssetDatapointResourceImpl(
                container.getService(TimerService.class),
                container.getService(ManagerIdentityService.class),
                container.getService(AssetStorageService.class),
                this));

        // Use the declared default constants rather than repeating their literal values
        this.maxDatapointAgeDays = MapAccess.getInteger(container.getConfig(), OR_DATA_POINTS_MAX_AGE_DAYS, OR_DATA_POINTS_MAX_AGE_DAYS_DEFAULT);
        if (this.maxDatapointAgeDays <= 0) {
            LOG.warning("OR_DATA_POINTS_MAX_AGE_DAYS value is not a valid value so data points won't be auto purged");
        } else {
            LOG.log(Level.INFO, "Data point purge interval days = " + this.maxDatapointAgeDays);
        }

        this.datapointExportLimit = MapAccess.getInteger(container.getConfig(), OR_DATA_POINTS_EXPORT_LIMIT, OR_DATA_POINTS_EXPORT_LIMIT_DEFAULT);
        if (this.datapointExportLimit <= 0) {
            LOG.warning("OR_DATA_POINTS_EXPORT_LIMIT value is not a valid value so the export data points won't be limited");
        } else {
            LOG.log(Level.INFO, "Data point export limit = " + this.datapointExportLimit);
        }

        this.exportPath = this.persistenceService.getStorageDir().resolve(EXPORT_STORAGE_DIR_NAME);
        Files.createDirectories(this.exportPath);
        // Best effort only: a failure is logged because exports may then not be writable
        if (!this.exportPath.toFile().setWritable(true, false)) {
            LOG.log(Level.WARNING, "Failed to set export dir write flag; data export may not work");
        }
    }

    /**
     * Schedules the daily purge task (only when a positive max age is configured) and subscribes
     * to {@link AttributeEvent}s and {@link OutdatedAttributeEvent}s so their values get stored.
     *
     * @param container the container providing the {@link ClientEventService}
     * @throws Exception declared for the service lifecycle contract
     */
    public void start(Container container) throws Exception {
        if (this.maxDatapointAgeDays > 0) {
            this.dataPointsPurgeScheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(
                this::purgeDataPoints,
                this.getFirstPurgeMillis(this.timerService.getNow()),
                Duration.ofDays(1L).toMillis(),
                TimeUnit.MILLISECONDS);
        }

        ClientEventService clientEventService = container.getService(ClientEventService.class);
        clientEventService.addSubscription(AttributeEvent.class, null, this::onAttributeEvent);
        clientEventService.addSubscription(OutdatedAttributeEvent.class, null, this::onOutdatedAttributeEvent);
    }

    /**
     * Determines whether data points should be stored for the given attribute.
     *
     * @param attributeInfo holder of the attribute's meta items
     * @return the value of the {@code STORE_DATA_POINTS} meta item when present, otherwise
     *         {@code true} exactly when the attribute has an {@code AGENT_LINK} meta item
     */
    public static boolean attributeIsStoreDatapoint(MetaHolder attributeInfo) {
        return attributeInfo.getMetaValue(MetaItemType.STORE_DATA_POINTS).orElse(attributeInfo.hasMeta(MetaItemType.AGENT_LINK));
    }

    /**
     * Stores the event's value as a data point when storing is enabled for the attribute and the
     * event carries a value. The event timestamp is converted using the system default time zone.
     *
     * @param attributeEvent the event to store
     * @throws AssetProcessingException with {@link AttributeWriteFailure#STATE_STORAGE_FAILED}
     *         when the upsert fails
     */
    public void onAttributeEvent(AttributeEvent attributeEvent) {
        if (!attributeIsStoreDatapoint(attributeEvent) || !attributeEvent.getValue().isPresent()) {
            return;
        }
        try {
            this.upsertValue(
                attributeEvent.getId(),
                attributeEvent.getName(),
                attributeEvent.getValue().orElse(null),
                Instant.ofEpochMilli(attributeEvent.getTimestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime());
        } catch (Exception e) {
            throw new AssetProcessingException(AttributeWriteFailure.STATE_STORAGE_FAILED, "Failed to insert or update asset data point for attribute: " + String.valueOf(attributeEvent), e);
        }
    }

    /**
     * Handles an outdated event exactly like a regular one by delegating to
     * {@link #onAttributeEvent(AttributeEvent)} with the wrapped event.
     *
     * @param outdatedAttributeEvent wrapper around the outdated attribute event
     */
    public void onOutdatedAttributeEvent(OutdatedAttributeEvent outdatedAttributeEvent) {
        this.onAttributeEvent(outdatedAttributeEvent.getEvent());
    }

    @Override
    protected Class<AssetDatapoint> getDatapointClass() {
        return AssetDatapoint.class;
    }

    @Override
    protected String getDatapointTableName() {
        return "asset_datapoint";
    }

    @Override
    protected Logger getLogger() {
        return LOG;
    }

    /**
     * Daily purge task: deletes expired data points and then removes export files older than one
     * day. Each step handles its own failures by logging, so one failing does not stop the other.
     */
    protected void purgeDataPoints() {
        LOG.info("Running data points purge daily task");
        this.purgeExpiredDatapoints();
        this.purgeObsoleteExports();
    }

    /**
     * Deletes data points older than the default max age for all attributes without a
     * {@code DATA_POINTS_MAX_AGE_DAYS} meta item, then deletes per custom max age for the rest.
     */
    private void purgeExpiredDatapoints() {
        try {
            List<Asset<?>> assets = this.assetStorageService.findAll(new AssetQuery().attributes(new AttributePredicate[]{new AttributePredicate().meta(new NameValuePredicate[]{new NameValuePredicate((NameHolder)MetaItemType.DATA_POINTS_MAX_AGE_DAYS, null)})}));

            // (asset id, attribute) pairs of every attribute carrying a custom max age
            List<Pair<String, Attribute<?>>> attributes = assets.stream()
                .flatMap(asset -> asset.getAttributes().stream()
                    .filter(attribute -> attribute.hasMeta(MetaItemType.DATA_POINTS_MAX_AGE_DAYS))
                    .map(attribute -> new Pair<String, Attribute<?>>(asset.getId(), attribute)))
                .collect(Collectors.toList());

            // All cut-off dates are relative to the start of the current day
            Instant today = this.timerService.getNow().truncatedTo(ChronoUnit.DAYS);

            LOG.fine("Purging data points of attributes that use default max age days of " + this.maxDatapointAgeDays);
            this.persistenceService.doTransaction(em -> em
                .createQuery("delete from AssetDatapoint dp where dp.timestamp < :dt" + this.buildWhereClause(attributes, true))
                .setParameter("dt", Date.from(today.minus(this.maxDatapointAgeDays, ChronoUnit.DAYS)))
                .executeUpdate());

            if (attributes.isEmpty()) {
                return;
            }

            // A meta item without a value falls back to the default max age
            Map<Integer, List<Pair<String, Attribute<?>>>> attributesByAge = attributes.stream()
                .collect(Collectors.groupingBy(attributeRef -> attributeRef.value
                    .getMetaValue(MetaItemType.DATA_POINTS_MAX_AGE_DAYS)
                    .orElse(this.maxDatapointAgeDays)));

            attributesByAge.forEach((age, attrs) -> {
                LOG.fine("Purging data points of " + attrs.size() + " attributes that use a max age of " + age);
                try {
                    this.persistenceService.doTransaction(em -> em
                        .createQuery("delete from AssetDatapoint dp where dp.timestamp < :dt" + this.buildWhereClause(attrs, false))
                        .setParameter("dt", Date.from(today.minus(age, ChronoUnit.DAYS)))
                        .executeUpdate());
                } catch (Exception e) {
                    // Keep going so that one failing age group does not block the others
                    LOG.log(Level.SEVERE, "An error occurred whilst deleting data points, this should not happen", e);
                }
            });
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to run data points purge", e);
        }
    }

    /** Deletes regular files in the export directory whose name ends with "csv" and that were last modified more than a day ago. */
    private void purgeObsoleteExports() {
        try {
            long cutoffMillis = this.timerService.getCurrentTimeMillis() - Duration.ofDays(1L).toMillis();
            File[] obsoleteExports = this.exportPath.toFile().listFiles(file ->
                file.isFile() && file.getName().endsWith("csv") && file.lastModified() < cutoffMillis);

            // listFiles returns null when the directory cannot be listed
            if (obsoleteExports == null) {
                return;
            }

            for (File export : obsoleteExports) {
                boolean deleted = false;
                try {
                    deleted = export.delete();
                } catch (SecurityException e) {
                    LOG.log(Level.WARNING, "Cannot access the export file to delete it", e);
                }
                if (!deleted) {
                    LOG.log(Level.WARNING, "Failed to delete obsolete export '" + export.getName() + "'");
                }
            }
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to purge old exports", e);
        }
    }

    /**
     * Builds the query fragment restricting a data point delete to (or away from) the given
     * attributes.
     *
     * @param attributes (asset id, attribute) pairs to match
     * @param negate     {@code true} to match every data point NOT belonging to the attributes
     * @return an empty string when {@code attributes} is empty, otherwise an
     *         {@code " and (dp.assetId, dp.attributeName) [not ]in (...)"} fragment
     */
    protected String buildWhereClause(List<Pair<String, Attribute<?>>> attributes, boolean negate) {
        if (attributes.isEmpty()) {
            return "";
        }
        // Values are embedded as literals, so quotes must be escaped to keep the statement well formed
        String tuples = attributes.stream()
            .map(attributeRef -> "('" + escapeSqlLiteral(attributeRef.key) + "','" + escapeSqlLiteral(attributeRef.value.getName()) + "')")
            .collect(Collectors.joining(","));
        return " and (dp.assetId, dp.attributeName) " + (negate ? "not " : "") + "in (" + tuples + ")";
    }

    /**
     * Starts an export of the data points of the given attributes within the given time range,
     * after checking the select query against the configured export limit.
     *
     * @param attributeRefs attributes whose data points should be exported
     * @param fromTimestamp start of the range in epoch milliseconds (inclusive)
     * @param toTimestamp   end of the range in epoch milliseconds (inclusive)
     * @return a future that yields the exported CSV file
     * @throws DatapointQueryTooLargeException when the limit check raises it; it is logged and rethrown
     * @throws RuntimeException when the limit check returns {@code false}
     */
    public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs, long fromTimestamp, long toTimestamp) {
        try {
            String query = this.getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp);
            if (!this.canQueryDatapoints(query, null, this.datapointExportLimit)) {
                throw new RuntimeException("Could not export datapoints.");
            }
            return this.doExportDatapoints(attributeRefs, fromTimestamp, toTimestamp);
        } catch (DatapointQueryTooLargeException dqex) {
            String msg = "Could not export data points. It exceeds the data limit of " + this.datapointExportLimit + " data points.";
            this.getLogger().log(Level.WARNING, msg, dqex);
            throw dqex;
        }
    }

    /**
     * Schedules (without delay) a task that runs a native {@code copy (...) to '<file>'} statement
     * and returns the matching file below {@link #exportPath}.
     *
     * <p>NOTE(review): the copy target is hard coded below {@code /storage/}; presumably this is
     * the storage directory as seen by the database server - confirm against the deployment.
     *
     * @param attributeRefs attributes whose data points should be exported
     * @param fromTimestamp start of the range in epoch milliseconds (inclusive)
     * @param toTimestamp   end of the range in epoch milliseconds (inclusive)
     * @return a future that yields the exported CSV file
     */
    protected ScheduledFuture<File> doExportDatapoints(AttributeRef[] attributeRefs, long fromTimestamp, long toTimestamp) {
        return this.scheduledExecutorService.schedule(() -> {
            String fileName = UniqueIdentifierGenerator.generateId() + ".csv";
            StringBuilder sb = new StringBuilder("copy (")
                .append(this.getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp))
                .append(") to '/storage/")
                .append(EXPORT_STORAGE_DIR_NAME)
                .append("/")
                .append(fileName)
                .append("' delimiter ',' CSV HEADER;");
            this.persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
            return this.exportPath.resolve(fileName).toFile();
        }, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the native select statement used for exporting data points.
     *
     * <p>The statement is also embedded in a {@code copy} command by
     * {@link #doExportDatapoints(AttributeRef[], long, long)}, so the attribute references are
     * inlined as escaped string literals instead of being bound as parameters.
     *
     * @param attributeRefs attributes whose data points should be selected
     * @param fromTimestamp start of the range in epoch milliseconds; truncated to whole seconds
     * @param toTimestamp   end of the range in epoch milliseconds; truncated to whole seconds
     * @return the select statement
     */
    protected String getSelectExportQuery(AttributeRef[] attributeRefs, long fromTimestamp, long toTimestamp) {
        String attributeFilter = Arrays.stream(attributeRefs)
            .map(attributeRef -> "(ad.entity_id = '" + escapeSqlLiteral(attributeRef.getId())
                + "' and ad.attribute_name = '" + escapeSqlLiteral(attributeRef.getName()) + "')")
            .collect(Collectors.joining(" or "));

        // Plain concatenation keeps the numeric bounds independent of the default locale
        return "select ad.timestamp, a.name, ad.attribute_name, value from asset_datapoint ad, asset a where ad.entity_id = a.id and ad.timestamp >= to_timestamp("
            + (fromTimestamp / 1000L)
            + ") and ad.timestamp <= to_timestamp("
            + (toTimestamp / 1000L)
            + ") and ("
            + attributeFilter
            + ")";
    }

    /**
     * Escapes a value for use inside a single-quoted SQL/JPQL string literal by doubling every
     * single quote. A {@code null} value is rendered as the text {@code null}, matching plain
     * string concatenation.
     *
     * <p>NOTE(review): doubling quotes is sufficient only while backslashes are treated literally
     * by the database (PostgreSQL {@code standard_conforming_strings = on}) - confirm.
     */
    private static String escapeSqlLiteral(Object value) {
        return String.valueOf(value).replace("'", "''");
    }
}

