package org.apache.streampipes.extensions.connectors.influx.sink;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxConnectionSettings;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxRequests;
import org.apache.streampipes.extensions.connectors.influx.shared.SharedInfluxClient;
import org.apache.streampipes.model.runtime.Event;
import org.influxdb.BatchOptions;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.class */
public class InfluxDbClient extends SharedInfluxClient {
    private static final Logger LOG = LoggerFactory.getLogger(InfluxDbClient.class);
    private final String timestampField;
    private final Integer batchSize;
    private final Integer flushDuration;

    /* JADX INFO: Access modifiers changed from: package-private */
    public InfluxDbClient(InfluxConnectionSettings influxConnectionSettings, String str, String str2, Integer num, Integer num2) throws SpRuntimeException {
        super(influxConnectionSettings, str);
        this.measureName = str;
        this.timestampField = str2;
        this.batchSize = num;
        this.flushDuration = num2;
        connect();
    }

    private void connect() throws SpRuntimeException {
        super.initClient();
        String databaseName = this.connectionSettings.getDatabaseName();
        if (!InfluxRequests.databaseExists(this.influxDb, databaseName)) {
            LOG.info("Database '" + databaseName + "' not found. Gets created ...");
            createDatabase(databaseName);
        }
        this.influxDb.setDatabase(databaseName);
        this.influxDb.enableBatch(BatchOptions.DEFAULTS.actions(this.batchSize.intValue()).flushDuration(this.flushDuration.intValue()));
    }

    private void createDatabase(String str) throws SpRuntimeException {
        if (!str.matches("^[a-zA-Z_][a-zA-Z0-9_]*$")) {
            throw new SpRuntimeException("Databasename '" + str + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
        }
        this.influxDb.query(new Query("CREATE DATABASE \"" + str + "\"", ""));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void save(Event event) throws SpRuntimeException {
        if (event == null) {
            throw new SpRuntimeException("event is null");
        }
        Point.Builder time = Point.measurement(this.measureName).time(event.getFieldBySelector(this.timestampField).getAsPrimitive().getAsLong(), TimeUnit.MILLISECONDS);
        for (Map.Entry entry : event.getRaw().entrySet()) {
            if (entry.getValue() instanceof Integer) {
                time.addField(InfluxDbSink.prepareString((String) entry.getKey()), (Integer) entry.getValue());
            } else if (entry.getValue() instanceof Long) {
                time.addField(InfluxDbSink.prepareString((String) entry.getKey()), (Long) entry.getValue());
            } else if (entry.getValue() instanceof Double) {
                time.addField(InfluxDbSink.prepareString((String) entry.getKey()), (Double) entry.getValue());
            } else if (entry.getValue() instanceof Boolean) {
                time.addField(InfluxDbSink.prepareString((String) entry.getKey()), ((Boolean) entry.getValue()).booleanValue());
            } else {
                time.addField(InfluxDbSink.prepareString((String) entry.getKey()), entry.getValue().toString());
            }
        }
        this.influxDb.write(time.build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.influxDb.close();
    }
}
