package org.apache.gobblin.source.extractor.extract.kafka;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogReader;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.EnvelopeSchemaConverterTest;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaExtractorStatsTracker;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.class */
public class KafkaExtractorStatsTrackerTest {
    List<KafkaPartition> kafkaPartitions = new ArrayList();
    private KafkaExtractorStatsTracker extractorStatsTracker;
    private WorkUnitState workUnitState;
    static final KafkaPartition PARTITION0 = new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
    static final KafkaPartition PARTITION1 = new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
    private long epochDurationMs;

    @BeforeClass
    public void setUp() {
        this.kafkaPartitions.add(PARTITION0);
        this.kafkaPartitions.add(PARTITION1);
        this.workUnitState = new WorkUnitState();
        this.workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
        this.workUnitState.setProp("gobblin.kafka.observedLatencyMeasurementEnabled", true);
        this.extractorStatsTracker = new KafkaExtractorStatsTracker(this.workUnitState, this.kafkaPartitions);
    }

    @Test
    public void testOnUndecodeableRecord() {
        Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 0);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), -1L);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), -1L);
        this.extractorStatsTracker.onUndecodeableRecord(0);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 1L);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1L);
        Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
        this.extractorStatsTracker.onUndecodeableRecord(0);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 2L);
        Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1L);
        Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
    }

    @Test
    public void testOnNullRecord() {
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), -1L);
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), -1L);
        this.extractorStatsTracker.onNullRecord(0);
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 1L);
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1L);
        this.extractorStatsTracker.onNullRecord(0);
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 2L);
        Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1L);
    }

    @Test
    public void testResetStartFetchEpochTime() {
        long currentTimeMillis = System.currentTimeMillis();
        this.extractorStatsTracker.resetStartFetchEpochTime(1);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getStartFetchEpochTime() >= currentTimeMillis);
    }

    @Test
    public void testOnDecodeableRecord() throws InterruptedException {
        this.extractorStatsTracker.reset();
        long nanoTime = System.nanoTime();
        Thread.sleep(1L);
        long nanoTime2 = System.nanoTime();
        long currentTimeMillis = System.currentTimeMillis();
        long millis = currentTimeMillis - TimeUnit.MINUTES.toMillis(15L);
        long millis2 = currentTimeMillis - TimeUnit.MINUTES.toMillis(16L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getProcessedRecordCount(), 0L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getPartitionTotalSize(), 0L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getDecodeRecordTime() == 0);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getReadRecordTime() == 0);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getSlaMissedRecordCount(), -1L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMinLogAppendTime(), -1L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMaxLogAppendTime(), -1L);
        Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 0L);
        this.extractorStatsTracker.onDecodeableRecord(0, nanoTime, nanoTime2, 100L, millis, millis2);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getProcessedRecordCount(), 1L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getPartitionTotalSize(), 100L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getDecodeRecordTime() > 0);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getReadRecordTime() > 0);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getSlaMissedRecordCount(), 1L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMinLogAppendTime(), millis);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMaxLogAppendTime(), millis);
        Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 1L);
        long nanoTime3 = System.nanoTime();
        Thread.sleep(1L);
        long nanoTime4 = System.nanoTime();
        long currentTimeMillis2 = System.currentTimeMillis();
        long j = currentTimeMillis2 - 10;
        long j2 = currentTimeMillis2 - 20;
        long decodeRecordTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getDecodeRecordTime();
        long readRecordTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getReadRecordTime();
        this.extractorStatsTracker.onDecodeableRecord(0, nanoTime3, nanoTime4, 100L, j, j2);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getProcessedRecordCount(), 2L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getPartitionTotalSize(), 200L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getDecodeRecordTime() > decodeRecordTime);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getReadRecordTime() > readRecordTime);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getSlaMissedRecordCount(), 1L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMinLogAppendTime(), millis);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMaxLogAppendTime(), j);
        Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 2L);
    }

    @Test
    public void testOnFetchNextMessageBuffer() throws InterruptedException {
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getFetchMessageBufferTime(), 0L);
        long nanoTime = System.nanoTime();
        Thread.sleep(1L);
        this.extractorStatsTracker.onFetchNextMessageBuffer(1, nanoTime);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getFetchMessageBufferTime() > 0);
    }

    @Test
    public void testOnPartitionReadComplete() throws InterruptedException {
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getReadRecordTime(), 0L);
        long nanoTime = System.nanoTime();
        Thread.sleep(1L);
        this.extractorStatsTracker.onPartitionReadComplete(1, nanoTime);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getReadRecordTime() > 0);
    }

    @Test(dependsOnMethods = {"testOnDecodeableRecord"})
    public void testUpdateStatisticsForCurrentPartition() throws InterruptedException {
        long nanoTime = System.nanoTime();
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getStopFetchEpochTime(), 0L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getElapsedTime(), 0L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getAvgMillisPerRecord() < 0.0d);
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, nanoTime, 0L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getStopFetchEpochTime() > 0);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getElapsedTime() > 0);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getAvgMillisPerRecord() > 0.0d);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getAvgRecordSize(), 100L);
        long nanoTime2 = System.nanoTime();
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - 10;
        Thread.sleep(1L);
        this.extractorStatsTracker.onDecodeableRecord(1, nanoTime2, System.nanoTime(), 100L, j, currentTimeMillis - 20);
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, nanoTime2, 0L);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getElapsedTime() > 0);
        Assert.assertTrue(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getAvgMillisPerRecord() > 0.0d);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getAvgRecordSize(), 100L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getSlaMissedRecordCount(), 0L);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getMinLogAppendTime(), j);
        Assert.assertEquals(((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getMaxLogAppendTime(), j);
        Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3L);
        long startFetchEpochTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getStartFetchEpochTime();
        long stopFetchEpochTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getStopFetchEpochTime();
        this.epochDurationMs = stopFetchEpochTime - startFetchEpochTime;
        long minLogAppendTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0))).getMinLogAppendTime();
        long maxLogAppendTime = ((KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(1))).getMaxLogAppendTime();
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinStartFetchEpochTime(), startFetchEpochTime);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxStopFetchEpochTime(), stopFetchEpochTime);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinLogAppendTime(), minLogAppendTime);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxLogAppendTime(), maxLogAppendTime);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getNumBytesConsumed(), 300L);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getProcessedRecordCount(), 3L);
        Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getSlaMissedRecordCount(), 1L);
    }

    @Test(dependsOnMethods = {"testUpdateStatisticsForCurrentPartition"})
    public void testGetAvgRecordSize() {
        Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 100L);
        Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 100L);
        this.extractorStatsTracker.reset();
        Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0L);
        long nanoTime = System.nanoTime();
        long j = nanoTime + 1;
        long currentTimeMillis = System.currentTimeMillis();
        this.extractorStatsTracker.onDecodeableRecord(1, nanoTime, j, 150L, currentTimeMillis - 10, currentTimeMillis - 20);
        Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150L);
    }

    @Test(dependsOnMethods = {"testGetAvgRecordSize"})
    public void testGetMaxLatency() {
        Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES) >= 15);
    }

    @Test(dependsOnMethods = {"testGetMaxLatency"})
    public void testGetConsumptionRateMBps() {
        Assert.assertEquals(new Double(Math.ceil(((this.extractorStatsTracker.getConsumptionRateMBps() * this.epochDurationMs) * 1024.0d) * 1024.0d) / 1000.0d).longValue(), 300L);
    }

    @Test(dependsOnMethods = {"testGetConsumptionRateMBps"})
    public void testGetMaxLatencyNoRecordsInEpoch() {
        this.extractorStatsTracker.reset();
        Long valueOf = Long.valueOf(System.nanoTime());
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, valueOf.longValue(), 0L);
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, valueOf.longValue(), 0L);
        this.extractorStatsTracker.reset();
        Assert.assertEquals(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES), 0L);
    }

    @Test
    public void testGenerateTagsForPartitions() throws Exception {
        MultiLongWatermark multiLongWatermark = new MultiLongWatermark(Arrays.asList(new Long(10L), new Long(20L)));
        MultiLongWatermark multiLongWatermark2 = new MultiLongWatermark(Arrays.asList(new Long(20L), new Long(30L)));
        MultiLongWatermark multiLongWatermark3 = new MultiLongWatermark(Arrays.asList(new Long(15L), new Long(25L)));
        ImmutableMap of = ImmutableMap.of(PARTITION0, ImmutableMap.of(EnvelopeSchemaConverterTest.SCHEMA_KEY, "testValue"));
        this.workUnitState.removeProp(KafkaUtils.getPartitionPropName("startFetchEpochTime", 0));
        this.workUnitState.removeProp(KafkaUtils.getPartitionPropName("stopFetchEpochTime", 0));
        KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, PARTITION0, 0.0d);
        KafkaExtractorStatsTracker.ExtractorStats extractorStats = (KafkaExtractorStatsTracker.ExtractorStats) this.extractorStatsTracker.getStatsMap().get(this.kafkaPartitions.get(0));
        extractorStats.setStartFetchEpochTime(1000L);
        extractorStats.setStopFetchEpochTime(10000L);
        extractorStats.setAvgMillisPerRecord(10.1d);
        Map generateTagsForPartitions = this.extractorStatsTracker.generateTagsForPartitions(multiLongWatermark, multiLongWatermark2, multiLongWatermark3, of);
        Assert.assertEquals(this.workUnitState.getPropAsLong(KafkaUtils.getPartitionPropName("startFetchEpochTime", 0)), extractorStats.getStartFetchEpochTime());
        Assert.assertEquals(this.workUnitState.getPropAsLong(KafkaUtils.getPartitionPropName("stopFetchEpochTime", 0)), extractorStats.getStopFetchEpochTime());
        Assert.assertEquals(Double.valueOf(KafkaUtils.getPartitionAvgRecordMillis(this.workUnitState, PARTITION0)), Double.valueOf(extractorStats.getAvgMillisPerRecord()));
        extractorStats.setStartFetchEpochTime(0L);
        extractorStats.setStopFetchEpochTime(0L);
        extractorStats.setAvgMillisPerRecord(-1.0d);
        Assert.assertTrue(((Map) generateTagsForPartitions.get(PARTITION0)).containsKey(EnvelopeSchemaConverterTest.SCHEMA_KEY));
        Assert.assertEquals((String) ((Map) generateTagsForPartitions.get(PARTITION0)).get(EnvelopeSchemaConverterTest.SCHEMA_KEY), "testValue");
        Assert.assertFalse(((Map) generateTagsForPartitions.get(PARTITION1)).containsKey(EnvelopeSchemaConverterTest.SCHEMA_KEY));
    }

    @Test
    public void testConvertHistogramToString() {
        Histogram histogram = new Histogram(1L, 100L, 3);
        histogram.recordValue(3L);
        histogram.recordValue(25L);
        histogram.recordValue(25L);
        histogram.recordValue(92L);
        Histogram nextIntervalHistogram = new HistogramLogReader(new ByteArrayInputStream(KafkaExtractorStatsTracker.convertHistogramToString(histogram).getBytes(Charsets.UTF_8))).nextIntervalHistogram();
        Assert.assertEquals(nextIntervalHistogram.getTotalCount(), 4L);
        Assert.assertEquals(nextIntervalHistogram.getMaxValue(), 92L);
        Assert.assertEquals(nextIntervalHistogram.getCountAtValue(25L), 2L);
        Assert.assertEquals(nextIntervalHistogram.getCountAtValue(3L), 1L);
        Assert.assertEquals(nextIntervalHistogram.getCountAtValue(92L), 1L);
    }
}
