package org.apache.skywalking.oap.server.core.analysis.worker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.class */
/**
 * Persistence worker for {@link Metrics}.
 *
 * <p>Metrics handed to {@link #in(Metrics)} are counted and pushed into a {@link DataCarrier};
 * the carrier's consumer forwards them to {@code onWork} (not defined in this class). When
 * {@link #prepareBatch(Collection, List)} is called, the metrics are compared in bounded batches
 * against what the {@link IMetricsDAO} returns, and turned into insert or update
 * {@link PrepareRequest}s. Processed metrics are also forwarded to the optional alarm, export and
 * trans workers.
 */
public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MetricsPersistentWorker.class);

    /** Name of the consumer pool that is created on first use and used by every instance. */
    private static final String CONSUMER_POOL_NAME = "METRICS_L2_AGGREGATION";

    /** Upper bound on the number of metrics handled by a single storage lookup / flush. */
    private static final int MAX_BATCH_GET_SIZE = 2000;

    /**
     * Cached metrics whose survival time exceeds this value are evicted in {@link #endOfRound(long)}.
     * NOTE(review): presumably milliseconds (70s) - confirm against Metrics#extendSurvivalTime.
     */
    private static final long SESSION_TIMEOUT = 70000L;

    private final Model model;
    /** Metrics loaded from storage, keyed by themselves so an equal incoming metric can find them. */
    private final Map<Metrics, Metrics> context;
    private final IMetricsDAO metricsDAO;
    private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
    private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
    private final DataCarrier<Metrics> dataCarrier;
    private final Optional<MetricsTransWorker> transWorker;
    /** When false, {@link #context} is cleared before every storage lookup. */
    private final boolean enableDatabaseSession;
    /** When false, metrics that already exist in {@link #context} are skipped instead of updated. */
    private final boolean supportUpdate;
    private CounterMetrics aggregationCounter;

    /**
     * Consumer registered on the {@link DataCarrier}; hands every consumed list to the
     * enclosing worker's {@code onWork}.
     */
    private class PersistentConsumer implements IConsumer<Metrics> {
        private PersistentConsumer() {
        }

        public void init() {
        }

        public void consume(List<Metrics> data) {
            MetricsPersistentWorker.this.onWork(data);
        }

        public void onError(List<Metrics> data, Throwable t) {
            log.error(t.getMessage(), t);
        }

        public void onExit() {
        }
    }

    /**
     * Creates the worker, makes sure the shared consumer pool exists, wires the data carrier to
     * it and registers the {@code metrics_aggregation} counter.
     *
     * @param moduleDefineHolder    used to look up the telemetry {@link MetricsCreator}
     * @param model                 storage model of the metrics handled by this worker
     * @param metricsDAO            DAO used to read existing metrics and prepare insert/update requests
     * @param nextAlarmWorker       worker receiving every processed metric, may be {@code null}
     * @param nextExportWorker      worker receiving {@link ExportEvent}s, may be {@code null}
     * @param transWorker           worker receiving every metric before it is flushed, may be {@code null}
     * @param enableDatabaseSession whether loaded metrics are kept in the cache between batches
     * @param supportUpdate         whether metrics already present in the cache are merged and updated
     * @throws UnexpectedException if the consumer pool, data carrier or counter cannot be set up
     */
    public MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                                   AbstractWorker<Metrics> nextAlarmWorker,
                                   AbstractWorker<ExportEvent> nextExportWorker,
                                   MetricsTransWorker transWorker,
                                   boolean enableDatabaseSession, boolean supportUpdate) {
        super(moduleDefineHolder, new ReadWriteSafeCache(new MergableBufferedData(), new MergableBufferedData()));
        this.model = model;
        this.context = new HashMap<>(100);
        this.enableDatabaseSession = enableDatabaseSession;
        this.metricsDAO = metricsDAO;
        this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
        this.nextExportWorker = Optional.ofNullable(nextExportWorker);
        this.transWorker = Optional.ofNullable(transWorker);
        this.supportUpdate = supportUpdate;

        // An eighth of the recommended pool size, but never less than one.
        int poolSize = BulkConsumePool.Creator.recommendMaxSize() / 8;
        if (poolSize == 0) {
            poolSize = 1;
        }
        try {
            ConsumerPoolFactory.INSTANCE.createIfAbsent(
                CONSUMER_POOL_NAME, new BulkConsumePool.Creator(CONSUMER_POOL_NAME, poolSize, 20L));
            this.dataCarrier = new DataCarrier<>(
                "MetricsPersistentWorker." + model.getName(), CONSUMER_POOL_NAME, 1, 2000);
            this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(CONSUMER_POOL_NAME), new PersistentConsumer());

            MetricsCreator metricsCreator = moduleDefineHolder.find("telemetry")
                                                              .provider()
                                                              .getService(MetricsCreator.class);
            this.aggregationCounter = metricsCreator.createCounter(
                "metrics_aggregation", "The number of rows in aggregation",
                new MetricsTag.Keys(new String[]{"metricName", "level", "dimensionality"}),
                new MetricsTag.Values(new String[]{model.getName(), "2", model.getDownsampling().getName()}));
        } catch (Exception e) {
            throw new UnexpectedException(e.getMessage(), e);
        }
    }

    /**
     * Creates a worker without alarm, export or trans workers.
     *
     * @param moduleDefineHolder    used to look up the telemetry {@link MetricsCreator}
     * @param model                 storage model of the metrics handled by this worker
     * @param metricsDAO            DAO used to read existing metrics and prepare insert/update requests
     * @param enableDatabaseSession whether loaded metrics are kept in the cache between batches
     * @param supportUpdate         whether metrics already present in the cache are merged and updated
     */
    public MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                                   boolean enableDatabaseSession, boolean supportUpdate) {
        this(moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession, supportUpdate);
    }

    /**
     * Counts the incoming metric and queues it on the data carrier.
     *
     * @param metrics the metric to queue
     */
    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public void in(Metrics metrics) {
        this.aggregationCounter.inc();
        this.dataCarrier.produce(metrics);
    }

    /**
     * Converts the given metrics into storage requests, working in batches of at most
     * {@link #MAX_BATCH_GET_SIZE} so that a single storage lookup never has to resolve more than
     * that many ids. Each metric is first handed to the trans worker, if present.
     *
     * @param lastCollection  metrics to persist; nothing happens when it is empty
     * @param prepareRequests output list that receives the prepared insert/update requests
     */
    @Override // org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker
    public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
        if (lastCollection.isEmpty()) {
            return;
        }
        long start = System.currentTimeMillis();

        // The collection is non-empty here, so the batch size is always in [1, MAX_BATCH_GET_SIZE].
        final int batchSize = Math.min(MAX_BATCH_GET_SIZE, lastCollection.size());
        List<Metrics> batch = new ArrayList<>(batchSize);
        for (Metrics metrics : lastCollection) {
            this.transWorker.ifPresent(worker -> worker.in(metrics));
            batch.add(metrics);
            if (batch.size() == batchSize) {
                // flushDataToStorage empties the batch, so it can be reused for the next round.
                flushDataToStorage(batch, prepareRequests);
            }
        }
        if (!batch.isEmpty()) {
            flushDataToStorage(batch, prepareRequests);
        }

        if (!prepareRequests.isEmpty()) {
            log.debug("prepare batch requests for model {}, took time: {}, size: {}",
                      this.model.getName(), System.currentTimeMillis() - start, prepareRequests.size());
        }
    }

    /**
     * Prepares one insert or update request per metric in the batch and forwards the result to
     * the follow-up workers. Any failure is logged and swallowed; the batch list is always
     * emptied before returning.
     *
     * @param metricsList     batch to process; cleared on exit
     * @param prepareRequests output list that receives the prepared requests
     */
    private void flushDataToStorage(List<Metrics> metricsList, List<PrepareRequest> prepareRequests) {
        try {
            loadFromStorage(metricsList);
            for (Metrics metrics : metricsList) {
                Metrics cachedMetrics = this.context.get(metrics);
                if (cachedMetrics == null) {
                    // Not known to the cache: insert as a new row.
                    metrics.calculate();
                    prepareRequests.add(this.metricsDAO.prepareBatchInsert(this.model, metrics));
                    nextWorker(metrics);
                } else if (this.supportUpdate) {
                    // Merge into the cached instance; only the cached instance is modified.
                    cachedMetrics.combine(metrics);
                    cachedMetrics.calculate();
                    prepareRequests.add(this.metricsDAO.prepareBatchUpdate(this.model, cachedMetrics));
                    nextWorker(cachedMetrics);
                    // NOTE(review): the INCREMENT event is emitted on the update path only; newly
                    // inserted metrics just produce the TOTAL event via nextWorker - confirm intended.
                    this.nextExportWorker.ifPresent(
                        exportWorker -> exportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.INCREMENT)));
                }
                // Cached but update not supported: the metric is skipped.
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
        } finally {
            metricsList.clear();
        }
    }

    /**
     * Forwards a processed metric to the alarm worker and, wrapped in a
     * {@link ExportEvent.EventType#TOTAL} event, to the export worker, when they are present.
     *
     * @param metrics the metric to forward
     */
    private void nextWorker(Metrics metrics) {
        this.nextAlarmWorker.ifPresent(alarmWorker -> alarmWorker.in(metrics));
        this.nextExportWorker.ifPresent(
            exportWorker -> exportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
    }

    /**
     * Loads from storage every metric of the batch that is not yet in {@link #context} and puts
     * the results into the cache. Without a database session the cache is cleared first, so
     * every metric of the batch is looked up.
     *
     * @param metricsList batch whose stored counterparts should be available in the cache
     * @throws IOException if the DAO lookup fails
     */
    private void loadFromStorage(List<Metrics> metricsList) throws IOException {
        if (!this.enableDatabaseSession) {
            this.context.clear();
        }

        List<String> notInCacheIds = new ArrayList<>();
        for (Metrics metrics : metricsList) {
            if (!this.context.containsKey(metrics)) {
                notInCacheIds.add(metrics.id());
            }
        }
        if (notInCacheIds.isEmpty()) {
            return;
        }

        for (Metrics stored : this.metricsDAO.multiGet(this.model, notInCacheIds)) {
            this.context.put(stored, stored);
        }
    }

    /**
     * With a database session enabled, extends the survival time of every cached metric by
     * {@code tookTime} and evicts those that exceed {@link #SESSION_TIMEOUT}. Does nothing otherwise.
     *
     * @param tookTime amount added to the survival time of each cached metric
     */
    @Override // org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker
    public void endOfRound(long tookTime) {
        if (!this.enableDatabaseSession) {
            return;
        }
        Iterator<Metrics> cached = this.context.values().iterator();
        while (cached.hasNext()) {
            Metrics metrics = cached.next();
            metrics.extendSurvivalTime(tookTime);
            if (metrics.getSurvivalTime() > SESSION_TIMEOUT) {
                cached.remove();
            }
        }
    }
}
