package com.ning.billing.meter.timeline.aggregator;

import com.google.inject.Inject;
import com.ning.billing.meter.MeterConfig;
import com.ning.billing.meter.timeline.MeterInternalCallContext;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.sqlobject.stringtemplate.StringTemplate3StatementLocator;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/meter/timeline/aggregator/TimelineAggregator.class */
public class TimelineAggregator {
    private static final Logger log = LoggerFactory.getLogger(TimelineAggregator.class);
    private final IDBI dbi;
    private final TimelineDao timelineDao;
    private final TimelineCoder timelineCoder;
    private final SampleCoder sampleCoder;
    private final MeterConfig config;
    private final TimelineAggregatorSqlDao aggregatorSqlDao;
    private final ScheduledExecutorService aggregatorThread = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, AtomicLong> aggregatorCounters = new LinkedHashMap();
    private final AtomicBoolean isAggregating = new AtomicBoolean(false);
    private final AtomicLong aggregationRuns = new AtomicLong();
    private final AtomicLong foundNothingRuns = new AtomicLong();
    private final AtomicLong aggregatesCreated = makeCounter("aggsCreated");
    private final AtomicLong timelineChunksConsidered = makeCounter("chunksConsidered");
    private final AtomicLong timelineChunkBatchesProcessed = makeCounter("batchesProcessed");
    private final AtomicLong timelineChunksCombined = makeCounter("chunksCombined");
    private final AtomicLong timelineChunksQueuedForCreation = makeCounter("chunksQueued");
    private final AtomicLong timelineChunksWritten = makeCounter("chunksWritten");
    private final AtomicLong timelineChunksInvalidatedOrDeleted = makeCounter("chunksInvalidatedOrDeleted");
    private final AtomicLong timelineChunksBytesCreated = makeCounter("bytesCreated");
    private final AtomicLong msSpentAggregating = makeCounter("msSpentAggregating");
    private final AtomicLong msSpentSleeping = makeCounter("msSpentSleeping");
    private final AtomicLong msWritingDb = makeCounter("msWritingDb");
    private final List<TimelineChunk> chunksToWrite = new ArrayList();
    private final List<Long> chunkIdsToInvalidateOrDelete = new ArrayList();
    private final TimelineChunkMapper timelineChunkMapper = new TimelineChunkMapper();

