package org.apache.skywalking.oap.server.core.analysis.worker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.status.BootingStatus;
import org.apache.skywalking.oap.server.core.status.ClusterStatus;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.status.ServerStatusWatcher;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.class */
public class MetricsPersistentWorker extends PersistenceWorker<Metrics> implements ServerStatusWatcher {

    /** Class-wide logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MetricsPersistentWorker.class);
    /** Model handed to the constructor; passed to every DAO call and used for naming the data carrier and counters. */
    private final Model model;
    /** Session cache of metrics; looked up before deciding between a batch insert and a batch update. */
    private final MetricsSessionCache sessionCache;
    /** DAO used to build batch insert/update requests, to multi-get metrics, and to check cache expiry. */
    private final IMetricsDAO metricsDAO;
    /** Optional worker that receives every metric for which an insert or update request was prepared. */
    private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
    /** Optional worker that receives {@link ExportEvent}s (TOTAL and INCREMENT) built from processed metrics. */
    private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
    /** Queue that {@link #in(Metrics)} produces into; drained by {@code PersistentConsumer}. */
    private final DataCarrier<Metrics> dataCarrier;
    /** Optional worker that receives every metric read from the cache in {@link #buildBatchRequests()}. */
    private final Optional<MetricsTransWorker> transWorker;
    /** When false, a metric that is already present in the session cache is neither combined nor written. */
    private final boolean supportUpdate;
    /** Incremented once per {@link #in(Metrics)} call. */
    private CounterMetrics aggregationCounter;
    /** Counter tagged status=new; increased by the number of metrics selected for loading from storage. */
    private CounterMetrics readMetricsCounter;
    /** Counter tagged status=cached; increased by the number of metrics not selected for loading from storage. */
    private CounterMetrics cachedMetricsCounter;
    /** Number of {@link #buildBatchRequests()} invocations so far. */
    private int persistentCounter;
    /** Requests are only built when {@code persistentCounter % persistentMod == 0}; 1 by default, 4 for the short constructor. */
    private int persistentMod;
    /** TTL value forwarded to {@code IMetricsDAO#isExpiredCache}; unit is not visible here. */
    private int metricsDataTTL;
    /** Status service resolved from the core module; this worker registers itself as a watcher for minute-downsampled models. */
    private final ServerStatusService serverStatusService;
    /** Minute time bucket of the latest boot/rebalance notification; 0 until a notification arrives. Written by watcher callbacks, hence volatile. */
    private volatile long timeOfLatestStabilitySts;

    /* loaded from: input_file:org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker$PersistentConsumer.class */
    private class PersistentConsumer implements IConsumer<Metrics> {
        private PersistentConsumer() {
        }

        public void consume(List<Metrics> list) {
            MetricsPersistentWorker.this.onWork(list);
        }

        public void onError(List<Metrics> list, Throwable th) {
            MetricsPersistentWorker.log.error(th.getMessage(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the worker, wires it to the shared "METRICS_L2_AGGREGATION" consumer pool,
     * creates its telemetry counters and, for minute-downsampled models, registers it
     * as a server status watcher.
     *
     * @param moduleDefineHolder  used to look up the telemetry and core module services
     * @param model               model this worker persists
     * @param iMetricsDAO         DAO used to read and to prepare write requests
     * @param abstractWorker      next alarm worker, may be {@code null}
     * @param abstractWorker2     next export worker, may be {@code null}
     * @param metricsTransWorker  trans worker, may be {@code null}
     * @param z                   stored as {@code supportUpdate}
     * @param j                   passed to the {@link MetricsSessionCache} constructor
     * @param i                   stored as {@code metricsDataTTL}
     * @param metricStreamKind    {@code MAL} selects 1000, anything else 2000, as the last
     *                            {@link DataCarrier} constructor argument
     * @throws UnexpectedException wrapping any exception raised while wiring pool, counters or watcher
     */
    public MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO iMetricsDAO, AbstractWorker<Metrics> abstractWorker, AbstractWorker<ExportEvent> abstractWorker2, MetricsTransWorker metricsTransWorker, boolean z, long j, int i, MetricStreamKind metricStreamKind) {
        super(moduleDefineHolder, new ReadWriteSafeCache(new MergableBufferedData(), new MergableBufferedData()));
        final String poolName = "METRICS_L2_AGGREGATION";
        final String cacheCounterName = "metrics_persistent_cache";
        final String cacheCounterHelp = "The counter of metrics status, new or cached.";

        timeOfLatestStabilitySts = 0L;
        this.model = model;
        sessionCache = new MetricsSessionCache(j);
        metricsDAO = iMetricsDAO;
        nextAlarmWorker = Optional.ofNullable(abstractWorker);
        nextExportWorker = Optional.ofNullable(abstractWorker2);
        transWorker = Optional.ofNullable(metricsTransWorker);
        supportUpdate = z;
        persistentCounter = 0;
        persistentMod = 1;
        metricsDataTTL = i;

        // One eighth of the recommended pool size, but never less than one.
        int poolSize = BulkConsumePool.Creator.recommendMaxSize() / 8;
        if (poolSize == 0) {
            poolSize = 1;
        }

        int carrierSize = 2000;
        if (MetricStreamKind.MAL == metricStreamKind) {
            carrierSize = 1000;
        }

        try {
            ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, new BulkConsumePool.Creator(poolName, poolSize, 20L));
            dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, 1, carrierSize);
            dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer());

            MetricsCreator metricsCreator = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class);
            aggregationCounter = metricsCreator.createCounter(
                "metrics_aggregation",
                "The number of rows in aggregation",
                new MetricsTag.Keys(new String[]{"metricName", "level", "dimensionality"}),
                new MetricsTag.Values(new String[]{model.getName(), "2", model.getDownsampling().getName()}));
            readMetricsCounter = metricsCreator.createCounter(
                cacheCounterName,
                cacheCounterHelp,
                new MetricsTag.Keys(new String[]{"status"}),
                new MetricsTag.Values(new String[]{"new"}));
            cachedMetricsCounter = metricsCreator.createCounter(
                cacheCounterName,
                cacheCounterHelp,
                new MetricsTag.Keys(new String[]{"status"}),
                new MetricsTag.Values(new String[]{"cached"}));

