package org.apache.kylin.common.metrics.service;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.metrics.MetricsGroup;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.InfluxDBUtils;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.guava30.shaded.common.base.Throwables;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.shaded.influxdb.org.influxdb.BatchOptions;
import org.apache.kylin.shaded.influxdb.org.influxdb.InfluxDB;
import org.apache.kylin.shaded.influxdb.org.influxdb.InfluxDBIOException;
import org.apache.kylin.shaded.influxdb.org.influxdb.dto.Point;
import org.apache.kylin.shaded.influxdb.org.influxdb.dto.Pong;
import org.apache.kylin.shaded.influxdb.org.influxdb.dto.Query;
import org.apache.kylin.shaded.influxdb.org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/common/metrics/service/InfluxDBInstance.class */
/**
 * Holds a single InfluxDB client bound to one database / retention policy and keeps it alive.
 *
 * <p>{@link #init()} attempts a first connection and then starts a single-threaded scheduler that
 * re-runs the connection check periodically. When a check fails the client reference is reset to
 * {@code null}; {@link #write} and {@link #read} then report "not connected" until a later check
 * manages to reconnect.
 *
 * <p>The client reference is {@code volatile} because it is written by the monitor thread and read
 * by whichever threads call {@link #write} / {@link #read}.
 */
public class InfluxDBInstance {
    private static final Logger logger = LoggerFactory.getLogger(InfluxDBInstance.class);
    private String database;
    private String retentionPolicyName;
    private String retentionDuration;
    private String shardDuration;
    private int replicationFactor;
    private boolean useDefault;
    // The constants below are not read anywhere inside this class.
    private static final String DEFAULT_DATABASE = "KYLIN_MONITOR";
    private static final String DEFAULT_RETENTION_POLICY_NAME = "KYLIN_MONITOR_RP";
    private static final String RETENTION_DURATION = "90d";
    private static final String SHARD_DURATION = "7d";
    private static final int REPLICATION_FACTOR = 1;
    private static final boolean USE_DEFAULT = true;
    /** Current client, or {@code null} while disconnected. */
    private volatile InfluxDB influxDB;
    private ScheduledExecutorService scheduledExecutorService;
    private KapConfig config = KapConfig.wrap(KylinConfig.getInstanceFromEnv());

    /**
     * Creates an instance description; no connection is attempted until {@link #init()} is called.
     *
     * @param database            name of the InfluxDB database to use (created if it does not exist)
     * @param retentionPolicyName name of the retention policy set on the client and created with the database
     * @param retentionDuration   duration passed to {@code createRetentionPolicy}
     * @param shardDuration       shard duration passed to {@code createRetentionPolicy}
     * @param replicationFactor   replication factor passed to {@code createRetentionPolicy}
     * @param useDefault          "is default" flag passed to {@code createRetentionPolicy}
     */
    public InfluxDBInstance(String database, String retentionPolicyName, String retentionDuration,
            String shardDuration, int replicationFactor, boolean useDefault) {
        this.database = database;
        this.retentionPolicyName = retentionPolicyName;
        this.retentionDuration = retentionDuration;
        this.shardDuration = shardDuration;
        this.replicationFactor = replicationFactor;
        this.useDefault = useDefault;
    }

    /**
     * Connects to InfluxDB and starts the periodic connection monitor.
     * Does nothing (apart from logging) when no InfluxDB address is configured.
     */
    public void init() {
        if (StringUtils.isEmpty(this.config.influxdbAddress())) {
            logger.info("InfluxDB address is empty, skip it");
            return;
        }
        tryConnectInfluxDB();
        startMonitorInfluxDB();
    }

    /**
     * Connects when there is no client yet, otherwise pings the existing client.
     *
     * <p>Any failure discards the client so that the next invocation starts a fresh connection
     * attempt. This method is run by the scheduler, so it must not let an exception escape:
     * a periodic task that throws is not rescheduled.
     */
    private void tryConnectInfluxDB() {
        // Read the volatile field once; from here on work with the local reference only.
        InfluxDB client = this.influxDB;
        try {
            if (client == null) {
                String address = this.config.influxdbAddress();
                String username = this.config.influxdbUsername();
                String password = this.config.influxdbPassword();
                boolean httpsEnabled = this.config.isInfluxdbHttpsEnabled();
                boolean unsafeSslEnabled = this.config.isInfluxdbUnsafeSslEnabled();
                logger.info("Init influxDB, address: {}, username: {}", address, username);
                client = InfluxDBUtils.getInfluxDBInstance(address, username, password, httpsEnabled,
                        unsafeSslEnabled);
                client.setDatabase(getDatabase());
                client.setRetentionPolicy(getRetentionPolicyName());
                if (!client.databaseExists(getDatabase())) {
                    logger.info("Create influxDB database {}", getDatabase());
                    client.createDatabase(getDatabase());
                    logger.info("Create influxDB retention policy '{}' on database '{}'", getRetentionPolicyName(),
                            getDatabase());
                    client.createRetentionPolicy(getRetentionPolicyName(), getDatabase(), getRetentionDuration(),
                            getShardDuration(), getReplicationFactor(), isUseDefault());
                }
                client.enableBatch(BatchOptions.DEFAULTS.actions(1000).bufferLimit(10000)
                        .flushDuration(this.config.getInfluxDBFlushDuration()).jitterDuration(500));
                // Publish only after the client is fully set up, so writers never see a
                // client without database / retention policy / batching configured.
                this.influxDB = client;
            } else {
                Pong pong = client.ping();
                MetricsGroup.monitorRegisterMetrics();
                logger.trace("Connected to influxDB successfully. [{}]", pong);
            }
        } catch (Exception e) {
            // Unpublish first so the reset happens even if releasing the client misbehaves.
            this.influxDB = null;
            closeQuietly(client);
            boolean ioFailure = Throwables.getCausalChain(e).stream()
                    .anyMatch(InfluxDBIOException.class::isInstance);
            if (ioFailure) {
                logger.warn("Check influxDB Instance error, database: {}, retentionPolicy: {} ex: {}", getDatabase(),
                        getRetentionPolicyName(), e.getMessage());
            } else {
                logger.error("Unknown exception happened", e);
            }
        }
    }

