package org.apache.kylin.metrics.lib.impl.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kylin.metrics.lib.ActiveReservoir;
import org.apache.kylin.metrics.lib.impl.InstantReservoir;
import org.apache.kylin.metrics.lib.impl.RecordEvent;
import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.rule.PowerMockRule;

@PrepareForTest({KafkaReservoirReporter.KafkaReservoirListener.class})
/* loaded from: input_file:org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.class */
public class KafkaReservoirReporterTest {

    @Rule
    public PowerMockRule rule = new PowerMockRule();
    private KafkaReservoirReporter kafkaReporter;
    private ActiveReservoir reservoir;

    @Before
    public void setUp() throws Exception {
        System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta");
        PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn((KafkaProducer) PowerMockito.mock(KafkaProducer.class));
        this.reservoir = new InstantReservoir();
        this.reservoir.start();
        this.kafkaReporter = KafkaReservoirReporter.forRegistry(this.reservoir).build();
    }

    @After
    public void after() throws Exception {
        System.clearProperty("KYLIN_CONF");
    }

    @Test
    public void testUpdate() {
        RecordEvent recordEvent = new RecordEvent("TEST");
        this.reservoir.update(recordEvent);
        Assert.assertEquals(0L, this.kafkaReporter.getListener().getNRecord());
        this.kafkaReporter.start();
        this.reservoir.update(recordEvent);
        this.reservoir.update(recordEvent);
        Assert.assertEquals(2L, this.kafkaReporter.getListener().getNRecord());
        this.kafkaReporter.stop();
        this.reservoir.update(recordEvent);
        Assert.assertEquals(2L, this.kafkaReporter.getListener().getNRecord());
        Assert.assertEquals(0L, this.kafkaReporter.getListener().getNRecordSkip());
    }
}