            serverStatusService = (ServerStatusService) moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class);
            // Only minute-level workers listen for boot/rebalance notifications.
            if (model.getDownsampling().equals(DownSampling.Minute)) {
                serverStatusService.registerWatcher(this);
            }
        } catch (Exception e) {
            throw new UnexpectedException(e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO iMetricsDAO, boolean z, long j, int i, MetricStreamKind metricStreamKind) {
        this(moduleDefineHolder, model, iMetricsDAO, null, null, null, z, j, i, metricStreamKind);
        this.sessionCache.setTimeoutThreshold(j * 4);
        this.persistentMod = 4;
    }

    /**
     * Counts the incoming metric and queues it on the data carrier.
     *
     * @param metrics metric to enqueue
     */
    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public void in(Metrics metrics) {
        aggregationCounter.inc();
        dataCarrier.produce(metrics);
    }

    /**
     * Builds the storage requests for everything currently readable from the cache.
     *
     * <p>Only invocations where {@code persistentCounter % persistentMod == 0} do any work;
     * all other invocations return an empty list without reading the cache. The metrics are
     * processed in chunks of at most 2000 entries; each chunk is passed to
     * {@code prepareFlushDataToStorage}, which appends the prepared requests and clears the chunk.
     *
     * @return the prepared insert/update requests, possibly empty, never {@code null}
     */
    @Override // org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker
    public List<PrepareRequest> buildBatchRequests() {
        if (persistentCounter++ % persistentMod != 0) {
            return Collections.emptyList();
        }

        final List<Metrics> lastCollection = getCache().read();
        final long start = System.currentTimeMillis();
        if (lastCollection.isEmpty()) {
            return Collections.emptyList();
        }

        // Upper bound on how many metrics are handed to a single prepare/flush step.
        final int maxBatchSize = 2000;
        final int batchSize = Math.min(maxBatchSize, lastCollection.size());
        final List<Metrics> batch = new ArrayList<>();
        final List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());

        for (final Metrics metrics : lastCollection) {
            transWorker.ifPresent(worker -> worker.in(metrics));
            batch.add(metrics);
            if (batch.size() == batchSize) {
                // prepareFlushDataToStorage always clears the batch before returning.
                prepareFlushDataToStorage(batch, prepareRequests);
            }
        }
        // Flush the trailing, partially filled batch.
        if (!batch.isEmpty()) {
            prepareFlushDataToStorage(batch, prepareRequests);
        }

        if (!prepareRequests.isEmpty()) {
            log.debug(
                "prepare batch requests for model {}, took time: {}, size: {}",
                model.getName(), System.currentTimeMillis() - start, prepareRequests.size());
        }
        return prepareRequests;
    }

    /**
     * Turns one batch of metrics into storage requests.
     *
     * <p>After the session cache has been refreshed for the batch, each metric is handled as follows:
     * <ul>
     *   <li>not in the session cache: it is calculated, a batch insert is prepared, and it is
     *       forwarded to the next workers;</li>
     *   <li>in the session cache: the cached entry's last-update timestamp is refreshed and, when
     *       updates are supported and {@code combine} returns true, the cached entry is calculated,
     *       a batch update is prepared, and the cached entry is forwarded to the next workers.</li>
     * </ul>
     * In both cases an INCREMENT export event for the incoming metric is emitted when an export
     * worker is present. Any throwable is logged and swallowed; the batch is always cleared.
     *
     * @param list  batch to process; emptied before this method returns
     * @param list2 collector that receives the prepared requests
     */
    private void prepareFlushDataToStorage(List<Metrics> list, List<PrepareRequest> list2) {
        try {
            loadFromStorage(list);
            final long now = System.currentTimeMillis();
            for (final Metrics incoming : list) {
                final Metrics cached = sessionCache.get(incoming);
                if (cached == null) {
                    incoming.calculate();
                    list2.add(metricsDAO.prepareBatchInsert(model, incoming, new SessionCacheCallback(sessionCache, incoming)));
                    nextWorker(incoming);
                    incoming.setLastUpdateTimestamp(now);
                } else {
                    cached.setLastUpdateTimestamp(now);
                    // combine() is only attempted when updates are supported; a false result means nothing to write.
                    if (supportUpdate && cached.combine(incoming)) {
                        cached.calculate();
                        list2.add(metricsDAO.prepareBatchUpdate(model, cached, new SessionCacheCallback(sessionCache, cached)));
                        nextWorker(cached);
                    }
                }
                nextExportWorker.ifPresent(
                    exportWorker -> exportWorker.in(new ExportEvent(incoming, ExportEvent.EventType.INCREMENT)));
            }
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        } finally {
            list.clear();
        }
    }

    /**
     * Forwards a metric to the downstream workers that are present: the alarm worker gets the
     * metric itself, the export worker gets a TOTAL export event wrapping it.
     *
     * @param metrics metric to forward
     */
    private void nextWorker(Metrics metrics) {
        nextAlarmWorker.ifPresent(alarmWorker -> alarmWorker.in(metrics));
        nextExportWorker.ifPresent(
            exportWorker -> exportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
    }

    /**
     * Loads into the session cache those metrics of the batch that have to be read from storage.
     *
     * <p>A metric is selected for loading when {@code requireInitialization} returns {@code null},
     * or when the model's ID is not time relative and the DAO reports the cached value as expired
     * (in which case the entry is removed from the session cache first). The number of selected
     * and non-selected metrics is reported to the "new" and "cached" counters respectively.
     * Failures are logged and not propagated.
     *
     * @param list batch of incoming metrics; not modified
     */
    private void loadFromStorage(List<Metrics> list) {
        final long now = System.currentTimeMillis();
        try {
            final List<Metrics> notInCache = new ArrayList<>();
            for (final Metrics incoming : list) {
                final Metrics cachedValue = requireInitialization(incoming);
                if (cachedValue == null) {
                    notInCache.add(incoming);
                    continue;
                }
                // Expiry is only checked for models whose ID is not time relative.
                if (!model.isTimeRelativeID()
                    && metricsDAO.isExpiredCache(model, cachedValue, now, metricsDataTTL)) {
                    sessionCache.remove(incoming);
                    notInCache.add(incoming);
                }
            }

            readMetricsCounter.inc(notInCache.size());
            cachedMetricsCounter.inc(list.size() - notInCache.size());
            if (notInCache.isEmpty()) {
                return;
            }

            for (final Metrics loaded : metricsDAO.multiGet(model, notInCache)) {
                loaded.setLastUpdateTimestamp(now);
                sessionCache.put(loaded);
            }
        } catch (Exception e) {
            log.error("Failed to load metrics for merging", e);
        }
    }

    /**
     * End-of-round hook: delegates to the session cache's {@code removeExpired()}.
     */
    @Override // org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker
    public void endOfRound() {
        sessionCache.removeExpired();
    }

    /**
     * Decides whether a metric needs to be read from storage.
     *
     * @param metrics incoming metric
     * @return the cached instance when the session cache holds one; the incoming metric itself
     *         when the model's ID is time relative, a stability time bucket has been recorded
     *         and the metric's time bucket is later than it; {@code null} otherwise, which the
     *         caller treats as "load from storage"
     */
    private Metrics requireInitialization(Metrics metrics) {
        final Metrics cached = sessionCache.get(metrics);
        if (cached != null) {
            return cached;
        }
        // Nothing cached from here on.
        final boolean newerThanLatestStability = model.isTimeRelativeID()
            && timeOfLatestStabilitySts > 0
            && metrics.getTimeBucket() > timeOfLatestStabilitySts;
        return newerThanLatestStability ? metrics : null;
    }

    /**
     * Records the minute time bucket of the reported uptime value as the latest stability point.
     *
     * @param bootingStatus status carrying the uptime value
     */
    @Override // org.apache.skywalking.oap.server.core.status.ServerStatusWatcher
    public void onServerBooted(BootingStatus bootingStatus) {
        final long bootedMinuteBucket = TimeBucket.getMinuteTimeBucket(bootingStatus.getUptime());
        timeOfLatestStabilitySts = bootedMinuteBucket;
    }

    /**
     * Records the minute time bucket of the reported rebalance time as the latest stability point.
     *
     * @param clusterStatus status carrying the rebalance time
     */
    @Override // org.apache.skywalking.oap.server.core.status.ServerStatusWatcher
    public void onClusterRebalanced(ClusterStatus clusterStatus) {
        final long rebalancedMinuteBucket = TimeBucket.getMinuteTimeBucket(clusterStatus.getRebalancedTime());
        timeOfLatestStabilitySts = rebalancedMinuteBucket;
    }
}
