package org.apache.skywalking.oap.server.core.analysis.worker;

import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetric;
import org.apache.skywalking.oap.server.telemetry.api.MetricCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that merges incoming {@link Indicator}s in a {@link MergeDataCache} and hands the merged
 * entries to the next worker in batches.
 *
 * <p>Indicators passed to {@link #in(Indicator)} are queued on a {@link DataCarrier}; an
 * {@link AggregatorConsumer} registered on that carrier feeds them back into
 * {@link #onWork(Indicator)}, where they are combined by key. The merged content is flushed to the
 * next worker once {@value #FLUSH_THRESHOLD} indicators have been processed since the last flush, or
 * when the indicator that closes a consumed batch is processed.
 */
public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
    private static final Logger logger = LoggerFactory.getLogger(IndicatorAggregateWorker.class);

    /** Number of processed indicators after which the merged data is flushed to the next worker. */
    private static final int FLUSH_THRESHOLD = 1000;
    /** Pause between two checks of the "still writing" flag of the window being flushed. */
    private static final long WRITING_POLL_INTERVAL_MILLIS = 10L;
    // Arguments handed to DataCarrier; presumably channel count and per-channel buffer size,
    // and the number of consumer threads -- TODO confirm against the DataCarrier API.
    private static final int CARRIER_CHANNEL_NUM = 1;
    private static final int CARRIER_BUFFER_SIZE = 10000;
    private static final int CARRIER_CONSUMER_NUM = 1;

    private final AbstractWorker<Indicator> nextWorker;
    private final DataCarrier<Indicator> dataCarrier;
    private final MergeDataCache<Indicator> mergeDataCache;
    /** Indicators processed by {@link #onWork(Indicator)} since the last flush. */
    private int messageNum;
    private final String modelName;
    private final CounterMetric aggregationCounter;

    /**
     * Consumer registered on the data carrier; it relays every consumed indicator to the owning
     * worker and flags the last indicator of each consumed list as end-of-batch.
     */
    private static class AggregatorConsumer implements IConsumer<Indicator> {
        private final IndicatorAggregateWorker aggregator;

        private AggregatorConsumer(IndicatorAggregateWorker indicatorAggregateWorker) {
            this.aggregator = indicatorAggregateWorker;
        }

        public void init() {
        }

        /**
         * Passes each indicator of {@code list} to the worker, in list order.
         *
         * @param list the indicators handed over by the data carrier; an empty list is a no-op
         */
        public void consume(List<Indicator> list) {
            int i = 0;
            for (Indicator indicator : list) {
                i++;
                // Mark the final element so that onWork flushes at the end of this list.
                if (i == list.size()) {
                    indicator.getEndOfBatchContext().setEndOfBatch(true);
                }
                this.aggregator.onWork(indicator);
            }
        }

        /** Logs the failure; the affected indicators are not retried here. */
        public void onError(List<Indicator> list, Throwable th) {
            IndicatorAggregateWorker.logger.error(th.getMessage(), th);
        }

        public void onExit() {
        }
    }

    /**
     * Creates the worker, registers its consumer on a new data carrier and creates the
     * {@code indicator_aggregation} counter through the telemetry module's {@link MetricCreator}.
     *
     * @param moduleManager  used to look up the {@code telemetry} module
     * @param i              value forwarded to the {@link AbstractWorker} constructor
     * @param abstractWorker worker that receives the merged indicators on every flush
     * @param str            model name; used in the data carrier name and as the
     *                       {@code metricName} tag value of the counter
     */
    public IndicatorAggregateWorker(ModuleManager moduleManager, int i, AbstractWorker<Indicator> abstractWorker, String str) {
        super(i);
        this.modelName = str;
        this.nextWorker = abstractWorker;
        this.mergeDataCache = new MergeDataCache<>();
        this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + str, CARRIER_CHANNEL_NUM, CARRIER_BUFFER_SIZE);
        this.dataCarrier.consume(new AggregatorConsumer(this), CARRIER_CONSUMER_NUM);
        this.aggregationCounter = moduleManager.find("telemetry").provider().getService(MetricCreator.class).createCounter("indicator_aggregation", "The number of rows in aggregation", new MetricTag.Keys(new String[]{"metricName", "level", "dimensionality"}), new MetricTag.Values(new String[]{str, "1", "min"}));
    }

    /**
     * Attaches a fresh, not-end-of-batch {@link EndOfBatchContext} to the indicator and queues it
     * on the data carrier.
     *
     * @param indicator the indicator to aggregate
     */
    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public final void in(Indicator indicator) {
        indicator.setEndOfBatchContext(new EndOfBatchContext(false));
        this.dataCarrier.produce(indicator);
    }

    /**
     * Counts and merges one indicator, then flushes when the threshold is reached or the indicator
     * is flagged as the end of its batch. Called by {@link AggregatorConsumer}.
     *
     * @param indicator the indicator taken from the data carrier
     */
    public void onWork(Indicator indicator) {
        this.aggregationCounter.inc();
        this.messageNum++;
        aggregate(indicator);
        if (this.messageNum >= FLUSH_THRESHOLD || indicator.getEndOfBatchContext().isEndOfBatch()) {
            sendToNext();
            this.messageNum = 0;
        }
    }

    /**
     * Switches the cache pointer, waits until the previous window is no longer being written,
     * forwards every entry of that window to the next worker and finally marks the window as read.
     *
     * <p>An interruption received while waiting does not abort the flush. It is remembered and
     * the thread's interrupt status is re-asserted once the flush is over, so the owner of the
     * calling thread can still observe it.
     */
    private void sendToNext() {
        this.mergeDataCache.switchPointer();
        boolean interrupted = false;
        try {
            while (this.mergeDataCache.getLast().isWriting()) {
                try {
                    Thread.sleep(WRITING_POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Thread.sleep cleared the interrupt status; remember it and keep waiting.
                    interrupted = true;
                    logger.error(e.getMessage(), e);
                }
            }
            this.mergeDataCache.getLast().collection().forEach(indicator -> {
                if (logger.isDebugEnabled()) {
                    logger.debug(indicator.toString());
                }
                this.nextWorker.in(indicator);
            });
            this.mergeDataCache.finishReadingLast();
        } finally {
            if (interrupted) {
                // Restore the status only after forwarding, so the flush itself is not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Merges the indicator into the cache: combines it with the cached entry when the cache
     * already contains it, otherwise stores it as a new entry.
     *
     * @param indicator the indicator to merge
     */
    private void aggregate(Indicator indicator) {
        this.mergeDataCache.writing();
        if (this.mergeDataCache.containsKey(indicator)) {
            this.mergeDataCache.get(indicator).combine(indicator);
        } else {
            this.mergeDataCache.put(indicator);
        }
        this.mergeDataCache.finishWriting();
    }
}
