package org.apache.gobblin.source.extractor.extract.kafka;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link KafkaIngestionHealthCheck}.
 *
 * <p>The test instance registers itself on the event bus named {@code "ContainerHealthCheckEventBus"} and counts
 * down {@link #countDownLatch} for every {@link ContainerHealthCheckFailureEvent} it receives. Each test then
 * inspects the latch count after calling {@link KafkaIngestionHealthCheck#execute()} to decide whether an event
 * was delivered.
 */
@Test(singleThreaded = true)
public class KafkaIngestionHealthCheckTest {
    /** Name of the event bus this test subscribes to. */
    private static final String HEALTH_CHECK_EVENT_BUS_NAME = "ContainerHealthCheckEventBus";

    /** Config keys handed to the health check under test. */
    private static final String CONTAINER_CAPACITY_KEY = "gobblin.kafka.streaming.containerCapacity";
    private static final String INGESTION_LATENCY_MINUTES_KEY = "gobblin.kafka.healthCheck.ingestionLatency.minutes";
    private static final String INCREASING_LATENCY_CHECK_ENABLED_KEY =
        "gobblin.kafka.healthCheck.increasingLatencyCheckEnabled";

    /** Upper bound, in milliseconds, on how long a test waits for the latch after each execute() call. */
    private static final long EVENT_WAIT_MILLIS = 10L;

    private EventBus eventBus;

    /** Replaced with a fresh one-count latch by each test before (re-)checking for an event. */
    private CountDownLatch countDownLatch;

    /**
     * Obtains the health-check event bus from the implicit broker and registers this instance as a subscriber.
     *
     * @throws IOException declared by this method; presumably propagated from the event bus lookup
     */
    @BeforeClass
    public void setUp() throws IOException {
        this.eventBus = EventBusFactory.get(HEALTH_CHECK_EVENT_BUS_NAME, SharedResourcesBrokerFactory.getImplicitBroker());
        this.eventBus.register(this);
    }

    /**
     * Event bus subscriber: counts the current latch down once per received failure event.
     *
     * <p>NOTE(review): the method is public and the class carries a class-level {@code @Test}; the
     * {@code enabled = false} marker is presumably there so TestNG does not try to run this subscriber as a test.
     *
     * @param containerHealthCheckFailureEvent the received event (its contents are not inspected)
     */
    @Subscribe
    @Test(enabled = false)
    public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent containerHealthCheckFailureEvent) {
        this.countDownLatch.countDown();
    }

    /**
     * Runs the health check without setting the increasing-latency flag, against a tracker stubbed to report
     * latencies 6, 7, 7, 5 (minutes) on successive calls.
     *
     * <p>Expected latch counts after each execute(): 1, 1, 0, and - after the latch is replaced - 1 again.
     * Finally a second health check, configured with the flag set to {@code false}, is executed once; no
     * assertion follows that last call.
     */
    @Test
    public void testExecuteIncreasingLatencyCheckEnabled() throws InterruptedException {
        this.countDownLatch = new CountDownLatch(1);
        Config config = baseConfig();

        KafkaExtractorStatsTracker statsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
        Mockito.when(statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
            .thenReturn(6L)
            .thenReturn(7L)
            .thenReturn(7L)
            .thenReturn(5L);
        Mockito.when(statsTracker.getConsumptionRateMBps())
            .thenReturn(2.0d)
            .thenReturn(1.5d)
            .thenReturn(2.1d)
            .thenReturn(2.5d);

        KafkaIngestionHealthCheck healthCheck = new KafkaIngestionHealthCheck(config, statsTracker);

        // First two runs: the latch must still be untouched.
        executeAndAssertLatchCount(healthCheck, 1L);
        executeAndAssertLatchCount(healthCheck, 1L);
        // Third run: the subscriber must have counted the latch down.
        executeAndAssertLatchCount(healthCheck, 0L);

        // Fresh latch; the fourth run must leave it untouched.
        this.countDownLatch = new CountDownLatch(1);
        executeAndAssertLatchCount(healthCheck, 1L);

        // Same settings plus the increasing-latency flag set to false, with a fresh tracker.
        Config flagDisabledConfig =
            config.withValue(INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false));
        KafkaExtractorStatsTracker secondStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
        Mockito.when(secondStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
            .thenReturn(10L)
            .thenReturn(7L)
            .thenReturn(5L);
        Mockito.when(secondStatsTracker.getConsumptionRateMBps())
            .thenReturn(2.0d)
            .thenReturn(1.5d)
            .thenReturn(2.1d);
        new KafkaIngestionHealthCheck(flagDisabledConfig, secondStatsTracker).execute();
    }

    /**
     * Runs the health check with the increasing-latency flag set to {@code false}, against a tracker stubbed to
     * report latencies 10, 7, 6, 4 (minutes) on successive calls.
     *
     * <p>Expected latch counts after each execute(): 1, 1, 0, and - after the latch is replaced - 1 again.
     */
    @Test
    public void testExecuteIncreasingLatencyCheckDisabled() throws InterruptedException {
        this.countDownLatch = new CountDownLatch(1);
        Config config =
            baseConfig().withValue(INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false));

        KafkaExtractorStatsTracker statsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
        Mockito.when(statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
            .thenReturn(10L)
            .thenReturn(7L)
            .thenReturn(6L)
            .thenReturn(4L);
        Mockito.when(statsTracker.getConsumptionRateMBps())
            .thenReturn(2.0d)
            .thenReturn(1.5d)
            .thenReturn(2.1d)
            .thenReturn(2.5d);

        KafkaIngestionHealthCheck healthCheck = new KafkaIngestionHealthCheck(config, statsTracker);

        // First two runs: the latch must still be untouched.
        executeAndAssertLatchCount(healthCheck, 1L);
        executeAndAssertLatchCount(healthCheck, 1L);
        // Third run: the subscriber must have counted the latch down.
        executeAndAssertLatchCount(healthCheck, 0L);

        // Fresh latch; the fourth run must leave it untouched.
        this.countDownLatch = new CountDownLatch(1);
        executeAndAssertLatchCount(healthCheck, 1L);
    }

    /** Builds the settings shared by both tests: container capacity 5 and ingestion-latency minutes 5. */
    private static Config baseConfig() {
        return ConfigFactory.empty()
            .withValue(CONTAINER_CAPACITY_KEY, ConfigValueFactory.fromAnyRef(5))
            .withValue(INGESTION_LATENCY_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5));
    }

    /**
     * Executes the health check once, waits up to {@link #EVENT_WAIT_MILLIS} ms on the current latch, and asserts
     * the latch's remaining count.
     *
     * @param healthCheck the health check to run
     * @param expectedCount the latch count expected after the wait
     * @throws InterruptedException if the thread is interrupted while waiting on the latch
     */
    private void executeAndAssertLatchCount(KafkaIngestionHealthCheck healthCheck, long expectedCount)
        throws InterruptedException {
        healthCheck.execute();
        // The boolean result of await() is deliberately ignored; the count is asserted directly instead.
        this.countDownLatch.await(EVENT_WAIT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(this.countDownLatch.getCount(), expectedCount);
    }
}
