package com.ning.billing.meter.timeline.aggregator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.ning.billing.meter.MeterTestSuiteWithEmbeddedDB;
import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.config.MeterConfig;
import com.ning.billing.util.dao.NonEntityDao;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mockito.Mockito;
import org.skife.config.ConfigurationObjectFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.class */
/**
 * Database-backed test for {@link TimelineAggregator}.
 *
 * <p>The test registers one source and two metrics, stores two timeline chunks per metric, and
 * compares the number of chunks returned for several time windows before and after running
 * {@link TimelineAggregator#getAndProcessTimelineAggregationCandidates()}.
 */
public class TestTimelineAggregator extends MeterTestSuiteWithEmbeddedDB {
    private static final String EVENT_TYPE = "myType";
    private static final int EVENT_TYPE_ID = 123;
    private static final String MIN_HEAPUSED_KIND = "min_heapUsed";
    private static final String MAX_HEAPUSED_KIND = "max_heapUsed";

    /** Number of samples written by {@link #createAOneHourTimelineChunk(int)}. */
    private static final int SAMPLES_PER_CHUNK = 120;
    /** Spacing between two consecutive samples; 120 samples * 30 seconds is one hour. */
    private static final int SECONDS_BETWEEN_SAMPLES = 30;
    /** Configuration key overridden for this test only. */
    private static final String CHUNKS_TO_AGGREGATE_PROPERTY = "killbill.usage.timelines.chunksToAggregate";

    private static final UUID HOST_UUID = UUID.randomUUID();
    private static final String HOST_NAME = HOST_UUID.toString();
    /** Reference instant; every window and chunk start is expressed in minutes before it. */
    private static final DateTime START_TIME = new DateTime(DateTimeZone.UTC);
    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
    private static final SampleCoder sampleCoder = new DefaultSampleCoder();

    private final NonEntityDao nonEntityDao = Mockito.mock(NonEntityDao.class);
    private final InternalCallContextFactory internalCallContextFactory =
            new InternalCallContextFactory(new ClockMock(), nonEntityDao, new CacheControllerDispatcher());

    private TimelineDao timelineDao;
    private TimelineAggregator aggregator;

    // Ids handed out by the DAO in testAggregation(); read by the helpers below.
    private Integer hostId = null;
    private Integer minHeapUsedKindId = null;
    private Integer maxHeapUsedKindId = null;

    /**
     * Creates a fresh DAO and aggregator before each test method.
     *
     * @throws Exception if any of the collaborators cannot be created
     */
    @BeforeMethod(groups = {"mysql"})
    public void setUp() throws Exception {
        timelineDao = new DefaultTimelineDao(getDBI());

        // Work on a private copy: writing into System.getProperties() directly would leak the
        // override into every other test running in the same JVM.
        final Properties properties = new Properties();
        properties.putAll(System.getProperties());
        // NOTE(review): "2,2" presumably means "merge two chunks per aggregation level" - confirm
        // against MeterConfig.
        properties.setProperty(CHUNKS_TO_AGGREGATE_PROPERTY, "2,2");
        final MeterConfig config = (MeterConfig) new ConfigurationObjectFactory(properties).build(MeterConfig.class);

        aggregator = new TimelineAggregator(getDBI(), timelineDao, timelineCoder, sampleCoder, config, internalCallContextFactory);
    }

    /**
     * Stores two chunks per metric, then checks the chunk counts seen through several time windows
     * before and after aggregation.
     *
     * @throws Exception if the DAO, the accumulator or the aggregator fails
     */
    @Test(groups = {"mysql"})
    public void testAggregation() throws Exception {
        // The ids come back as primitives and are boxed here, so null checks on them would be
        // tautological; the size assertions below are the meaningful verification.
        hostId = timelineDao.getOrAddSource(HOST_NAME, internalCallContext);
        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);

        minHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MIN_HEAPUSED_KIND, internalCallContext);
        maxHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MAX_HEAPUSED_KIND, internalCallContext);
        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);

        // Two chunks starting 125 and 60 minutes before START_TIME, one per metric each.
        createAOneHourTimelineChunk(125);
        createAOneHourTimelineChunk(60);

        // Before aggregation: the two hourly chunks are stored separately.
        checkSamplesForATimeline(185, 126, 0L);
        checkSamplesForATimeline(185, 125, 2L);
        checkSamplesForATimeline(64, 61, 0L);
        checkSamplesForATimeline(125, 65, 2L);
        checkSamplesForATimeline(60, 0, 2L);
        checkSamplesForATimeline(125, 0, 4L);
        checkSamplesForATimeline(124, 0, 4L);
        checkSamplesForATimeline(124, 66, 2L);

        aggregator.getAndProcessTimelineAggregationCandidates();

        // Aggregation must not add or remove sources or metrics.
        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);

        // After aggregation: windows that used to see four chunks now see two, and the window
        // between the two original chunks (64 to 61 minutes ago) is no longer empty - presumably
        // because each metric now has a single merged chunk spanning that gap.
        checkSamplesForATimeline(185, 126, 0L);
        checkSamplesForATimeline(185, 125, 2L);
        checkSamplesForATimeline(64, 61, 2L);
        checkSamplesForATimeline(125, 65, 2L);
        checkSamplesForATimeline(60, 0, 2L);
        checkSamplesForATimeline(125, 0, 2L);
        checkSamplesForATimeline(124, 0, 2L);
        checkSamplesForATimeline(124, 66, 2L);
    }

    /**
     * Asserts how many chunks the DAO returns for the test host and both heap metrics in a window.
     *
     * <p>Every returned chunk is also checked to belong to the test host and to one of the two
     * metrics.
     *
     * @param startMinutesAgo window start, in minutes before {@link #START_TIME}
     * @param endMinutesAgo window end, in minutes before {@link #START_TIME}
     * @param expectedChunkCount number of chunks the consumer is expected to receive
     * @throws InterruptedException declared for compatibility with the original helper signature
     */
    private void checkSamplesForATimeline(final int startMinutesAgo, final int endMinutesAgo, final long expectedChunkCount) throws InterruptedException {
        final AtomicLong chunksSeen = new AtomicLong(0L);
        final DateTime windowStart = START_TIME.minusMinutes(startMinutesAgo);
        final DateTime windowEnd = START_TIME.minusMinutes(endMinutesAgo);

        final TimelineChunkConsumer countingConsumer = new TimelineChunkConsumer() {
            @Override
            public void processTimelineChunk(final TimelineChunk timelineChunk) {
                Assert.assertEquals(timelineChunk.getSourceId(), hostId.intValue());
                final int metricId = timelineChunk.getMetricId();
                Assert.assertTrue(metricId == minHeapUsedKindId.intValue() || metricId == maxHeapUsedKindId.intValue());
                chunksSeen.incrementAndGet();
            }
        };

        timelineDao.getSamplesBySourceIdsAndMetricIds(ImmutableList.of(hostId),
                                                      ImmutableList.of(minHeapUsedKindId, maxHeapUsedKindId),
                                                      windowStart,
                                                      windowEnd,
                                                      countingConsumer,
                                                      internalCallContext);

        Assert.assertEquals(chunksSeen.get(), expectedChunkCount);
    }

    /**
     * Feeds one hour of samples (120 samples, 30 seconds apart) for the test host into a new
     * accumulator and asks it to extract and queue the resulting chunks.
     *
     * @param i how many minutes before {@link #START_TIME} the first sample is stamped
     * @throws IOException if the accumulator fails to add samples or to extract the chunks
     */
    private void createAOneHourTimelineChunk(final int i) throws IOException {
        final DateTime chunkStart = START_TIME.minusMinutes(i);
        final TimelineSourceEventAccumulator accumulator =
                new TimelineSourceEventAccumulator(timelineDao, timelineCoder, sampleCoder, hostId, EVENT_TYPE_ID, chunkStart, internalCallContextFactory);

        for (int sampleIndex = 0; sampleIndex < SAMPLES_PER_CHUNK; sampleIndex++) {
            final DateTime sampleTime = chunkStart.plusSeconds(sampleIndex * SECONDS_BETWEEN_SAMPLES);
            final Map<Integer, ScalarSample> samples = createEvent(sampleTime.getMillis());
            accumulator.addSourceSamples(new SourceSamplesForTimestamp(hostId, EVENT_TYPE, sampleTime, samples));
        }

        accumulator.extractAndQueueTimelineChunks();
    }

    /**
     * Builds one sample per metric, with values derived from the given timestamp.
     *
     * @param j sample timestamp in milliseconds
     * @return map from metric id to a LONG sample: {@code Long.MIN_VALUE + j} for the min metric and
     *         {@code Long.MAX_VALUE - j} for the max metric
     */
    private Map<Integer, ScalarSample> createEvent(final long j) {
        final ScalarSample minSample = new ScalarSample(SampleOpcode.LONG, Long.valueOf(Long.MIN_VALUE + j));
        final ScalarSample maxSample = new ScalarSample(SampleOpcode.LONG, Long.valueOf(Long.MAX_VALUE - j));
        return ImmutableMap.of(minHeapUsedKindId, minSample, maxHeapUsedKindId, maxSample);
    }
}
