package org.apache.gobblin.source.extractor.extract.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamTestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.class */
public class KafkaStreamingExtractorTest {
    private KafkaStreamingExtractor streamingExtractor;
    private final int numPartitions = 3;

    /* loaded from: input_file:org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest$TestDataPublisher.class */
    static class TestDataPublisher extends DataPublisher {
        public TestDataPublisher(WorkUnitState workUnitState) {
            super(workUnitState);
        }

        public void initialize() {
        }

        public void publishData(Collection<? extends WorkUnitState> collection) {
        }

        public void publishMetadata(Collection<? extends WorkUnitState> collection) {
        }

        public void close() {
        }
    }

    @BeforeClass
    public void setUp() {
        WorkUnitState workUnitState = KafkaExtractorUtils.getWorkUnitState("testTopic", 3);
        workUnitState.setProp("flush.data.publisher.class", TestDataPublisher.class.getName());
        this.streamingExtractor = new KafkaStreamingExtractor(workUnitState);
    }

    @Test
    public void testResetExtractorStats() throws IOException, DataRecordException {
        MultiLongWatermark multiLongWatermark = new MultiLongWatermark(this.streamingExtractor.highWatermark);
        this.streamingExtractor.readStreamEntityImpl();
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
        this.streamingExtractor.readStreamEntityImpl();
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
        this.streamingExtractor.readStreamEntityImpl();
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
        this.streamingExtractor.onFlushAck();
        this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
        testAfterReset(multiLongWatermark);
        MultiLongWatermark multiLongWatermark2 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
        this.streamingExtractor.readStreamEntityImpl();
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 2L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
        Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
        Assert.assertEquals(this.streamingExtractor.lowWatermark.get(0), 1L);
        Assert.assertEquals(this.streamingExtractor.lowWatermark.get(1), 1L);
        Assert.assertEquals(this.streamingExtractor.lowWatermark.get(2), 1L);
        this.streamingExtractor.onFlushAck();
        this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
        testAfterReset(multiLongWatermark2);
    }

    private void testAfterReset(MultiLongWatermark multiLongWatermark) {
        for (int i = 0; i < 3; i++) {
            Assert.assertEquals(this.streamingExtractor.lowWatermark.get(i), this.streamingExtractor.nextWatermark.get(i));
            Assert.assertTrue(multiLongWatermark.get(i) <= this.streamingExtractor.highWatermark.get(i));
        }
    }

    @Test
    public void testGenerateAdditionalTagHelper() {
        Iterator it = this.streamingExtractor.getAdditionalTagsHelper().values().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Map) it.next()).containsKey("produceRate"));
        }
    }

    @Test
    public void testReadRecordEnvelopeImpl() throws IOException {
        WorkUnitState workUnitState = KafkaExtractorUtils.getWorkUnitState("testTopic", 3);
        workUnitState.setProp("flush.data.publisher.class", TestDataPublisher.class.getName());
        workUnitState.setProp(KafkaStreamTestUtils.MockKafkaConsumerClient.CAN_RETURN_NULL_VALUED_RECORDS, "true");
        KafkaStreamingExtractor kafkaStreamingExtractor = new KafkaStreamingExtractor(workUnitState);
        for (int i = 0; i < 4; i++) {
            Assert.assertNotNull(Boolean.valueOf(((DecodeableKafkaRecord) kafkaStreamingExtractor.readRecordEnvelopeImpl().getRecord()).getValue() != null));
        }
    }
}
