/*
 * Decompiled with CFR 0.152.
 */
package de.dentrassi.flow.component.csv;

import de.dentrassi.flow.ComponentContext;
import de.dentrassi.flow.spi.DataPlugOut;
import de.dentrassi.flow.spi.component.AbstractComponent;
import de.dentrassi.flow.spi.component.EventContext;
import de.dentrassi.flow.spi.component.ValueRequest;
import de.dentrassi.flow.spi.component.ValueResult;
import io.glutamate.time.Durations;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvTimeSeriesReader
extends AbstractComponent {
    private static final Logger logger = LoggerFactory.getLogger(CsvTimeSeriesReader.class);
    private ComponentContext.SharedResource<ScheduledExecutorService> executor;
    private String file;
    private String timestampColumn;
    private CSVParser parser;
    private Iterator<CSVRecord> iterator;
    private CSVRecord record;
    private CSVRecord nextRecord;
    private TimeUnit timestampUnit;
    private ComponentContext context;
    private Long durationMultipliedBy;
    private Long durationDividedBy;
    private Instant nextUpdate;

    public CsvTimeSeriesReader() {
        this.registerTriggerIn("open", this::open);
        this.registerTriggerIn("close", this::close);
        this.registerDataIn("file", String.class, () -> this.getInitializer("file"), () -> null, this::setFile);
        this.registerDataIn("timestampColumn", String.class, () -> this.getInitializer("timestampColumn"), () -> null, this::setTimestampColumn);
        this.registerDataIn("timestampUnit", String.class, () -> this.getInitializer("timestampUnit", "MS"), () -> null, this::setTimestampUnit);
        this.registerDataIn("durationMultipliedBy", Long.class, () -> this.getInitializerLong("durationMultipliedBy", null), () -> null, this::setDurationMultipliedBy);
        this.registerDataIn("durationDividedBy", Long.class, () -> this.getInitializerLong("durationDividedBy", null), () -> null, this::setDurationDividedBy);
        this.registerDataOut("record", this::getRecord);
        this.registerTriggerOut("updated");
        this.registerTriggerOut("completed");
    }

    public void start(Map<String, String> initializers, ComponentContext context, EventContext event) {
        super.start(initializers, context, event);
        this.context = context;
        this.executor = context.createSharedResource(((Object)((Object)this)).getClass().getName(), "executor", ScheduledExecutorService.class, () -> Executors.newSingleThreadScheduledExecutor(), ExecutorService::shutdown);
    }

    public void stop() {
        this.executor.close();
        super.stop();
    }

    private void setFile(String file) {
        this.file = file;
    }

    private void setTimestampColumn(String timestampColumn) {
        this.timestampColumn = timestampColumn;
    }

    private void setDurationMultipliedBy(Long durationMultipliedBy) {
        this.durationMultipliedBy = durationMultipliedBy;
    }

    private void setDurationDividedBy(Long durationDividedBy) {
        this.durationDividedBy = durationDividedBy;
    }

    private void setTimestampUnit(String timestampUnit) {
        if (timestampUnit == null || timestampUnit.isEmpty()) {
            this.timestampUnit = TimeUnit.SECONDS;
        } else {
            switch (timestampUnit.toUpperCase()) {
                case "MICRO": 
                case "MICROS": {
                    this.timestampUnit = TimeUnit.MICROSECONDS;
                    break;
                }
                case "MS": 
                case "MILLIS": {
                    this.timestampUnit = TimeUnit.MILLISECONDS;
                    break;
                }
                case "S": 
                case "SEC": {
                    this.timestampUnit = TimeUnit.SECONDS;
                    break;
                }
                case "MIN": {
                    this.timestampUnit = TimeUnit.MINUTES;
                    break;
                }
                default: {
                    this.timestampUnit = TimeUnit.valueOf(timestampUnit.toUpperCase());
                }
            }
        }
    }

    public void open() {
        logger.info("Opening CSV");
        this.updateAllData();
        if (this.file == null) {
            logger.debug("No file configured");
            return;
        }
        Path path = Paths.get(this.file, new String[0]);
        if (!Files.isReadable(path)) {
            logger.info("File {} is not readable", (Object)this.file);
            return;
        }
        try {
            this.parser = CSVParser.parse((Path)path, (Charset)StandardCharsets.UTF_8, (CSVFormat)CSVFormat.DEFAULT.withFirstRecordAsHeader());
            this.iterator = this.parser.iterator();
            if (this.iterator.hasNext()) {
                this.nextRecord = this.iterator.next();
                this.readNext();
            } else {
                this.fireEndOfFile();
            }
        }
        catch (IOException e) {
            logger.info("Failed to open reader", (Throwable)e);
        }
    }

    public void close() {
        this.iterator = null;
        if (this.parser != null) {
            try {
                this.parser.close();
            }
            catch (IOException e) {
                logger.info("Failed to close reader", (Throwable)e);
            }
            this.parser = null;
        }
    }

    protected void readNext() {
        Duration duration;
        if (this.parser == null) {
            return;
        }
        this.setCurrentRecord(this.nextRecord);
        this.fireUpdated();
        if (!this.iterator.hasNext()) {
            this.nextRecord = null;
            logger.info("Reached end of input");
            this.fireEndOfFile();
            return;
        }
        this.nextRecord = this.iterator.next();
        try {
            Instant now = Instant.ofEpochMilli(this.fromRecord(this.record));
            Instant next = Instant.ofEpochMilli(this.fromRecord(this.nextRecord));
            duration = Duration.between(now, next);
        }
        catch (Exception e) {
            logger.info("Failed to parse timstamps", (Throwable)e);
            duration = Duration.ofMillis(1L);
        }
        this.triggerNextRead(duration);
    }

    private void triggerNextRead(Duration originalDuration) {
        Duration duration = originalDuration;
        if (duration.isNegative()) {
            duration = Duration.ZERO;
            logger.debug("Zeroed negative duration");
        } else {
            duration = originalDuration;
            if (this.durationMultipliedBy != null) {
                duration = duration.multipliedBy(this.durationMultipliedBy);
            }
            if (this.durationDividedBy != null) {
                duration = duration.dividedBy(this.durationDividedBy);
            }
            logger.debug("Applied transformations: {} -> {}", (Object)originalDuration, (Object)duration);
        }
        logger.debug("Schedule next read in: {}", (Object)duration);
        Instant now = Instant.now();
        if (this.nextUpdate != null) {
            Duration diff = Duration.between(this.nextUpdate, now);
            Duration corrected = duration.minus(diff);
            logger.debug("Correcting duration - {} -> {} (diff: {})", new Object[]{duration, corrected, diff});
            duration = corrected;
        }
        this.nextUpdate = now.plus(duration);
        Durations.consume((Duration)duration, (delay, unit) -> ((ScheduledExecutorService)this.executor.get()).schedule(() -> this.context.run(this::readNext), delay, unit));
    }

    private void setCurrentRecord(CSVRecord record) {
        logger.debug("Current record: {}", (Object)record);
        this.record = record;
    }

    private long fromRecord(CSVRecord record) throws Exception {
        try {
            long ts = Long.parseLong(record.get(this.timestampColumn));
            logger.trace("Raw TS value: {}", (Object)ts);
            return TimeUnit.MILLISECONDS.convert(ts, this.timestampUnit);
        }
        catch (Exception e) {
            throw new Exception("Failed to parse timestamp", e);
        }
    }

    private void fireUpdated() {
        this.triggerOut("updated");
    }

    private void fireEndOfFile() {
        this.triggerOut("completed");
    }

    public void connectDataOut(String portName, DataPlugOut plug) {
        if (portName.startsWith("record/") && this.getDataPortOut(portName) == null) {
            String name = portName.substring("record/".length());
            this.registerDataOut(portName, () -> this.getRecord(name));
        }
        super.connectDataOut(portName, plug);
    }

    private ValueResult getRecord(ValueRequest request) {
        return ValueResult.of((Object[])new Object[]{this.record, this.record.toMap()});
    }

    private String getRecord(String columnName) {
        if (this.record == null) {
            return null;
        }
        return this.record.get(columnName);
    }
}

