package org.apache.streampipes.extensions.connectors.influx.adapter;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.influx.shared.InfluxConfigs;
import org.apache.streampipes.extensions.connectors.influx.shared.InfluxKeys;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbStreamAdapter.class */
/**
 * StreamPipes adapter that periodically polls an InfluxDB measurement and forwards
 * every newly found row to the adapter's event collector.
 *
 * <p>The polling itself is done by {@link PollingThread}, which is started on a dedicated
 * thread in {@link #onAdapterStarted} and interrupted/joined in {@link #onAdapterStopped}.
 */
public class InfluxDbStreamAdapter implements StreamPipesAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(InfluxDbStreamAdapter.class);
    public static final String ID = "org.apache.streampipes.connect.iiot.adapters.influxdb.stream";
    private static final String POLLING_INTERVAL = "pollingInterval";
    private static final String REPLACE_NULL_VALUES = "replaceNullValues";
    private static final String DO_REPLACE = "doReplace";
    private static final String DO_NOT_REPLACE = "doNotReplace";
    private InfluxDbClient influxDbClient;
    private Thread pollingThread;
    private int pollingInterval;

    /**
     * Runnable that repeatedly queries the configured measurement for rows newer than the
     * last seen timestamp and hands each extracted event to the collector.
     */
    public static class PollingThread implements Runnable {
        private final int pollingInterval;
        private final InfluxDbClient influxDbClient;
        private final IEventCollector collector;

        /**
         * Takes the client from the given adapter, connects it and loads its columns.
         *
         * @param influxDbStreamAdapter adapter whose already configured client is used
         * @param i                     value passed to {@link Thread#sleep(long)} between two polls
         * @param iEventCollector       receiver of the extracted events
         * @throws AdapterException declared for failures while preparing the client
         */
        PollingThread(InfluxDbStreamAdapter influxDbStreamAdapter, int i, IEventCollector iEventCollector) throws AdapterException {
            this.pollingInterval = i;
            this.collector = iEventCollector;
            this.influxDbClient = influxDbStreamAdapter.getInfluxDbClient();
            this.influxDbClient.connect();
            this.influxDbClient.loadColumns();
        }

        /**
         * Polls until the thread is interrupted or a query fails, then disconnects the client.
         * Returns immediately (without disconnecting) if the client is not connected.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!this.influxDbClient.isConnected()) {
                LOG.warn("Cannot start PollingThread, when the client is not connected");
                return;
            }
            try {
                String newestTimestamp = getNewestTimestamp();
                while (!Thread.interrupted()) {
                    try {
                        Thread.sleep(this.pollingInterval);
                    } catch (InterruptedException e) {
                        // The interrupt is the stop signal from onAdapterStopped. Throwing the
                        // exception cleared the interrupt flag, so the loop condition would
                        // never see it: leave the loop explicitly.
                        break;
                    }
                    // NOTE(review): measurement and column names are concatenated into the
                    // query text unescaped; confirm they are validated where they are configured.
                    List<List<Object>> rows = this.influxDbClient.query("SELECT " + this.influxDbClient.getColumnsString()
                            + " FROM " + this.influxDbClient.getMeasurement()
                            + " WHERE time > " + newestTimestamp + " ORDER BY time ASC ");
                    if (rows.isEmpty()) {
                        continue;
                    }
                    // Rows are ordered ascending by time, so the last row carries the newest timestamp.
                    newestTimestamp = InfluxDbClient.getTimestamp((String) rows.get(rows.size() - 1).get(0));
                    for (List<Object> row : rows) {
                        try {
                            Map<String, Object> event = this.influxDbClient.extractEvent(row);
                            if (event != null) {
                                this.collector.collect(event);
                            }
                        } catch (SpRuntimeException e) {
                            // A single bad row must not stop the polling; log it and go on.
                            LOG.error("Error: {}", e.getMessage());
                        }
                    }
                }
            } catch (SpRuntimeException e) {
                LOG.error(e.getMessage(), e);
            } finally {
                // Release the connection on every exit path, including failed queries.
                this.influxDbClient.disconnect();
            }
        }

        /**
         * Queries the most recent row of the measurement and returns its timestamp.
         *
         * @return the timestamp of the newest row, as produced by {@code InfluxDbClient.getTimestamp}
         * @throws SpRuntimeException if the measurement contains no rows
         */
        String getNewestTimestamp() throws SpRuntimeException {
            List<List<Object>> rows = this.influxDbClient.query("SELECT * FROM " + this.influxDbClient.getMeasurement() + " ORDER BY time DESC LIMIT 1");
            if (rows.isEmpty()) {
                throw new SpRuntimeException("No entry found in query");
            }
            return InfluxDbClient.getTimestamp((String) rows.get(0).get(0));
        }
    }

    /**
     * Builds the adapter configuration: the shared InfluxDB settings, the polling interval
     * and the selection whether null values are replaced.
     *
     * @return the adapter configuration
     */
    public IAdapterConfiguration declareConfig() {
        AdapterConfigurationBuilder builder = AdapterConfigurationBuilder.create(ID, InfluxDbStreamAdapter::new)
                .withAssets("documentation.md", "icon.png")
                .withLocales(Locales.EN);
        InfluxConfigs.appendSharedInfluxConfig(builder);
        builder.requiredIntegerParameter(Labels.withId(POLLING_INTERVAL));
        builder.requiredSingleValueSelection(
                Labels.withId(REPLACE_NULL_VALUES),
                Options.from(new Tuple2<>("Yes", DO_REPLACE), new Tuple2<>("No", DO_NOT_REPLACE)));
        return builder.buildConfiguration();
    }

    /**
     * Reads the configuration, creates the client and starts the polling thread.
     *
     * @throws AdapterException if the polling runnable cannot be created
     */
    public void onAdapterStarted(IAdapterParameterExtractor iAdapterParameterExtractor, IEventCollector iEventCollector, IAdapterRuntimeContext iAdapterRuntimeContext) throws AdapterException {
        applyConfigurations(iAdapterParameterExtractor.getStaticPropertyExtractor());
        this.pollingThread = new Thread(new PollingThread(this, this.pollingInterval, iEventCollector));
        this.pollingThread.start();
    }

    /**
     * Interrupts the polling thread and waits for it to finish. Does nothing if the
     * adapter was never started successfully.
     *
     * @throws AdapterException if the calling thread is interrupted while waiting
     */
    public void onAdapterStopped(IAdapterParameterExtractor iAdapterParameterExtractor, IAdapterRuntimeContext iAdapterRuntimeContext) throws AdapterException {
        if (this.pollingThread == null) {
            // onAdapterStarted failed or was never called; there is nothing to stop.
            return;
        }
        this.pollingThread.interrupt();
        try {
            this.pollingThread.join();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new AdapterException("Unexpected Error while joining polling thread: " + e.getMessage(), e);
        }
    }

    /**
     * Reads the configuration, creates the client and returns the schema it reports.
     *
     * @return the schema obtained from the client
     * @throws AdapterException declared for failures while obtaining the schema
     */
    public GuessSchema onSchemaRequested(IAdapterParameterExtractor iAdapterParameterExtractor, IAdapterGuessSchemaContext iAdapterGuessSchemaContext) throws AdapterException {
        applyConfigurations(iAdapterParameterExtractor.getStaticPropertyExtractor());
        return this.influxDbClient.getSchema();
    }

    private InfluxDbClient getInfluxDbClient() {
        return this.influxDbClient;
    }

    /**
     * Reads the polling interval, the measurement and the null-replacement choice from the
     * extractor and creates a fresh client from them.
     */
    private void applyConfigurations(IStaticPropertyExtractor iStaticPropertyExtractor) {
        this.pollingInterval = iStaticPropertyExtractor.singleValueParameter(POLLING_INTERVAL, Integer.class);
        String measurement = iStaticPropertyExtractor.singleValueParameter(InfluxKeys.DATABASE_MEASUREMENT_KEY, String.class);
        String replaceChoice = iStaticPropertyExtractor.selectedSingleValueInternalName(REPLACE_NULL_VALUES, String.class);
        this.influxDbClient = new InfluxDbClient(
                InfluxConfigs.fromExtractor(iStaticPropertyExtractor),
                measurement,
                replaceChoice.equals(DO_REPLACE));
    }
}
