/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.metrics.lib.impl;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kylin.metrics.lib.ActiveReservoirListener;
import org.apache.kylin.metrics.lib.Record;
import org.apache.kylin.metrics.lib.impl.AbstractActiveReservoir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockingReservoir
extends AbstractActiveReservoir {
    private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class);
    private final BlockingQueue<Record> recordsQueue;
    private final Thread scheduledReporter;
    private final int MIN_REPORT_SIZE;
    private final int MAX_REPORT_SIZE;
    private final long MAX_REPORT_TIME;
    private List<Record> records;

    public BlockingReservoir() {
        this(1, 100);
    }

    public BlockingReservoir(int minReportSize, int maxReportSize) {
        this(minReportSize, maxReportSize, 10);
    }

    public BlockingReservoir(int minReportSize, int maxReportSize, int MAX_REPORT_TIME) {
        this.MAX_REPORT_SIZE = maxReportSize;
        this.MIN_REPORT_SIZE = minReportSize;
        this.MAX_REPORT_TIME = (long)(MAX_REPORT_TIME * 60) * 1000L;
        this.recordsQueue = new LinkedBlockingQueue<Record>();
        this.listeners = Lists.newArrayList();
        this.records = Lists.newArrayListWithExpectedSize((int)this.MAX_REPORT_SIZE);
        this.scheduledReporter = new ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build().newThread(new ReporterRunnable());
    }

    @Override
    public void update(Record record) {
        if (!this.isReady) {
            logger.info("Current reservoir is not ready for update record");
            return;
        }
        try {
            this.recordsQueue.put(record);
        }
        catch (InterruptedException e) {
            logger.warn("Thread is interrupted during putting value to blocking queue. \n" + e.toString());
        }
    }

    @Override
    public int size() {
        return this.recordsQueue.size();
    }

    private void onRecordUpdate(boolean ifAll) {
        if (ifAll) {
            this.records = Lists.newArrayList();
            this.recordsQueue.drainTo(this.records);
        } else {
            this.records.clear();
            this.recordsQueue.drainTo(this.records, this.MAX_REPORT_SIZE);
        }
        boolean ifSucceed = true;
        for (ActiveReservoirListener listener : this.listeners) {
            if (this.notifyListenerOfUpdatedRecord(listener, this.records)) continue;
            ifSucceed = false;
            logger.warn("It fails to notify listener " + listener.toString() + " of updated records " + this.records.toString());
        }
        if (!ifSucceed) {
            this.notifyListenerHAOfUpdatedRecord(this.records);
        }
    }

    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, List<Record> records) {
        return listener.onRecordUpdate(records);
    }

    private boolean notifyListenerHAOfUpdatedRecord(List<Record> records) {
        logger.info("The HA listener " + this.listenerHA.toString() + " for updated records " + records.toString() + " will be started");
        if (!this.notifyListenerOfUpdatedRecord(this.listenerHA, records)) {
            logger.error("The HA listener also fails!!!");
            return false;
        }
        return true;
    }

    @Override
    public void start() {
        super.start();
        this.scheduledReporter.start();
    }

    @Override
    public void stop() {
        super.stop();
        this.scheduledReporter.interrupt();
        try {
            this.scheduledReporter.join();
        }
        catch (InterruptedException e) {
            logger.warn("Interrupted during join");
            throw new RuntimeException(e);
        }
    }

    class ReporterRunnable
    implements Runnable {
        ReporterRunnable() {
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            while (BlockingReservoir.this.isReady) {
                if (BlockingReservoir.this.size() <= 0) {
                    logger.info("There's no record in the blocking queue.");
                    this.sleep();
                    startTime = System.currentTimeMillis();
                    continue;
                }
                if (BlockingReservoir.this.size() < BlockingReservoir.this.MIN_REPORT_SIZE && System.currentTimeMillis() - startTime < BlockingReservoir.this.MAX_REPORT_TIME) {
                    logger.info("The number of records in the blocking queue is less than " + BlockingReservoir.this.MIN_REPORT_SIZE + " and the duration from last reporting is less than " + BlockingReservoir.this.MAX_REPORT_TIME + "ms. Will delay to report!");
                    this.sleep();
                    continue;
                }
                BlockingReservoir.this.onRecordUpdate(false);
                startTime = System.currentTimeMillis();
            }
            BlockingReservoir.this.onRecordUpdate(true);
            logger.info("Reporter finishes reporting metrics.");
        }

        private void sleep() {
            try {
                Thread.sleep(60000L);
            }
            catch (InterruptedException e) {
                logger.warn("Interrupted during running");
            }
        }
    }
}

