package com.ning.billing.meter.timeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.ning.billing.meter.MeterConfig;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
import com.ning.billing.meter.timeline.persistent.Replayer;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.meter.timeline.samples.ScalarSample;
import com.ning.billing.meter.timeline.shutdown.ShutdownSaveMode;
import com.ning.billing.meter.timeline.shutdown.StartTimes;
import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.TenantContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/meter/timeline/TimelineEventHandler.class */
public class TimelineEventHandler {
    /** Logger shared by every instance of this handler. */
    private static final Logger log = LoggerFactory.getLogger(TimelineEventHandler.class);

    // Collaborators supplied through the constructor.
    private final MeterConfig config;
    private final TimelineDao timelineDAO;
    private final TimelineCoder timelineCoder;
    private final SampleCoder sampleCoder;
    private final BackgroundDBChunkWriter backgroundWriter;
    private final FileBackedBuffer backingBuffer;

    /** Parsed in the constructor from {@code MeterConfig#getShutdownSaveMode()}. */
    private final ShutdownSaveMode shutdownSaveMode;
    /**
     * Orders timeline chunks by source id, then by metric id, then by start time,
     * all ascending. Used to sort the result of {@code getInMemoryTimelineChunks}.
     */
    private static final Comparator<TimelineChunk> CHUNK_COMPARATOR = new Comparator<TimelineChunk>() {
        @Override
        public int compare(TimelineChunk timelineChunk, TimelineChunk timelineChunk2) {
            // Values are compared directly rather than subtracted: subtraction can
            // overflow, and each key must be read from BOTH chunks.
            final int leftSourceId = timelineChunk.getSourceId();
            final int rightSourceId = timelineChunk2.getSourceId();
            if (leftSourceId != rightSourceId) {
                return leftSourceId < rightSourceId ? -1 : 1;
            }

            final int leftMetricId = timelineChunk.getMetricId();
            final int rightMetricId = timelineChunk2.getMetricId();
            if (leftMetricId != rightMetricId) {
                return leftMetricId < rightMetricId ? -1 : 1;
            }

            final long leftStartMillis = timelineChunk.getStartTime().getMillis();
            final long rightStartMillis = timelineChunk2.getStartTime().getMillis();
            if (leftStartMillis != rightStartMillis) {
                return leftStartMillis < rightStartMillis ? -1 : 1;
            }
            return 0;
        }
    };
    /** Single-threaded scheduler that runs the periodic purge; see {@code startPurgeThread}. */
    private final ScheduledExecutorService purgeThread = Executors.newSingleThreadScheduledExecutor();

    /** Per-source holders of category accumulators, keyed by source id. */
    private final Map<Integer, SourceAccumulatorsAndUpdateDate> accumulators = new ConcurrentHashMap<Integer, SourceAccumulatorsAndUpdateDate>();

    /** Set by {@code commitAndShutdown}; once true, {@code record} drops incoming events. */
    private final AtomicBoolean shuttingDown = new AtomicBoolean();
    /** True while {@code replay} is running, so replayed samples are not appended to the backing buffer again. */
    private final AtomicBoolean replaying = new AtomicBoolean();

