package org.apache.kylin.metrics.lib.impl;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kylin.metrics.lib.ActiveReservoirListener;
import org.apache.kylin.metrics.lib.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-core-metrics-2.6.6.jar:org/apache/kylin/metrics/lib/impl/BlockingReservoir.class */
/**
 * An active reservoir that buffers incoming {@link Record}s in an in-memory
 * {@link LinkedBlockingQueue} and hands them to the registered listeners in
 * batches from a dedicated reporter thread.
 *
 * <p>The reporter thread wakes up periodically and reports a batch when either
 * the queue holds at least {@code minReportSize} records, or at least
 * {@code maxReportTime} milliseconds have passed since the last report (or since
 * the queue was last seen empty). Each periodic batch holds at most
 * {@code maxReportSize} records. When the reservoir is stopped, everything still
 * queued is reported in one final batch.
 *
 * <p>If any regular listener reports failure for a batch, the same batch is
 * additionally handed to the HA listener ({@code listenerHA}).
 */
public class BlockingReservoir extends AbstractActiveReservoir {
    private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class);

    /** How long the reporter thread sleeps between two inspections of the queue. */
    private static final long CHECK_INTERVAL_MILLIS = 60000L;

    /** Records accepted by {@link #update(Record)} and not yet reported. */
    private final BlockingQueue<Record> recordsQueue;
    /** Thread running {@link ReporterRunnable}; started in {@link #start()}. */
    private final Thread scheduledReporter;
    /** Queue size at or above which a batch is reported without waiting for the timeout. */
    private final int minReportSize;
    /** Upper bound on the number of records drained into one periodic batch. */
    private final int maxReportSize;
    /** Maximum time, in milliseconds, to delay reporting of a non-empty queue. */
    private final long maxReportTime;
    /** The batch currently being reported; reused between periodic reports. */
    private List<Record> records;

    /**
     * Body of the reporter thread: polls the queue until the reservoir is no
     * longer ready, then flushes whatever is left.
     */
    class ReporterRunnable implements Runnable {
        ReporterRunnable() {
        }

        /**
         * Loops while {@code isReady} is set, reporting batches when the size or
         * time threshold is reached, and performs a final full flush on exit.
         */
        @Override
        public void run() {
            long lastReportTime = System.currentTimeMillis();
            // NOTE(review): isReady is declared in AbstractActiveReservoir (not visible here);
            // confirm that writes from other threads are visible to this loop (e.g. volatile).
            while (isReady) {
                int queued = size();
                if (queued <= 0) {
                    logger.info("There's no record in the blocking queue.");
                    sleep();
                    // An empty queue restarts the delay window.
                    lastReportTime = System.currentTimeMillis();
                } else if (queued >= minReportSize
                        || System.currentTimeMillis() - lastReportTime >= maxReportTime) {
                    onRecordUpdate(false);
                    lastReportTime = System.currentTimeMillis();
                } else {
                    logger.info("The number of records in the blocking queue is less than {} and the duration from last reporting is less than {}ms. Will delay to report!",
                            minReportSize, maxReportTime);
                    sleep();
                }
            }
            // Final flush: report everything that is still queued.
            onRecordUpdate(true);
            logger.info("Reporter finishes reporting metrics.");
        }

        /**
         * Pauses the reporter thread for {@link #CHECK_INTERVAL_MILLIS}, returning
         * early if the thread is interrupted.
         */
        private void sleep() {
            try {
                Thread.sleep(CHECK_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // The interrupt status is intentionally not restored here: doing so would make
                // every later sleep return immediately and would leave the flag set while the
                // final flush in run() notifies the listeners. The loop condition in run()
                // decides whether the thread keeps going.
                logger.warn("Interrupted during running");
            }
        }
    }

    /**
     * Creates a reservoir with a minimum batch size of 1, a maximum batch size
     * of 100 and a maximum reporting delay of 10 minutes.
     */
    public BlockingReservoir() {
        this(1, 100);
    }

    /**
     * Creates a reservoir with the given batch size bounds and a maximum
     * reporting delay of 10 minutes.
     *
     * @param minReportSize queue size that triggers an immediate report
     * @param maxReportSize maximum number of records per periodic batch
     */
    public BlockingReservoir(int minReportSize, int maxReportSize) {
        this(minReportSize, maxReportSize, 10);
    }

    /**
     * Creates a reservoir; the reporter thread is created here but only started
     * by {@link #start()}.
     *
     * @param minReportSize queue size that triggers an immediate report
     * @param maxReportSize maximum number of records per periodic batch; a value of
     *        0 would make every periodic batch empty, so callers should pass a
     *        positive value
     * @param maxReportTime maximum reporting delay for a non-empty queue, in minutes
     */
    public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) {
        this.maxReportSize = maxReportSize;
        this.minReportSize = minReportSize;
        // Multiply as long: the int product minutes * 60 * 1000 overflows above 35791 minutes.
        this.maxReportTime = maxReportTime * 60L * 1000L;
        this.recordsQueue = new LinkedBlockingQueue<Record>();
        this.listeners = Lists.newArrayList();
        this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize);
        this.scheduledReporter = new ThreadFactoryBuilder()
                .setNameFormat("metrics-blocking-reservoir-scheduler-%d")
                .build()
                .newThread(new ReporterRunnable());
    }

    /**
     * Queues a record for later reporting. The record is dropped (with a log
     * message) if the reservoir is not ready or the calling thread is
     * interrupted while enqueueing.
     *
     * @param record the record to enqueue
     */
    @Override
    public void update(Record record) {
        if (!this.isReady) {
            logger.info("Current reservoir is not ready for update record");
            return;
        }
        try {
            this.recordsQueue.put(record);
        } catch (InterruptedException e) {
            logger.warn("Thread is interrupted during putting value to blocking queue. \n" + e.toString());
            // Preserve the caller's interrupt status instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the number of records currently waiting in the queue
     */
    @Override
    public int size() {
        return this.recordsQueue.size();
    }

    /**
     * Drains records from the queue into a batch and notifies every listener;
     * if at least one listener reports failure, the batch is also passed to the
     * HA listener.
     *
     * <p>This method reads and replaces the shared {@code records} batch without
     * synchronization; within this class it is only invoked from the reporter thread.
     *
     * @param z {@code true} to drain the whole queue into a fresh list (final
     *        flush); {@code false} to reuse the current batch list and drain at
     *        most {@code maxReportSize} records
     */
    public void onRecordUpdate(boolean z) {
        if (z) {
            this.records = Lists.newArrayList();
            this.recordsQueue.drainTo(this.records);
        } else {
            this.records.clear();
            this.recordsQueue.drainTo(this.records, this.maxReportSize);
        }

        boolean allSucceeded = true;
        for (ActiveReservoirListener listener : this.listeners) {
            if (!notifyListenerOfUpdatedRecord(listener, this.records)) {
                allSucceeded = false;
                logger.warn("It fails to notify listener {} of updated records {}", listener, this.records);
            }
        }
        if (!allSucceeded) {
            notifyListenerHAOfUpdatedRecord(this.records);
        }
    }

    /**
     * Passes a batch to a single listener.
     *
     * @return the listener's own success flag
     */
    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener activeReservoirListener, List<Record> list) {
        return activeReservoirListener.onRecordUpdate(list);
    }

    /**
     * Passes a batch to the HA listener after a regular listener has failed.
     *
     * @return {@code true} if the HA listener accepted the batch, {@code false} otherwise
     */
    private boolean notifyListenerHAOfUpdatedRecord(List<Record> list) {
        logger.info("The HA listener {} for updated records {} will be started", this.listenerHA, list);
        if (notifyListenerOfUpdatedRecord(this.listenerHA, list)) {
            return true;
        }
        logger.error("The HA listener also fails!!!");
        return false;
    }

    /**
     * Starts the reservoir via the superclass and then launches the reporter thread.
     */
    @Override
    public void start() {
        super.start();
        this.scheduledReporter.start();
    }

    /**
     * Stops the reservoir via the superclass, interrupts the reporter thread to
     * wake it from its sleep, and waits for it to finish its final flush.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting;
     *         the interrupt status is restored before the exception is thrown
     */
    @Override
    public void stop() {
        super.stop();
        this.scheduledReporter.interrupt();
        try {
            this.scheduledReporter.join();
        } catch (InterruptedException e) {
            logger.warn("Interrupted during join");
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