    @Inject
    public TimelineAggregator(IDBI idbi, TimelineDao timelineDao, TimelineCoder timelineCoder, SampleCoder sampleCoder, MeterConfig meterConfig) {
        this.dbi = idbi;
        this.timelineDao = timelineDao;
        this.timelineCoder = timelineCoder;
        this.sampleCoder = sampleCoder;
        this.config = meterConfig;
        this.aggregatorSqlDao = (TimelineAggregatorSqlDao) idbi.onDemand(TimelineAggregatorSqlDao.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int aggregateTimelineCandidates(List<TimelineChunk> list, int i, int i2) {
        TimelineChunk timelineChunk = list.get(0);
        log.debug("For sourceId {}, metricId {}, looking to aggregate {} candidates in {} chunks", Integer.valueOf(timelineChunk.getSourceId()), Integer.valueOf(timelineChunk.getMetricId()), Integer.valueOf(list.size()), Integer.valueOf(i2));
        int i3 = 0;
        int i4 = 0;
        while (list.size() >= i4 + i2) {
            List<TimelineChunk> subList = list.subList(i4, i4 + i2);
            i4 += i2;
            this.timelineChunksCombined.addAndGet(i2);
            try {
                aggregateHostSampleChunks(subList, i);
            } catch (IOException e) {
                log.error(String.format("IOException aggregating {} chunks, sourceId %s, metricId %s, looking to aggregate %s candidates in %s chunks", Integer.valueOf(timelineChunk.getSourceId()), Integer.valueOf(timelineChunk.getMetricId()), Integer.valueOf(list.size()), Integer.valueOf(i2)), (Throwable) e);
            }
            i3++;
        }
        return i3;
    }

    private void aggregateHostSampleChunks(List<TimelineChunk> list, int i) throws IOException {
        TimelineChunk timelineChunk = list.get(0);
        TimelineChunk timelineChunk2 = list.get(list.size() - 1);
        int size = list.size();
        int sourceId = timelineChunk.getSourceId();
        DateTime startTime = timelineChunk.getStartTime();
        DateTime endTime = timelineChunk2.getEndTime();
        ArrayList arrayList = new ArrayList(size);
        try {
            ArrayList arrayList2 = new ArrayList(size);
            ArrayList arrayList3 = new ArrayList(size);
            int i2 = 0;
            for (TimelineChunk timelineChunk3 : list) {
                arrayList.add(timelineChunk3.getTimeBytesAndSampleBytes().getTimeBytes());
                arrayList2.add(timelineChunk3.getTimeBytesAndSampleBytes().getSampleBytes());
                i2 += timelineChunk3.getSampleCount();
                arrayList3.add(Long.valueOf(timelineChunk3.getChunkId()));
            }
            byte[] combineTimelines = this.timelineCoder.combineTimelines(arrayList, Integer.valueOf(i2));
            byte[] combineSampleBytes = this.sampleCoder.combineSampleBytes(arrayList2);
            int length = 4 + combineTimelines.length + combineSampleBytes.length;
            log.debug("For sourceId {}, aggregationLevel {}, aggregating {} timelines ({} bytes, {} samples): {}", Integer.valueOf(timelineChunk.getSourceId()), Integer.valueOf(timelineChunk.getAggregationLevel()), Integer.valueOf(list.size()), Integer.valueOf(length), Integer.valueOf(i2));
            this.timelineChunksBytesCreated.addAndGet(length);
            this.chunksToWrite.add(new TimelineChunk(0L, sourceId, timelineChunk.getMetricId(), startTime, endTime, combineTimelines, combineSampleBytes, i2, i + 1, false, false));
            this.chunkIdsToInvalidateOrDelete.addAll(arrayList3);
            this.timelineChunksQueuedForCreation.incrementAndGet();
            if (this.chunkIdsToInvalidateOrDelete.size() >= this.config.getMaxChunkIdsToInvalidateOrDelete()) {
                performWrites();
            }
        } catch (Exception e) {
            log.error(String.format("Exception aggregating level %d, sourceId %d, metricId %d, startTime %s, endTime %s", Integer.valueOf(i), Integer.valueOf(sourceId), Integer.valueOf(timelineChunk.getMetricId()), startTime, endTime), (Throwable) e);
        }
    }

    private void performWrites() {
        MeterInternalCallContext meterInternalCallContext = new MeterInternalCallContext();
        long currentTimeMillis = System.currentTimeMillis();
        this.aggregatorSqlDao.begin();
        this.timelineDao.bulkInsertTimelineChunks(this.chunksToWrite, meterInternalCallContext);
        if (this.config.getDeleteAggregatedChunks()) {
            this.aggregatorSqlDao.deleteTimelineChunks(this.chunkIdsToInvalidateOrDelete, meterInternalCallContext);
        } else {
            this.aggregatorSqlDao.makeTimelineChunksInvalid(this.chunkIdsToInvalidateOrDelete, meterInternalCallContext);
        }
        this.aggregatorSqlDao.commit();
        this.msWritingDb.addAndGet(System.currentTimeMillis() - currentTimeMillis);
        this.timelineChunksWritten.addAndGet(this.chunksToWrite.size());
        this.timelineChunksInvalidatedOrDeleted.addAndGet(this.chunkIdsToInvalidateOrDelete.size());
        this.chunksToWrite.clear();
        this.chunkIdsToInvalidateOrDelete.clear();
        long millis = this.config.getAggregationSleepBetweenBatches().getMillis();
        if (millis > 0) {
            long currentTimeMillis2 = System.currentTimeMillis();
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.msSpentSleeping.addAndGet(System.currentTimeMillis() - currentTimeMillis2);
        }
        this.timelineChunkBatchesProcessed.incrementAndGet();
    }

    public void getAndProcessTimelineAggregationCandidates() {
        if (!this.isAggregating.compareAndSet(false, true)) {
            log.info("Asked to aggregate, but we're already aggregating!");
            return;
        }
        log.debug("Starting aggregating");
        this.aggregationRuns.incrementAndGet();
        String[] split = this.config.getChunksToAggregate().split(",");
        int i = 0;
        while (true) {
            if (i >= this.config.getMaxAggregationLevel()) {
                break;
            }
            long j = this.aggregatesCreated.get();
            Map<String, Long> captureAggregatorCounters = captureAggregatorCounters();
            streamingAggregateLevel(i, Integer.parseInt(split[i >= split.length ? split.length - 1 : i]));
            Map<String, Long> subtractFromAggregatorCounters = subtractFromAggregatorCounters(captureAggregatorCounters);
            if (this.aggregatesCreated.get() - j == 0) {
                if (i == 0) {
                    this.foundNothingRuns.incrementAndGet();
                }
                log.debug("Created no new aggregates, so skipping higher-level aggregations");
            } else {
                StringBuilder sb = new StringBuilder();
                sb.append("For aggregation level ").append(i).append(", runs ").append(this.aggregationRuns.get()).append(", foundNothingRuns ").append(this.foundNothingRuns.get());
                for (Map.Entry<String, Long> entry : subtractFromAggregatorCounters.entrySet()) {
                    sb.append(", ").append(entry.getKey()).append(": ").append(entry.getValue());
                }
                log.info(sb.toString());
                i++;
            }
        }
        log.debug("Aggregation done");
        this.isAggregating.set(false);
    }

    private void streamingAggregateLevel(final int i, final int i2) {
        final ArrayList arrayList = new ArrayList();
        final TimelineChunkConsumer timelineChunkConsumer = new TimelineChunkConsumer() { // from class: com.ning.billing.meter.timeline.aggregator.TimelineAggregator.1
            int lastSourceId = 0;
            int lastMetricId = 0;

            @Override // com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer
            public void processTimelineChunk(TimelineChunk timelineChunk) {
                TimelineAggregator.this.timelineChunksConsidered.incrementAndGet();
                int sourceId = timelineChunk.getSourceId();
                int metricId = timelineChunk.getMetricId();
                if (this.lastSourceId == 0) {
                    this.lastSourceId = sourceId;
                    this.lastMetricId = metricId;
                }
                if (this.lastSourceId != sourceId || this.lastMetricId != metricId) {
                    TimelineAggregator.this.aggregatesCreated.addAndGet(TimelineAggregator.this.aggregateTimelineCandidates(arrayList, i, i2));
                    arrayList.clear();
                    this.lastSourceId = sourceId;
                    this.lastMetricId = metricId;
                }
                arrayList.add(timelineChunk);
            }
        };
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.dbi.withHandle(new HandleCallback<Void>() { // from class: com.ning.billing.meter.timeline.aggregator.TimelineAggregator.2
                /* JADX WARN: Can't rename method to resolve collision */
                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.skife.jdbi.v2.tweak.HandleCallback
                public Void withHandle(Handle handle) throws Exception {
                    Query bind = handle.createQuery("getStreamingAggregationCandidates").setFetchSize(Integer.MIN_VALUE).bind("aggregationLevel", i).bind("tenantRecordId", 0L);
                    bind.setStatementLocator(new StringTemplate3StatementLocator(TimelineAggregatorSqlDao.class));
                    ResultIterator resultIterator = null;
                    try {
                        try {
                            resultIterator = bind.map(TimelineAggregator.this.timelineChunkMapper).iterator();
                            while (resultIterator.hasNext()) {
                                timelineChunkConsumer.processTimelineChunk((TimelineChunk) resultIterator.next());
                            }
                            if (resultIterator == null) {
                                return null;
                            }
                            resultIterator.close();
                            return null;
                        } catch (Exception e) {
                            TimelineAggregator.log.error(String.format("Exception during aggregation of level %d", Integer.valueOf(i)), (Throwable) e);
                            if (resultIterator == null) {
                                return null;
                            }
                            resultIterator.close();
                            return null;
                        }
                    } catch (Throwable th) {
                        if (resultIterator != null) {
                            resultIterator.close();
                        }
                        throw th;
                    }
                }
            });
            if (arrayList.size() >= i2) {
                this.aggregatesCreated.addAndGet(aggregateTimelineCandidates(arrayList, i, i2));
            }
            if (this.chunkIdsToInvalidateOrDelete.size() > 0) {
                performWrites();
            }
        } finally {
            this.msSpentAggregating.addAndGet(System.currentTimeMillis() - currentTimeMillis);
        }
    }

    private AtomicLong makeCounter(String str) {
        AtomicLong atomicLong = new AtomicLong();
        this.aggregatorCounters.put(str, atomicLong);
        return atomicLong;
    }

    private Map<String, Long> captureAggregatorCounters() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, AtomicLong> entry : this.aggregatorCounters.entrySet()) {
            linkedHashMap.put(entry.getKey(), Long.valueOf(entry.getValue().get()));
        }
        return linkedHashMap;
    }

    private Map<String, Long> subtractFromAggregatorCounters(Map<String, Long> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, AtomicLong> entry : this.aggregatorCounters.entrySet()) {
            String key = entry.getKey();
            linkedHashMap.put(key, Long.valueOf(entry.getValue().get() - map.get(key).longValue()));
        }
        return linkedHashMap;
    }

    public void runAggregationThread() {
        this.aggregatorThread.scheduleWithFixedDelay(new Runnable() { // from class: com.ning.billing.meter.timeline.aggregator.TimelineAggregator.3
            @Override // java.lang.Runnable
            public void run() {
                TimelineAggregator.this.getAndProcessTimelineAggregationCandidates();
            }
        }, this.config.getAggregationInterval().getMillis(), this.config.getAggregationInterval().getMillis(), TimeUnit.MILLISECONDS);
    }

    public void stopAggregationThread() {
        this.aggregatorThread.shutdown();
    }

    public long getAggregationRuns() {
        return this.aggregationRuns.get();
    }

    public long getFoundNothingRuns() {
        return this.foundNothingRuns.get();
    }

    public long getTimelineChunksConsidered() {
        return this.timelineChunksConsidered.get();
    }

    public long getTimelineChunkBatchesProcessed() {
        return this.timelineChunkBatchesProcessed.get();
    }

    public long getTimelineChunksCombined() {
        return this.timelineChunksCombined.get();
    }

    public long getTimelineChunksQueuedForCreation() {
        return this.timelineChunksQueuedForCreation.get();
    }

    public long getTimelineChunksWritten() {
        return this.timelineChunksWritten.get();
    }

    public long getTimelineChunksInvalidatedOrDeleted() {
        return this.timelineChunksInvalidatedOrDeleted.get();
    }

    public long getTimelineChunksBytesCreated() {
        return this.timelineChunksBytesCreated.get();
    }

    public long getMsSpentAggregating() {
        return this.msSpentAggregating.get();
    }

    public long getMsSpentSleeping() {
        return this.msSpentSleeping.get();
    }

    public long getMsWritingDb() {
        return this.msWritingDb.get();
    }

    public void initiateAggregation() {
        log.info("Starting user-initiated aggregation");
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: com.ning.billing.meter.timeline.aggregator.TimelineAggregator.4
            @Override // java.lang.Runnable
            public void run() {
                TimelineAggregator.this.getAndProcessTimelineAggregationCandidates();
            }
        });
    }
}