    // Counters exposed through the getters at the bottom of the class.
    private final AtomicLong eventsDiscarded = new AtomicLong();
    private final AtomicLong eventsReceivedAfterShuttingDown = new AtomicLong();
    private final AtomicLong handledEventCount = new AtomicLong();
    private final AtomicLong addedSourceEventAccumulatorMapCount = new AtomicLong();
    private final AtomicLong addedSourceEventAccumulatorCount = new AtomicLong();
    private final AtomicLong getInMemoryChunksCallCount = new AtomicLong();
    private final AtomicLong accumulatorDeepCopyCount = new AtomicLong();
    private final AtomicLong inMemoryChunksReturnedCount = new AtomicLong();
    private final AtomicLong replayCount = new AtomicLong();
    private final AtomicLong replaySamplesFoundCount = new AtomicLong();
    private final AtomicLong replaySamplesOutsideTimeRangeCount = new AtomicLong();
    private final AtomicLong replaySamplesProcessedCount = new AtomicLong();
    private final AtomicLong forceCommitCallCount = new AtomicLong();
    private final AtomicLong purgedAccumsBecauseSourceNotUpdated = new AtomicLong();
    private final AtomicLong purgedAccumsBecauseCategoryNotUpdated = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ning/billing/meter/timeline/TimelineEventHandler$SourceAccumulatorsAndUpdateDate.class */
    /**
     * Pairs the per-category accumulators of one source with the time the source
     * was last touched.
     *
     * The class does no locking of its own; within this file
     * {@link #markUpdated()} and {@link #getLastUpdateDate()} are only called
     * from synchronized methods of the enclosing handler.
     */
    public static class SourceAccumulatorsAndUpdateDate {
        /** Accumulators of this source, keyed by category id. */
        private final Map<Integer, TimelineSourceEventAccumulator> categoryAccumulators;
        /** Time of the most recent {@link #markUpdated()} call, or the value given at construction. */
        private DateTime lastUpdateDate;

        /**
         * @param map      the category-id to accumulator map; stored as-is (no copy)
         * @param dateTime the initial last-update time
         */
        public SourceAccumulatorsAndUpdateDate(Map<Integer, TimelineSourceEventAccumulator> map, DateTime dateTime) {
            categoryAccumulators = map;
            lastUpdateDate = dateTime;
        }

        /** Stamps this holder with the current time. */
        public void markUpdated() {
            lastUpdateDate = new DateTime();
        }

        /** @return the last-update time */
        public DateTime getLastUpdateDate() {
            return lastUpdateDate;
        }

        /** @return the live (mutable) category-id to accumulator map */
        public Map<Integer, TimelineSourceEventAccumulator> getCategoryAccumulators() {
            return categoryAccumulators;
        }
    }

    @Inject
    public TimelineEventHandler(MeterConfig meterConfig, TimelineDao timelineDao, TimelineCoder timelineCoder, SampleCoder sampleCoder, BackgroundDBChunkWriter backgroundDBChunkWriter, FileBackedBuffer fileBackedBuffer) {
        this.config = meterConfig;
        this.timelineDAO = timelineDao;
        this.timelineCoder = timelineCoder;
        this.sampleCoder = sampleCoder;
        this.backgroundWriter = backgroundDBChunkWriter;
        this.backingBuffer = fileBackedBuffer;
        this.shutdownSaveMode = ShutdownSaveMode.fromString(meterConfig.getShutdownSaveMode());
    }

