package org.apache.gobblin.source.extractor.extract.kafka;

import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
import org.apache.gobblin.writer.LastWatermarkTracker;
import org.apache.gobblin.writer.WatermarkTracker;
import org.joda.time.LocalDate;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.class */
public class KafkaProduceRateTrackerTest {
    private static final LocalDate HOLIDAY_DATE = new LocalDate(2019, 12, 25);
    private static final LocalDate NON_HOLIDAY_DATE = new LocalDate(2020, 1, 5);
    private static final Long HOLIDAY_TIME = Long.valueOf(HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis());
    private static final Long NON_HOLIDAY_TIME = Long.valueOf(NON_HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis());
    private KafkaProduceRateTracker tracker;
    private List<KafkaPartition> kafkaPartitions = new ArrayList();
    private WatermarkTracker watermarkTracker;
    private WorkUnitState workUnitState;
    private KafkaExtractorStatsTracker extractorStatsTracker;

    @BeforeClass
    public void setUp() {
        this.kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
        this.kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
        this.workUnitState = new WorkUnitState();
        this.workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 5L);
        this.watermarkTracker = new LastWatermarkTracker(false);
        this.extractorStatsTracker = new KafkaExtractorStatsTracker(this.workUnitState, this.kafkaPartitions);
    }

    private void assertTopicPartitionOrder(KafkaProduceRateTracker kafkaProduceRateTracker, List<KafkaPartition> list) {
        Iterator it = kafkaProduceRateTracker.getPartitionsToProdRate().keySet().iterator();
        Iterator<KafkaPartition> it2 = list.iterator();
        while (it2.hasNext()) {
            Assert.assertEquals(it2.next(), it.next());
        }
    }

    private void writeProduceRateToKafkaWatermarksHelper(long j, long j2, long j3) {
        this.extractorStatsTracker.reset();
        assertTopicPartitionOrder(this.tracker, this.kafkaPartitions);
        this.extractorStatsTracker.onDecodeableRecord(0, j, j2, 100L, j3 - 8000, j3 - 10000);
        long j4 = j + 1;
        this.extractorStatsTracker.onDecodeableRecord(1, j4, j2 + 1, 200L, j3 - 7000, j3 - 9000);
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, j4, j3 - 8000);
        this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, j4, j3 - 7000);
        MultiLongWatermark multiLongWatermark = new MultiLongWatermark(Lists.newArrayList(new Long[]{20L, 30L}));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(this.kafkaPartitions.get(0), 35L);
        newHashMap.put(this.kafkaPartitions.get(1), 47L);
        HashMap newHashMap2 = Maps.newHashMap();
        KafkaPartition kafkaPartition = this.kafkaPartitions.get(0);
        KafkaPartition kafkaPartition2 = this.kafkaPartitions.get(1);
        newHashMap2.put(kafkaPartition.toString(), new KafkaStreamingExtractor.KafkaWatermark(kafkaPartition, new LongWatermark(5L)));
        newHashMap2.put(kafkaPartition2.toString(), new KafkaStreamingExtractor.KafkaWatermark(kafkaPartition2, new LongWatermark(7L)));
        this.tracker.writeProduceRateToKafkaWatermarks(newHashMap, newHashMap2, multiLongWatermark, j3);
    }

    private void assertKafkaWatermarks(long j) {
        Map allUnacknowledgedWatermarks = this.watermarkTracker.getAllUnacknowledgedWatermarks();
        Assert.assertEquals(allUnacknowledgedWatermarks.size(), 2);
        Iterator it = Lists.newArrayList(new KafkaPartition[]{this.kafkaPartitions.get(0), this.kafkaPartitions.get(1)}).iterator();
        while (it.hasNext()) {
            KafkaStreamingExtractor.KafkaWatermark kafkaWatermark = (KafkaStreamingExtractor.KafkaWatermark) allUnacknowledgedWatermarks.get(((KafkaPartition) it.next()).toString());
            if (j == HOLIDAY_TIME.longValue() + 10) {
                Assert.assertTrue(kafkaWatermark.avgProduceRates == null);
            } else {
                Assert.assertTrue(kafkaWatermark.avgProduceRates != null);
                Date date = new Date(j);
                int hourOfDay = KafkaProduceRateTracker.getHourOfDay(date);
                int dayOfWeek = KafkaProduceRateTracker.getDayOfWeek(date);
                Assert.assertTrue(kafkaWatermark.avgProduceRates[dayOfWeek][hourOfDay] > 0.0d);
                for (int i = 0; i < 7; i++) {
                    for (int i2 = 0; i2 < 24; i2++) {
                        if (i != dayOfWeek || i2 != hourOfDay) {
                            Assert.assertTrue(kafkaWatermark.avgProduceRates[i][i2] < 0.0d);
                        }
                    }
                }
                Assert.assertTrue(kafkaWatermark.getAvgConsumeRate() > 0.0d);
            }
        }
    }

    @Test
    public void testWriteProduceRateToKafkaWatermarksNoData() {
        long currentTimeMillis = System.currentTimeMillis();
        WorkUnitState workUnitState = new WorkUnitState();
        workUnitState.setProp("gobblin.kafka.produceRateTracker.disableStatsOnHolidays", false);
        workUnitState.setProp("stream.flush.interval.secs", 1L);
        workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 5L);
        LastWatermarkTracker lastWatermarkTracker = new LastWatermarkTracker(false);
        KafkaExtractorStatsTracker kafkaExtractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, this.kafkaPartitions);
        KafkaProduceRateTracker kafkaProduceRateTracker = new KafkaProduceRateTracker(workUnitState, this.kafkaPartitions, lastWatermarkTracker, kafkaExtractorStatsTracker, Long.valueOf(currentTimeMillis));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(this.kafkaPartitions.get(0), 20L);
        newHashMap.put(this.kafkaPartitions.get(1), 30L);
        HashMap newHashMap2 = Maps.newHashMap();
        MultiLongWatermark multiLongWatermark = new MultiLongWatermark(Lists.newArrayList(new Long[]{20L, 30L}));
        kafkaExtractorStatsTracker.reset();
        kafkaProduceRateTracker.writeProduceRateToKafkaWatermarks(newHashMap, newHashMap2, multiLongWatermark, currentTimeMillis);
        Map allUnacknowledgedWatermarks = lastWatermarkTracker.getAllUnacknowledgedWatermarks();
        Iterator<KafkaPartition> it = this.kafkaPartitions.iterator();
        while (it.hasNext()) {
            KafkaStreamingExtractor.KafkaWatermark kafkaWatermark = (KafkaStreamingExtractor.KafkaWatermark) allUnacknowledgedWatermarks.get(it.next().toString());
            Assert.assertTrue(kafkaWatermark.avgProduceRates != null);
            Assert.assertTrue(kafkaWatermark.avgConsumeRate < 0.0d);
            Assert.assertTrue(kafkaWatermark.getLwm().getValue() > 0);
        }
    }

    @Test(dependsOnMethods = {"testWriteProduceRateToKafkaWatermarksNoData"})
    public void testWriteProduceRateToKafkaWatermarks() {
        long nanoTime = System.nanoTime();
        long j = nanoTime + 1;
        long currentTimeMillis = System.currentTimeMillis();
        WorkUnitState workUnitState = new WorkUnitState();
        workUnitState.setProp("gobblin.kafka.produceRateTracker.disableStatsOnHolidays", false);
        workUnitState.setProp("stream.flush.interval.secs", 1L);
        this.tracker = new KafkaProduceRateTracker(workUnitState, this.kafkaPartitions, this.watermarkTracker, this.extractorStatsTracker, Long.valueOf(currentTimeMillis));
        writeProduceRateToKafkaWatermarksHelper(nanoTime, j, currentTimeMillis);
        for (int i = 1; i < 4; i++) {
            writeProduceRateToKafkaWatermarksHelper(nanoTime + 1000 + i, j + 1000 + i, currentTimeMillis + i);
        }
        assertKafkaWatermarks(currentTimeMillis + 3);
    }

    @Test(dependsOnMethods = {"testWriteProduceRateToKafkaWatermarks"})
    public void testWriteProduceRateToKafkaWatermarksWithHolidays() {
        long nanos = TimeUnit.MILLISECONDS.toNanos(HOLIDAY_TIME.longValue());
        long j = nanos + 1;
        Long valueOf = Long.valueOf(HOLIDAY_TIME.longValue() + 10);
        WorkUnitState workUnitState = new WorkUnitState();
        workUnitState.setProp("gobblin.kafka.produceRateTracker.disableStatsOnHolidays", true);
        workUnitState.setProp("stream.flush.interval.secs", 1L);
        this.tracker = new KafkaProduceRateTracker(workUnitState, this.kafkaPartitions, this.watermarkTracker, this.extractorStatsTracker, valueOf);
        writeProduceRateToKafkaWatermarksHelper(nanos, j, valueOf.longValue());
        writeProduceRateToKafkaWatermarksHelper(nanos + 1000, j + 1000, valueOf.longValue() + 1);
        assertKafkaWatermarks(valueOf.longValue());
        long nanos2 = TimeUnit.MILLISECONDS.toNanos(NON_HOLIDAY_TIME.longValue());
        long j2 = nanos2 + 1;
        Long valueOf2 = Long.valueOf(NON_HOLIDAY_TIME.longValue() + 10);
        writeProduceRateToKafkaWatermarksHelper(nanos2, j2, valueOf2.longValue());
        for (int i = 1; i < 4; i++) {
            writeProduceRateToKafkaWatermarksHelper(nanos2 + 1000 + i, j2 + 1000 + i, valueOf2.longValue() + i);
        }
        assertKafkaWatermarks(valueOf2.longValue() + 3);
    }

    @Test
    public void testIsHoliday() {
        WorkUnitState workUnitState = new WorkUnitState();
        workUnitState.setProp("gobblin.kafka.produceRateTracker.disableStatsOnHolidays", true);
        KafkaProduceRateTracker kafkaProduceRateTracker = new KafkaProduceRateTracker(workUnitState, this.kafkaPartitions, this.watermarkTracker, new KafkaExtractorStatsTracker(this.workUnitState, this.kafkaPartitions));
        Assert.assertTrue(kafkaProduceRateTracker.isHoliday(HOLIDAY_DATE));
        Assert.assertTrue(kafkaProduceRateTracker.isHoliday(HOLIDAY_DATE));
        Assert.assertFalse(kafkaProduceRateTracker.isHoliday(NON_HOLIDAY_DATE));
    }

    @Test
    public void testGetPenultimateElement() {
        EvictingQueue create = EvictingQueue.create(3);
        create.add(Double.valueOf(1.0d));
        create.add(Double.valueOf(2.0d));
        create.add(Double.valueOf(3.0d));
        Assert.assertEquals(KafkaProduceRateTracker.getPenultimateElement(create), Double.valueOf(2.0d));
        create.add(Double.valueOf(4.0d));
        Assert.assertEquals(KafkaProduceRateTracker.getPenultimateElement(create), Double.valueOf(3.0d));
        create.add(Double.valueOf(5.0d));
        Assert.assertEquals(KafkaProduceRateTracker.getPenultimateElement(create), Double.valueOf(4.0d));
    }
}
