package org.apache.skywalking.oap.server.core.storage;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/storage/PersistenceTimer.class */
public enum PersistenceTimer {
    INSTANCE;


    @Generated
    private static final Logger log = LoggerFactory.getLogger(PersistenceTimer.class);

    @VisibleForTesting
    boolean isStarted = false;
    private CounterMetrics errorCounter;
    private HistogramMetrics prepareLatency;
    private HistogramMetrics executeLatency;
    private HistogramMetrics allLatency;
    private ExecutorService prepareExecutorService;

    PersistenceTimer() {
    }

    public void start(ModuleManager moduleManager, CoreModuleConfig coreModuleConfig) {
        log.info("persistence timer start");
        IBatchDAO iBatchDAO = (IBatchDAO) moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
        MetricsCreator service = moduleManager.find("telemetry").provider().getService(MetricsCreator.class);
        this.errorCounter = service.createCounter("persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.prepareLatency = service.createHistogramMetric("persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
        this.executeLatency = service.createHistogramMetric("persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
        this.allLatency = service.createHistogramMetric("persistence_timer_bulk_all_latency", "Latency of the all stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
        this.prepareExecutorService = Executors.newFixedThreadPool(coreModuleConfig.getPrepareThreads());
        if (this.isStarted) {
            return;
        }
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> {
            extractDataAndSave(iBatchDAO).join();
        }, th -> {
            log.error("Extract data and save failure.", th);
        }), 5L, coreModuleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
        this.isStarted = true;
    }

    private CompletableFuture<Void> extractDataAndSave(IBatchDAO iBatchDAO) {
        if (log.isDebugEnabled()) {
            log.debug("Extract data and save");
        }
        long currentTimeMillis = System.currentTimeMillis();
        HistogramMetrics.Timer createTimer = this.allLatency.createTimer();
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
        arrayList.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map(persistenceWorker -> {
            return CompletableFuture.runAsync(() -> {
                HistogramMetrics.Timer createTimer2 = this.prepareLatency.createTimer();
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("extract {} worker data and save", persistenceWorker.getClass().getName());
                    }
                    List<PrepareRequest> buildBatchRequests = persistenceWorker.buildBatchRequests();
                    persistenceWorker.endOfRound();
                    if (createTimer2 != null) {
                        createTimer2.close();
                    }
                    if (CollectionUtils.isEmpty(buildBatchRequests)) {
                        return;
                    }
                    HistogramMetrics.Timer createTimer3 = this.executeLatency.createTimer();
                    iBatchDAO.flush(buildBatchRequests).whenComplete((r3, th) -> {
                        createTimer3.close();
                    });
                } catch (Throwable th2) {
                    if (createTimer2 != null) {
                        try {
                            createTimer2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            }, this.prepareExecutorService);
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
        allOf.whenComplete((r12, th) -> {
            iBatchDAO.endOfFlush();
            createTimer.close();
            if (log.isDebugEnabled()) {
                log.debug("Batch persistence duration: {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            }
            if (th != null) {
                this.errorCounter.inc();
                log.error(th.getMessage(), th);
            }
        });
        return allOf;
    }
}
