package org.apache.streampipes.dataexplorer.commons.influx;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.class */
/**
 * Writes events of a single {@link DataLakeMeasure} as points into an InfluxDB database.
 *
 * <p>On construction the store obtains a client via {@link InfluxClientProvider}, verifies the
 * connection with a ping, creates the configured database if it does not exist yet and enables
 * batched writes. Each call to {@link #onEvent(Event)} converts one event into one point;
 * {@link #close()} flushes pending writes and releases the client.
 */
public class InfluxStore {
    private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);

    /** Ping version value that this class treats as a failed connection. */
    private static final String UNKNOWN_VERSION = "unknown";

    /** Pattern a database name has to match before it is embedded into a CREATE DATABASE query. */
    private static final String DATABASE_NAME_REGEX = "^[a-zA-Z_]\\w*$";

    /** Number of buffered points passed to {@code enableBatch}. */
    private static final int BATCH_ACTIONS = 2000;

    /** Flush interval in milliseconds passed to {@code enableBatch}. */
    private static final int BATCH_FLUSH_INTERVAL_MS = 500;

    /** Time in milliseconds to wait between flushing and closing the client. */
    private static final long CLOSE_GRACE_PERIOD_MS = 1000L;

    DataLakeMeasure measure;
    /** Maps each runtime name of the measure's schema to the name used as InfluxDB key. */
    Map<String, String> sanitizedRuntimeNames;
    private InfluxDB influxDb;

    /**
     * Creates a store for the given measure and connects to InfluxDB.
     *
     * @param dataLakeMeasure measure whose schema, measure name and timestamp field drive the conversion
     * @param influxConnectionSettings connection settings used to obtain the client and database name
     * @throws SpRuntimeException if the server cannot be reached or the database name is not allowed
     */
    public InfluxStore(DataLakeMeasure dataLakeMeasure, InfluxConnectionSettings influxConnectionSettings) {
        this.measure = dataLakeMeasure;
        this.sanitizedRuntimeNames = new HashMap<>();
        // Sanitize once up front so that onEvent only needs a map lookup per field.
        for (EventProperty eventProperty : dataLakeMeasure.getEventSchema().getEventProperties()) {
            String runtimeName = eventProperty.getRuntimeName();
            this.sanitizedRuntimeNames.put(runtimeName, InfluxNameSanitizer.renameReservedKeywords(runtimeName));
        }
        connect(influxConnectionSettings);
    }

    /**
     * Creates a store for the given measure using connection settings derived from the environment.
     *
     * @param dataLakeMeasure measure whose events are stored
     * @param environment environment the connection settings are read from
     * @throws SpRuntimeException if the connection cannot be established
     */
    public InfluxStore(DataLakeMeasure dataLakeMeasure, Environment environment) throws SpRuntimeException {
        this(dataLakeMeasure, InfluxConnectionSettings.from(environment));
    }

    /**
     * Obtains the client, checks the connection, ensures the database exists and enables batching.
     *
     * @param influxConnectionSettings connection settings to use
     * @throws SpRuntimeException if the ping reports an unknown version or the database name is invalid
     */
    private void connect(InfluxConnectionSettings influxConnectionSettings) throws SpRuntimeException {
        this.influxDb = InfluxClientProvider.getInfluxDBClient(influxConnectionSettings);
        // Constant-first comparison so that a missing version string cannot cause a NullPointerException.
        if (UNKNOWN_VERSION.equalsIgnoreCase(this.influxDb.ping().getVersion())) {
            throw new SpRuntimeException("Could not connect to InfluxDb Server: " + influxConnectionSettings.getConnectionUrl());
        }
        String databaseName = influxConnectionSettings.getDatabaseName();
        if (!InfluxRequests.databaseExists(this.influxDb, databaseName)) {
            LOG.info("Database '{}' not found. Gets created ...", databaseName);
            createDatabase(databaseName);
        }
        this.influxDb.setDatabase(databaseName);
        this.influxDb.enableBatch(BATCH_ACTIONS, BATCH_FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a database with the given name.
     *
     * @param str database name; must match {@code ^[a-zA-Z_][a-zA-Z0-9_]*$}
     * @throws SpRuntimeException if the name does not match the allowed pattern
     */
    private void createDatabase(String str) throws SpRuntimeException {
        // The name is concatenated into the query, so it is validated against a strict pattern first.
        if (!str.matches(DATABASE_NAME_REGEX)) {
            throw new SpRuntimeException("Database name '" + str + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
        }
        this.influxDb.query(new Query("CREATE DATABASE \"" + str + "\"", ""));
    }

    /**
     * Converts the event into a point and hands it to the InfluxDB client.
     *
     * <p>Only primitive properties of the measure's schema are considered. Properties missing in
     * the event and properties with a {@code null} value are skipped and reported in the log;
     * dimension properties are written as tags, all other properties as fields.
     *
     * @param event event to store
     * @throws SpRuntimeException if {@code event} is {@code null}
     */
    public void onEvent(Event event) throws SpRuntimeException {
        if (event == null) {
            throw new SpRuntimeException("event is null");
        }

        ArrayList<String> missingFields = new ArrayList<>();
        ArrayList<String> nullFields = new ArrayList<>();

        String timestampField = this.measure.getTimestampField();
        long timestamp = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong().longValue();
        Point.Builder point = Point.measurement(this.measure.getMeasureName()).time(timestamp, TimeUnit.MILLISECONDS);

        for (EventProperty eventProperty : this.measure.getEventSchema().getEventProperties()) {
            if (!(eventProperty instanceof EventPropertyPrimitive)) {
                continue;
            }
            String runtimeName = eventProperty.getRuntimeName();
            // The timestamp is already used as the point's time and must not be stored again.
            // NOTE(review): endsWith also skips any property whose name is merely a suffix of the
            // timestamp selector - confirm against the selector format whether a stricter match is needed.
            if (timestampField.endsWith(runtimeName)) {
                continue;
            }
            String sanitizedName = this.sanitizedRuntimeNames.get(runtimeName);
            try {
                Optional<? extends AbstractField> field = event.getOptionalFieldByRuntimeName(runtimeName);
                if (!field.isPresent()) {
                    missingFields.add(runtimeName);
                    continue;
                }
                PrimitiveField primitiveField = field.get().getAsPrimitive();
                if (primitiveField.getRawValue() == null) {
                    nullFields.add(sanitizedName);
                } else if (PropertyScope.DIMENSION_PROPERTY.name().equals(eventProperty.getPropertyScope())) {
                    point.tag(sanitizedName, primitiveField.getAsString());
                } else {
                    handleMeasurementProperty(point, (EventPropertyPrimitive) eventProperty, sanitizedName, primitiveField);
                }
            } catch (SpRuntimeException e) {
                LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", runtimeName, e);
            }
        }

        if (!missingFields.isEmpty()) {
            LOG.debug("Ignored {} fields which were present in the schema, but not in the provided event: {}", missingFields.size(), String.join(", ", missingFields));
        }
        if (!nullFields.isEmpty()) {
            LOG.warn("Ignored {} fields which had a value 'null': {}", nullFields.size(), String.join(", ", nullFields));
        }
        this.influxDb.write(point.build());
    }

    /**
     * Adds the value of a measurement property as a field to the point, converted according to
     * the runtime type declared in the schema. Unrecognized runtime types are stored as strings.
     *
     * <p>Integer and long values that cannot be parsed as such are stored as float instead. If a
     * value cannot be converted at all, a warning is logged once and the field is left out.
     *
     * @param builder point builder the field is added to
     * @param eventPropertyPrimitive schema description providing the runtime type
     * @param str sanitized field name used as InfluxDB field key
     * @param primitiveField field holding the value
     */
    private void handleMeasurementProperty(Point.Builder builder, @NotNull EventPropertyPrimitive eventPropertyPrimitive, String str, PrimitiveField primitiveField) {
        try {
            String runtimeType = eventPropertyPrimitive.getRuntimeType();
            if (XSD.INTEGER.toString().equals(runtimeType)) {
                try {
                    builder.addField(str, primitiveField.getAsInt());
                } catch (NumberFormatException e) {
                    // Schema says integer but the value is not one: fall back to float.
                    builder.addField(str, primitiveField.getAsFloat());
                }
            } else if (XSD.LONG.toString().equals(runtimeType)) {
                try {
                    builder.addField(str, primitiveField.getAsLong());
                } catch (NumberFormatException e) {
                    // Schema says long but the value is not one: fall back to float.
                    builder.addField(str, primitiveField.getAsFloat());
                }
            } else if (XSD.FLOAT.toString().equals(runtimeType)) {
                builder.addField(str, primitiveField.getAsFloat());
            } else if (XSD.DOUBLE.toString().equals(runtimeType) || "http://schema.org/Number".equals(runtimeType)) {
                builder.addField(str, primitiveField.getAsDouble());
            } else if (XSD.BOOLEAN.toString().equals(runtimeType)) {
                builder.addField(str, primitiveField.getAsBoolean().booleanValue());
            } else {
                builder.addField(str, primitiveField.getAsString());
            }
        } catch (NumberFormatException e) {
            LOG.warn("Wrong number format for field {}, ignoring.", str);
        }
    }

    /**
     * Flushes buffered points, waits briefly and closes the InfluxDB client.
     *
     * <p>The client is closed even if flushing fails or the wait is interrupted. When interrupted,
     * the thread's interrupt flag is restored before the exception is thrown.
     *
     * @throws SpRuntimeException if the thread is interrupted while waiting
     */
    public void close() throws SpRuntimeException {
        try {
            this.influxDb.flush();
            Thread.sleep(CLOSE_GRACE_PERIOD_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SpRuntimeException(e);
        } finally {
            this.influxDb.close();
        }
    }
}