    /**
     * Calls {@code extractAndQueueTimelineChunks()} on every accumulator of every
     * source, logging each (sourceId, categoryId) pair at debug level.
     */
    private void saveAccumulators() {
        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> sourceEntry : accumulators.entrySet()) {
            final int sourceId = sourceEntry.getKey();
            final Map<Integer, TimelineSourceEventAccumulator> byCategory = sourceEntry.getValue().getCategoryAccumulators();
            for (final Map.Entry<Integer, TimelineSourceEventAccumulator> categoryEntry : byCategory.entrySet()) {
                final int categoryId = categoryEntry.getKey();
                final TimelineSourceEventAccumulator accumulator = categoryEntry.getValue();
                log.debug("Saving Timeline for sourceId [{}] and categoryId [{}]", sourceId, categoryId);
                accumulator.extractAndQueueTimelineChunks();
            }
        }
    }

    /**
     * Records the start time of every accumulator into {@code startTimes}, keyed
     * by (sourceId, categoryId).
     *
     * @param startTimes the collector that receives one entry per accumulator
     */
    private void saveStartTimes(StartTimes startTimes) {
        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> sourceEntry : accumulators.entrySet()) {
            final int sourceId = sourceEntry.getKey();
            final Map<Integer, TimelineSourceEventAccumulator> byCategory = sourceEntry.getValue().getCategoryAccumulators();
            for (final Map.Entry<Integer, TimelineSourceEventAccumulator> categoryEntry : byCategory.entrySet()) {
                final int categoryId = categoryEntry.getKey();
                final TimelineSourceEventAccumulator accumulator = categoryEntry.getValue();
                log.debug("Saving Timeline start time for sourceId [{}] and category [{}]", sourceId, categoryId);
                startTimes.addTime(sourceId, categoryId, accumulator.getStartTime());
            }
        }
    }

    /**
     * Flushes and drops accumulators that have not been updated since
     * {@code dateTime}.
     *
     * A source whose last-update time is before {@code dateTime} has all of its
     * accumulators flushed (via {@code extractAndQueueTimelineChunks()}) and is
     * removed entirely. For any other source, only the categories whose latest
     * sample-add time is non-null and before {@code dateTime} are flushed and
     * removed.
     *
     * @param dateTime the cutoff; anything last touched before it is purged
     */
    public synchronized void purgeOldSourcesAndAccumulators(DateTime dateTime) {
        // Removals are deferred until after each iteration completes.
        final List<Integer> staleSourceIds = new ArrayList<Integer>();

        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> sourceEntry : accumulators.entrySet()) {
            final SourceAccumulatorsAndUpdateDate holder = sourceEntry.getValue();
            final Map<Integer, TimelineSourceEventAccumulator> byCategory = holder.getCategoryAccumulators();

            if (holder.getLastUpdateDate().isBefore(dateTime)) {
                // The whole source is stale: flush every category and drop the source.
                staleSourceIds.add(sourceEntry.getKey());
                purgedAccumsBecauseSourceNotUpdated.incrementAndGet();
                for (final TimelineSourceEventAccumulator accumulator : byCategory.values()) {
                    accumulator.extractAndQueueTimelineChunks();
                }
                continue;
            }

            // The source is still live: purge only its stale categories.
            final List<Integer> staleCategoryIds = new ArrayList<Integer>();
            for (final Map.Entry<Integer, TimelineSourceEventAccumulator> categoryEntry : byCategory.entrySet()) {
                final TimelineSourceEventAccumulator accumulator = categoryEntry.getValue();
                final DateTime lastSampleAdded = accumulator.getLatestSampleAddTime();
                if (lastSampleAdded == null || !lastSampleAdded.isBefore(dateTime)) {
                    continue;
                }
                purgedAccumsBecauseCategoryNotUpdated.incrementAndGet();
                accumulator.extractAndQueueTimelineChunks();
                staleCategoryIds.add(categoryEntry.getKey());
            }
            for (final Integer categoryId : staleCategoryIds) {
                byCategory.remove(categoryId);
            }
        }

        for (final Integer sourceId : staleSourceIds) {
            accumulators.remove(sourceId);
        }
    }

    /**
     * Records one event: resolves the source, converts the raw samples to scalar
     * samples, optionally appends them to the backing buffer, and feeds them to
     * the matching accumulator.
     *
     * Events arriving after shutdown has started are only counted. Events that
     * yield no scalar samples are counted as discarded. Any exception raised
     * while handling the event is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param str         source name, resolved through {@code TimelineDao#getOrAddSource}
     * @param str2        event category name
     * @param dateTime    timestamp of the samples
     * @param map         metric name to raw sample value; may be null
     * @param callContext call context passed to the DAO
     */
    public void record(String str, String str2, DateTime dateTime, Map<String, Object> map, CallContext callContext) {
        if (shuttingDown.get()) {
            eventsReceivedAfterShuttingDown.incrementAndGet();
            return;
        }

        try {
            handledEventCount.incrementAndGet();
            final int sourceId = timelineDAO.getOrAddSource(str, callContext);

            final LinkedHashMap<Integer, ScalarSample> scalarSamples = new LinkedHashMap<Integer, ScalarSample>();
            convertSamplesToScalarSamples(str2, map, scalarSamples, callContext);

            if (scalarSamples.isEmpty()) {
                eventsDiscarded.incrementAndGet();
            } else {
                final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(Integer.valueOf(sourceId), str2, dateTime, scalarSamples);
                // Samples replayed from disk must not be written back to the buffer.
                final boolean bufferLocally = !replaying.get() && config.storeSamplesLocallyTemporary();
                if (bufferLocally) {
                    backingBuffer.append(samples);
                }
                processSamples(samples, callContext);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Convenience overload that uses the configured timeline length, in
     * milliseconds narrowed to {@code int}, as the accumulator's last argument.
     *
     * @param i        source id
     * @param i2       category id
     * @param dateTime passed to the accumulator constructor if one has to be created
     * @return the existing or newly created accumulator
     */
    public TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(int i, int i2, DateTime dateTime) {
        final int timelineLengthMillis = (int) config.getTimelineLength().getMillis();
        return getOrAddSourceEventAccumulator(i, i2, dateTime, timelineLengthMillis);
    }

    /**
     * Looks up the accumulator for (source {@code i}, category {@code i2}),
     * creating the per-source holder and/or the accumulator when missing. The
     * source holder is marked as updated on every call.
     *
     * @param i        source id
     * @param i2       category id
     * @param dateTime passed to the accumulator constructor if one has to be created
     * @param i3       passed (boxed) as the last argument of the accumulator constructor
     * @return the existing or newly created accumulator
     */
    public synchronized TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(int i, int i2, DateTime dateTime, int i3) {
        final Integer sourceKey = Integer.valueOf(i);
        SourceAccumulatorsAndUpdateDate holder = accumulators.get(sourceKey);
        if (holder == null) {
            addedSourceEventAccumulatorMapCount.incrementAndGet();
            holder = new SourceAccumulatorsAndUpdateDate(new HashMap<Integer, TimelineSourceEventAccumulator>(), new DateTime());
            accumulators.put(sourceKey, holder);
        }
        holder.markUpdated();

        final Map<Integer, TimelineSourceEventAccumulator> byCategory = holder.getCategoryAccumulators();
        final Integer categoryKey = Integer.valueOf(i2);
        final TimelineSourceEventAccumulator existing = byCategory.get(categoryKey);
        if (existing != null) {
            return existing;
        }

        addedSourceEventAccumulatorCount.incrementAndGet();
        final TimelineSourceEventAccumulator created = new TimelineSourceEventAccumulator(timelineDAO, timelineCoder, sampleCoder, backgroundWriter, i, i2, dateTime, Integer.valueOf(i3));
        byCategory.put(categoryKey, created);
        log.debug("Created new Timeline for sourceId [{}] and category [{}]", sourceKey, categoryKey);
        return created;
    }

    /**
     * Adds the given samples to the accumulator for their source and category,
     * creating the accumulator when needed.
     *
     * @param sourceSamplesForTimestamp the samples to add
     * @param tenantContext             context used to resolve the category id through the DAO
     * @throws ExecutionException declared by the signature; origin not visible in this file
     * @throws IOException        declared by the signature; origin not visible in this file
     */
    @VisibleForTesting
    public void processSamples(SourceSamplesForTimestamp sourceSamplesForTimestamp, TenantContext tenantContext) throws ExecutionException, IOException {
        final int sourceId = sourceSamplesForTimestamp.getSourceId();
        final int categoryId = timelineDAO.getEventCategoryId(sourceSamplesForTimestamp.getCategory(), tenantContext).intValue();
        final DateTime timestamp = sourceSamplesForTimestamp.getTimestamp();

        final TimelineSourceEventAccumulator accumulator = getOrAddSourceEventAccumulator(sourceId, categoryId, timestamp);
        accumulator.addSourceSamples(sourceSamplesForTimestamp);
    }

    /**
     * Returns the in-memory chunks of a source for every metric id known to the
     * DAO, restricted to the given time window.
     *
     * @param num           source id
     * @param dateTime      window start, or null for no lower bound
     * @param dateTime2     window end, or null for no upper bound
     * @param tenantContext context used to list the metrics
     * @return the matching chunks, sorted with {@code CHUNK_COMPARATOR}
     */
    public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(Integer num, @Nullable DateTime dateTime, @Nullable DateTime dateTime2, TenantContext tenantContext) throws IOException, ExecutionException {
        final List<Integer> allMetricIds = (List<Integer>) ImmutableList.copyOf(timelineDAO.getMetrics(tenantContext).keySet());
        return getInMemoryTimelineChunks(num, allMetricIds, dateTime, dateTime2, tenantContext);
    }

    /**
     * Returns the in-memory chunks of a source for a single metric id, restricted
     * to the given time window.
     *
     * @param num           source id
     * @param num2          metric id
     * @param dateTime      window start, or null for no lower bound
     * @param dateTime2     window end, or null for no upper bound
     * @param tenantContext forwarded to the list-based overload
     * @return the matching chunks, sorted with {@code CHUNK_COMPARATOR}
     */
    public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(Integer num, Integer num2, @Nullable DateTime dateTime, @Nullable DateTime dateTime2, TenantContext tenantContext) throws IOException, ExecutionException {
        final List<Integer> singleMetricId = ImmutableList.of(num2);
        return getInMemoryTimelineChunks(num, singleMetricId, dateTime, dateTime2, tenantContext);
    }

    /**
     * Collects the in-memory chunks of a source for the given metric ids.
     *
     * An accumulator is skipped when its end time is before {@code dateTime} or
     * its start time is after {@code dateTime2}; a null bound disables that
     * check. The result is sorted with {@code CHUNK_COMPARATOR}.
     *
     * @param num           source id
     * @param list          metric ids to fetch
     * @param dateTime      window start, or null for no lower bound
     * @param dateTime2     window end, or null for no upper bound
     * @param tenantContext not used by this overload
     * @return the sorted chunks, or an empty immutable list when the source has no accumulators
     */
    public synchronized Collection<? extends TimelineChunk> getInMemoryTimelineChunks(Integer num, List<Integer> list, @Nullable DateTime dateTime, @Nullable DateTime dateTime2, TenantContext tenantContext) throws IOException, ExecutionException {
        getInMemoryChunksCallCount.incrementAndGet();

        final SourceAccumulatorsAndUpdateDate holder = accumulators.get(num);
        if (holder == null) {
            return ImmutableList.of();
        }

        final List<TimelineChunk> chunks = new ArrayList<TimelineChunk>();
        for (final TimelineSourceEventAccumulator accumulator : holder.getCategoryAccumulators().values()) {
            if (dateTime != null && accumulator.getEndTime().isBefore(dateTime)) {
                continue; // ends before the requested window
            }
            if (dateTime2 != null && accumulator.getStartTime().isAfter(dateTime2)) {
                continue; // starts after the requested window
            }
            chunks.addAll(accumulator.getInMemoryTimelineChunks(list));
        }

        inMemoryChunksReturnedCount.addAndGet(chunks.size());
        Collections.sort(chunks, CHUNK_COMPARATOR);
        return chunks;
    }

    /**
     * Converts raw samples keyed by metric name into scalar samples keyed by
     * metric id, creating the category and metrics through the DAO as needed.
     * Does nothing when {@code map} is null.
     *
     * @param str         event category name
     * @param map         metric name to raw value; may be null
     * @param map2        output map that receives metric id to {@link ScalarSample}
     * @param callContext call context passed to the DAO
     */
    @VisibleForTesting
    void convertSamplesToScalarSamples(String str, Map<String, Object> map, Map<Integer, ScalarSample> map2, CallContext callContext) {
        if (map != null) {
            final Integer categoryId = Integer.valueOf(timelineDAO.getOrAddEventCategory(str, callContext));
            for (final Map.Entry<String, Object> rawSample : map.entrySet()) {
                final Integer metricId = Integer.valueOf(timelineDAO.getOrAddMetric(categoryId, rawSample.getKey(), callContext));
                map2.put(metricId, ScalarSample.fromObject(rawSample.getValue()));
            }
        }
    }

    /**
     * Replays the samples stored in the files under {@code str} into the
     * in-memory accumulators.
     *
     * When the shutdown save mode is {@code SAVE_START_TIMES}, the start times
     * saved at the last shutdown are loaded first. A sample is then skipped if
     * its timestamp is null, is before the minimum saved start time, or is before
     * the saved start time of its own (source, category) accumulator. The saved
     * start times are deleted once the replay has finished.
     *
     * A failure on a single sample is logged with its cause and the replay
     * continues. A {@link RuntimeException} escaping the replay as a whole is
     * logged and swallowed. The {@code replaying} flag is always cleared on exit.
     *
     * @param str         directory handed to the {@link Replayer}
     * @param callContext call context passed to the DAO
     */
    public void replay(String str, final CallContext callContext) {
        replayCount.incrementAndGet();
        log.info("Starting replay of files in {}", str);
        final Replayer replayer = new Replayer(str);

        StartTimes lastStartTimes = null;
        if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
            lastStartTimes = timelineDAO.getLastStartTimes(callContext);
            if (lastStartTimes == null) {
                log.info("Did not find startTimes");
            } else {
                log.info("Retrieved startTimes from the db");
            }
        }
        final StartTimes startTimes = lastStartTimes;
        final DateTime minStartTime = startTimes == null ? null : startTimes.getMinStartTime();

        // Counter snapshots, so the summary line reports only this replay's numbers.
        final long foundBefore = replaySamplesFoundCount.get();
        final long outsideRangeBefore = replaySamplesOutsideTimeRangeCount.get();
        final long processedBefore = replaySamplesProcessedCount.get();

        replaying.set(true);
        try {
            final int filesSkipped = replayer.readAll(startTimes == null, minStartTime, new Function<SourceSamplesForTimestamp, Void>() {
                @Override
                public Void apply(@Nullable SourceSamplesForTimestamp samples) {
                    if (samples == null) {
                        return null;
                    }
                    replaySamplesFoundCount.incrementAndGet();
                    try {
                        final int sourceId = samples.getSourceId();
                        final int categoryId = timelineDAO.getEventCategoryId(samples.getCategory(), callContext).intValue();

                        boolean withinRange = true;
                        if (startTimes != null) {
                            final DateTime timestamp = samples.getTimestamp();
                            final DateTime accumulatorStart = startTimes.getStartTimeForSourceIdAndCategoryId(sourceId, categoryId);
                            if (timestamp == null
                                || timestamp.isBefore(startTimes.getMinStartTime())
                                || (accumulatorStart != null && timestamp.isBefore(accumulatorStart))) {
                                replaySamplesOutsideTimeRangeCount.incrementAndGet();
                                withinRange = false;
                            }
                        }

                        if (withinRange) {
                            replaySamplesProcessedCount.incrementAndGet();
                            processSamples(samples, callContext);
                        }
                    } catch (Exception e) {
                        // Keep replaying the remaining samples, but record the cause.
                        log.warn("Got exception replaying sample, data potentially lost! " + samples.toString(), e);
                    }
                    return null;
                }
            });

            if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
                timelineDAO.deleteLastStartTimes(callContext);
                log.info("Deleted old startTimes");
            }
            log.info(String.format("Replay completed; %d files skipped, samples read %d, samples outside time range %d, samples used %d",
                                   Integer.valueOf(filesSkipped),
                                   Long.valueOf(replaySamplesFoundCount.get() - foundBefore),
                                   Long.valueOf(replaySamplesOutsideTimeRangeCount.get() - outsideRangeBefore),
                                   Long.valueOf(replaySamplesProcessedCount.get() - processedBefore)));
        } catch (RuntimeException e) {
            log.error("Ignoring error when replaying the data", e);
        } finally {
            replaying.set(false);
        }
    }

    /**
     * Flushes every accumulator via {@code saveAccumulators()}, then discards the
     * backing buffer when local sample storage is enabled.
     */
    public void forceCommit() {
        forceCommitCallCount.incrementAndGet();

        saveAccumulators();
        discardBackingBuffer();

        log.info("Timelines committed");
    }

    /**
     * Stops accepting events, persists state according to the shutdown save mode,
     * waits for the background writer to finish, then discards the backing buffer.
     *
     * In {@code SAVE_START_TIMES} mode only the accumulators' start times are
     * written through the DAO; in any other mode every accumulator is flushed.
     *
     * @param callContext call context used when inserting the start times
     */
    public void commitAndShutdown(CallContext callContext) {
        shuttingDown.set(true);

        if (shutdownSaveMode != ShutdownSaveMode.SAVE_START_TIMES) {
            saveAccumulators();
            log.info("During shutdown, saved timeline accumulators");
        } else {
            final StartTimes collectedStartTimes = new StartTimes();
            saveStartTimes(collectedStartTimes);
            timelineDAO.insertLastStartTimes(collectedStartTimes, callContext);
            log.info("During shutdown, saved timeline start times in the db");
        }

        performShutdown();
        discardBackingBuffer();
    }

    /** Discards the backing buffer, but only when local sample storage is enabled in the config. */
    private void discardBackingBuffer() {
        if (!config.storeSamplesLocallyTemporary()) {
            return;
        }
        backingBuffer.discard();
    }

    /**
     * Initiates shutdown of the background writer, polls every 100 ms until it
     * reports that shutdown has finished, then shuts down the purge scheduler.
     *
     * The wait is not cancellable. If the thread is interrupted while waiting,
     * the interrupt is remembered and restored only after the wait has ended.
     * Restoring it inside the loop would make every later {@code Thread.sleep}
     * throw immediately, turning the wait into a busy spin.
     */
    private void performShutdown() {
        backgroundWriter.initiateShutdown();

        boolean interrupted = false;
        try {
            while (!backgroundWriter.getShutdownFinished()) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Sleep cleared the interrupt flag; keep waiting and restore it below.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        purgeThread.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Purges accumulators not updated within one configured timeline length and
     * spool files older than two timeline lengths, both measured from now.
     */
    public synchronized void purgeFilesAndAccumulators() {
        final DateTime accumulatorCutoff = new DateTime().minus(config.getTimelineLength().getMillis());
        final DateTime fileCutoff = new DateTime().minus(2 * config.getTimelineLength().getMillis());
        purgeFilesAndAccumulators(accumulatorCutoff, fileCutoff);
    }

    /**
     * Purges stale accumulators, then asks a {@link Replayer} on the configured
     * spool directory to purge old files.
     *
     * @param dateTime  cutoff passed to {@code purgeOldSourcesAndAccumulators}
     * @param dateTime2 cutoff passed to {@code Replayer#purgeOldFiles}
     */
    private synchronized void purgeFilesAndAccumulators(DateTime dateTime, DateTime dateTime2) {
        purgeOldSourcesAndAccumulators(dateTime);

        final Replayer spoolReplayer = new Replayer(config.getSpoolDir());
        spoolReplayer.purgeOldFiles(dateTime2);
    }

    /**
     * Schedules {@code purgeFilesAndAccumulators()} on the purge executor, with
     * both the initial delay and the delay between runs equal to the configured
     * timeline length.
     *
     * Failures inside a run are caught and logged. A scheduled executor
     * suppresses all later executions of a periodic task once one execution
     * throws, which would silently stop purging for the rest of the process.
     */
    public void startPurgeThread() {
        final Runnable purgeTask = new Runnable() {
            @Override
            public void run() {
                try {
                    purgeFilesAndAccumulators();
                } catch (RuntimeException e) {
                    log.warn("Exception while purging old files and accumulators; will retry at the next scheduled run", e);
                }
            }
        };
        final long initialDelayMillis = config.getTimelineLength().getMillis();
        final long delayMillis = config.getTimelineLength().getMillis();
        purgeThread.scheduleWithFixedDelay(purgeTask, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * @return a new list holding every accumulator of every source; changes to
     *         the list do not affect the handler
     */
    @VisibleForTesting
    public Collection<TimelineSourceEventAccumulator> getAccumulators() {
        final List<TimelineSourceEventAccumulator> allAccumulators = new ArrayList<TimelineSourceEventAccumulator>();
        for (final SourceAccumulatorsAndUpdateDate holder : accumulators.values()) {
            allAccumulators.addAll(holder.getCategoryAccumulators().values());
        }
        return allAccumulators;
    }

    /** @return the file-backed buffer given to the constructor */
    @VisibleForTesting
    public FileBackedBuffer getBackingBuffer() {
        return backingBuffer;
    }

    /** @return the number of recorded events dropped because they produced no scalar samples */
    public long getEventsDiscarded() {
        return eventsDiscarded.get();
    }

    /** @return the number of sources that currently have an entry in the accumulator map */
    public long getSourceEventAccumulatorCount() {
        return accumulators.size();
    }

    /** @return the number of {@code record} calls ignored because shutdown had already started */
    public long getEventsReceivedAfterShuttingDown() {
        return eventsReceivedAfterShuttingDown.get();
    }

    /** @return the number of {@code record} calls accepted for handling (made before shutdown started) */
    public long getHandledEventCount() {
        return handledEventCount.get();
    }

    /** @return the number of per-source accumulator holders created so far */
    public long getAddedSourceEventAccumulatorMapCount() {
        return addedSourceEventAccumulatorMapCount.get();
    }

    /** @return the number of (source, category) accumulators created so far */
    public long getAddedSourceEventAccumulatorCount() {
        return addedSourceEventAccumulatorCount.get();
    }

    /** @return the number of calls made to the list-based {@code getInMemoryTimelineChunks} overload */
    public long getGetInMemoryChunksCallCount() {
        return getInMemoryChunksCallCount.get();
    }

    /**
     * @return the accumulator deep-copy counter; no code visible in this class
     *         increments it
     */
    public long getAccumulatorDeepCopyCount() {
        return accumulatorDeepCopyCount.get();
    }

    /** @return the total number of chunks returned by {@code getInMemoryTimelineChunks} so far */
    public long getInMemoryChunksReturnedCount() {
        return inMemoryChunksReturnedCount.get();
    }

    /** @return the number of times {@code replay} has been called */
    public long getReplayCount() {
        return replayCount.get();
    }

    /** @return the number of non-null samples seen by replays */
    public long getReplaySamplesFoundCount() {
        return replaySamplesFoundCount.get();
    }

    /** @return the number of replayed samples skipped because they fell outside the saved start times */
    public long getReplaySamplesOutsideTimeRangeCount() {
        return replaySamplesOutsideTimeRangeCount.get();
    }

    /** @return the number of replayed samples handed to {@code processSamples} */
    public long getReplaySamplesProcessedCount() {
        return replaySamplesProcessedCount.get();
    }

    /** @return the number of times {@code forceCommit} has been called */
    public long getForceCommitCallCount() {
        return forceCommitCallCount.get();
    }

    /** @return the number of sources purged because their last update was before the purge cutoff */
    public long getPurgedAccumsBecauseSourceNotUpdated() {
        return purgedAccumsBecauseSourceNotUpdated.get();
    }

    /** @return the number of category accumulators purged because their latest sample was added before the purge cutoff */
    public long getPurgedAccumsBecauseCategoryNotUpdated() {
        return purgedAccumsBecauseCategoryNotUpdated.get();
    }
}