    /**
     * Releases a client that is being discarded, never propagating a failure.
     * NOTE(review): relies on {@code InfluxDB.close()} also disabling batching, as in
     * influxdb-java's implementation - confirm for the shaded version in use.
     */
    private static void closeQuietly(InfluxDB client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception closeError) {
            logger.debug("Failed to release discarded influxDB client", closeError);
        }
    }

    /**
     * Starts the monitor thread (first check after 60 seconds, then 600 seconds after each
     * check completes) and registers a JVM shutdown hook that closes the client and stops
     * the monitor.
     */
    private void startMonitorInfluxDB() {
        logger.info("Start to monitor influxDB Instance, database: {}, retentionPolicy: {}", getDatabase(),
                getRetentionPolicyName());
        this.scheduledExecutorService = Executors
                .newSingleThreadScheduledExecutor(new NamedThreadFactory("InfluxDBMonitor-" + getDatabase()));
        this.scheduledExecutorService.scheduleWithFixedDelay(this::tryConnectInfluxDB, 60L, 600L, TimeUnit.SECONDS);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // The client is null when no connection could be established; closing must
                // not prevent the executor from being shut down.
                closeQuietly(this.influxDB);
            } finally {
                ExecutorServiceUtil.forceShutdown(this.scheduledExecutorService);
            }
            logger.info("Shutdown InfluxDB Instance, database: {}, retentionPolicy: {}", getDatabase(),
                    getRetentionPolicyName());
        }));
    }

    /**
     * Writes one point through the current client.
     *
     * @param measurement measurement name of the point
     * @param tags        tags attached to the point
     * @param fields      field values of the point
     * @param timestamp   point time in milliseconds
     * @return {@code true} if the point was handed to the client, {@code false} if there is
     *         currently no connected client
     */
    public boolean write(String measurement, Map<String, String> tags, Map<String, Object> fields, long timestamp) {
        // Single volatile read: the monitor thread may null the field at any moment.
        InfluxDB client = this.influxDB;
        if (client == null) {
            logger.error("InfluxDB is not connected, abort writing.");
            return false;
        }
        Point point = Point.measurement(measurement).time(timestamp, TimeUnit.MILLISECONDS).tag(tags).fields(fields)
                .build();
        client.write(point);
        return true;
    }

    /**
     * Runs a query against this instance's database.
     *
     * @param query query text to execute
     * @return the query result, or a new empty {@link QueryResult} if there is currently no
     *         connected client
     */
    public QueryResult read(String query) {
        // Single volatile read: the monitor thread may null the field at any moment.
        InfluxDB client = this.influxDB;
        if (client == null) {
            logger.error("InfluxDB is not connected, abort reading.");
            return new QueryResult();
        }
        return client.query(new Query(query, getDatabase()));
    }

    @Generated
    public String getDatabase() {
        return this.database;
    }

    @Generated
    public String getRetentionPolicyName() {
        return this.retentionPolicyName;
    }

    @Generated
    public String getRetentionDuration() {
        return this.retentionDuration;
    }

    @Generated
    public String getShardDuration() {
        return this.shardDuration;
    }

    @Generated
    public int getReplicationFactor() {
        return this.replicationFactor;
    }

    @Generated
    public boolean isUseDefault() {
        return this.useDefault;
    }

    /** Returns the current client, or {@code null} while disconnected. */
    @Generated
    public InfluxDB getInfluxDB() {
        return this.influxDB;
    }

    @Generated
    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    @Generated
    public KapConfig getConfig() {
        return this.config;
    }

    @Generated
    public void setInfluxDB(InfluxDB influxDB) {
        this.influxDB = influxDB;
    }
}
