package io.confluent.support.metrics;

import io.confluent.support.metrics.common.Collector;
import io.confluent.support.metrics.common.kafka.KafkaUtilities;
import io.confluent.support.metrics.common.kafka.ZkClientProvider;
import io.confluent.support.metrics.serde.AvroSerializer;
import io.confluent.support.metrics.submitters.ConfluentSubmitter;
import io.confluent.support.metrics.submitters.KafkaSubmitter;
import io.confluent.support.metrics.submitters.ResponseHandler;
import io.confluent.support.metrics.utils.Jitter;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/support/metrics/BaseMetricsReporter.class */
/**
 * Background thread that periodically collects metrics from a {@link Collector},
 * serializes them with an {@link AvroSerializer} and submits the encoded record to a
 * Kafka topic and/or to Confluent endpoints, depending on the supplied
 * {@link BaseSupportConfig}.
 *
 * <p>Lifecycle: construct, call {@link #init()}, then {@link #start()}. Calling
 * {@link #close()} flags the reporter as closing and interrupts the thread; {@link #run()}
 * then marks the collector as shutting down and attempts one final submission.
 */
public abstract class BaseMetricsReporter extends Thread implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(BaseMetricsReporter.class);

    /** Retention passed to topic creation: 31,536,000,000 ms, i.e. 365 days. */
    private static final long RETENTION_MS = 31536000000L;

    /** Replication factor requested when creating/verifying the support topic. */
    public static final int SUPPORT_TOPIC_REPLICATION = 3;

    /** Partition count requested when creating/verifying the support topic. */
    private static final int SUPPORT_TOPIC_PARTITIONS = 1;

    /** Base pause (before jitter) between readiness checks in {@link #waitForServer()}. */
    private static final long SETTLING_TIME_MS = 10000;

    private final boolean enableSettlingTime;
    private String customerId;
    private long reportIntervalMs;
    private String supportTopic;
    private KafkaSubmitter kafkaSubmitter;
    private ConfluentSubmitter confluentSubmitter;
    private Collector metricsCollector;
    private final AvroSerializer encoder = new AvroSerializer();
    protected final KafkaUtilities kafkaUtilities;
    protected final BaseSupportConfig supportConfig;
    private final ResponseHandler responseHandler;

    /** Set by {@link #close()}; volatile because it is read by the reporter thread. */
    private volatile boolean isClosing = false;

    /**
     * Creates a reporter with a default {@link KafkaUtilities}, no response handler and
     * the settling time enabled.
     *
     * @param str               name of the reporter thread
     * @param z                 whether the thread is a daemon thread
     * @param baseSupportConfig support configuration; must not be {@code null}
     */
    public BaseMetricsReporter(String str, boolean z, BaseSupportConfig baseSupportConfig) {
        this(str, z, baseSupportConfig, new KafkaUtilities(), null, true);
    }

    /**
     * Creates a reporter.
     *
     * @param str               name of the reporter thread
     * @param z                 whether the thread is a daemon thread
     * @param baseSupportConfig support configuration; must not be {@code null}
     * @param kafkaUtilities    Kafka helper; must not be {@code null} when the configured
     *                          Kafka topic is not blank
     * @param responseHandler   handler handed to the {@link ConfluentSubmitter}; may be
     *                          {@code null} (the three-argument constructor passes {@code null})
     * @param z2                whether {@link #waitForServer()} sleeps between readiness checks
     * @throws NullPointerException if {@code baseSupportConfig} is {@code null}, or if
     *                              {@code kafkaUtilities} is {@code null} while a Kafka topic
     *                              is configured
     */
    public BaseMetricsReporter(String str, boolean z, BaseSupportConfig baseSupportConfig, KafkaUtilities kafkaUtilities, ResponseHandler responseHandler, boolean z2) {
        super(str);
        setDaemon(z);
        Objects.requireNonNull(baseSupportConfig, "supportConfig can't be null");
        // KafkaUtilities is only needed when metrics are also written to a Kafka topic.
        if (StringUtils.isNotBlank(baseSupportConfig.getKafkaTopic())) {
            Objects.requireNonNull(kafkaUtilities, "kafkaUtilities can't be null");
        }
        this.kafkaUtilities = kafkaUtilities;
        this.supportConfig = baseSupportConfig;
        this.responseHandler = responseHandler;
        this.enableSettlingTime = z2;
    }

    /**
     * Reads the configuration and creates the collector and the submitters.
     *
     * <p>A Kafka submitter is created only when the configured topic is not blank; a
     * Confluent submitter is created only when at least one of the HTTP/HTTPS endpoints is
     * non-empty. When neither is created, reporting is disabled and this is logged.
     */
    public void init() {
        this.customerId = this.supportConfig.getCustomerId();
        this.metricsCollector = metricsCollector();
        this.metricsCollector.setRuntimeState(Collector.RuntimeState.Running);
        this.reportIntervalMs = this.supportConfig.getReportIntervalMs();

        this.supportTopic = this.supportConfig.getKafkaTopic();
        // Same blank check as the constructor: a null or whitespace-only topic means
        // "Kafka disabled", in which case kafkaUtilities is allowed to be null.
        if (StringUtils.isBlank(this.supportTopic)) {
            this.kafkaSubmitter = null;
        } else {
            this.kafkaSubmitter = new KafkaSubmitter(zkClientProvider(), this.supportTopic);
        }

        String endpointHTTP = this.supportConfig.getEndpointHTTP();
        String endpointHTTPS = this.supportConfig.getEndpointHTTPS();
        String proxy = this.supportConfig.getProxy();
        if (endpointHTTP.isEmpty() && endpointHTTPS.isEmpty()) {
            this.confluentSubmitter = null;
        } else {
            this.confluentSubmitter = new ConfluentSubmitter(this.customerId, endpointHTTP, endpointHTTPS, proxy, this.responseHandler);
        }

        if (!reportingEnabled()) {
            log.info("Metrics collection disabled by component configuration");
        }
    }

    /**
     * @return the provider used to obtain the ZooKeeper client for the Kafka submitter and
     *         for topic creation/verification
     */
    protected abstract ZkClientProvider zkClientProvider();

    /**
     * @return the collector whose metrics are reported; called once from {@link #init()}
     */
    protected abstract Collector metricsCollector();

    /**
     * @return {@code true} when at least one submitter (Kafka or Confluent) is configured
     */
    protected boolean reportingEnabled() {
        return sendToKafkaEnabled() || sendToConfluentEnabled();
    }

    /**
     * @return {@code true} when a Kafka submitter was created by {@link #init()}
     */
    protected boolean sendToKafkaEnabled() {
        return this.kafkaSubmitter != null;
    }

    /**
     * @return {@code true} when a Confluent submitter was created by {@link #init()}
     */
    protected boolean sendToConfluentEnabled() {
        return this.confluentSubmitter != null;
    }

    /**
     * Reporting loop: waits for the monitored service, then submits metrics every
     * {@code reportIntervalMs} (plus jitter) until {@link #close()} is called.
     *
     * <p>Whatever way the loop ends, if the reporter is closing, the collector is switched
     * to {@link Collector.RuntimeState#ShuttingDown} and one final submission is attempted.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            if (reportingEnabled()) {
                waitForServer();
                while (!this.isClosing) {
                    log.info("Attempting to collect and submit metrics");
                    submitMetrics();
                    Thread.sleep(Jitter.addOnePercentJitter(this.reportIntervalMs));
                }
            }
        } catch (InterruptedException e) {
            log.error("Caught InterruptedException during metrics collection", e);
            // Restore the interrupt status for code further up the stack.
            // NOTE(review): the final submitMetrics() below therefore runs with the
            // interrupt flag set - confirm the submitters tolerate that.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Caught exception during metrics collection", e);
        } finally {
            // Single shutdown path, executed exactly once for normal exit, handled
            // exceptions and propagating Throwables alike.
            if (this.isClosing) {
                log.info("Gracefully terminating metrics collection");
                this.metricsCollector.setRuntimeState(Collector.RuntimeState.ShuttingDown);
                submitMetrics();
            }
            log.info("Metrics collection stopped");
        }
    }

    /**
     * Requests termination: flags the reporter as closing and interrupts the thread so a
     * pending sleep in {@link #run()} or {@link #waitForServer()} returns immediately.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.info("Closing BaseMetricsReporter");
        this.isClosing = true;
        interrupt();
    }

    /**
     * Blocks until the monitored service is ready for metrics collection, the service is
     * shutting down, or this reporter is closing. If the service is shutting down, the
     * reporter closes itself.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void waitForServer() throws InterruptedException {
        log.info("Waiting until monitored service is ready for metrics collection");
        while (!this.isClosing && !isReadyForMetricsCollection() && !isShuttingDown()) {
            // NOTE(review): with the settling time disabled this loop polls without
            // pausing - presumably only used that way in tests; confirm.
            if (this.enableSettlingTime) {
                long waitTimeMs = Jitter.addOnePercentJitter(SETTLING_TIME_MS);
                log.info("Waiting {} ms for the monitored service to finish starting up", waitTimeMs);
                Thread.sleep(waitTimeMs);
            }
        }
        if (isShuttingDown()) {
            close();
        } else if (!this.isClosing) {
            // Only claim readiness when the loop did not end because we are closing.
            log.info("Monitored service is now ready");
        }
    }

    /**
     * @return {@code true} once the monitored service is ready for metrics collection
     */
    protected abstract boolean isReadyForMetricsCollection();

    /**
     * @return {@code true} when the monitored service is shutting down
     */
    protected abstract boolean isShuttingDown();

    /**
     * Collects the current metrics, serializes them and submits the encoded record to every
     * enabled destination. Serialization and submission failures are logged and do not
     * propagate; a failure of one destination does not prevent submission to the other.
     */
    void submitMetrics() {
        byte[] encodedMetricsRecord;
        try {
            encodedMetricsRecord = this.encoder.serialize(this.metricsCollector.collectMetrics());
        } catch (IOException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("Could not serialize metrics record: {}", e.toString(), e);
            return;
        }
        if (encodedMetricsRecord == null) {
            // Nothing to send.
            return;
        }

        try {
            if (sendToKafkaEnabled() && this.kafkaUtilities.createAndVerifyTopic(zkClientProvider().zkClient(), this.supportTopic, SUPPORT_TOPIC_PARTITIONS, SUPPORT_TOPIC_REPLICATION, RETENTION_MS)) {
                this.kafkaSubmitter.submit(encodedMetricsRecord);
            }
        } catch (RuntimeException e) {
            log.error("Could not submit metrics to Kafka topic {}: {}", this.supportTopic, e.getMessage(), e);
        }

        try {
            if (sendToConfluentEnabled()) {
                this.confluentSubmitter.submit(encodedMetricsRecord);
            }
        } catch (RuntimeException e) {
            log.error("Could not submit metrics to Confluent: {}", e.getMessage(), e);
        }
    }
}
