package org.apache.storm.kafka.spout.internal;

import java.util.NoSuchElementException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/kafka/spout/internal/OffsetManagerTest.class */
public class OffsetManagerTest {
    private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";

    @Rule
    public ExpectedException expect = ExpectedException.none();
    private final long initialFetchOffset = 0;
    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
    private final OffsetManager manager = new OffsetManager(this.testTp, 0);

    @Test
    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
        this.manager.addToEmitMsgs(0L);
        this.manager.addToEmitMsgs(1L);
        this.manager.addToEmitMsgs(2L);
        this.manager.addToEmitMsgs(5L);
        this.manager.addToEmitMsgs(6L);
        this.manager.addToAckMsgs(getMessageId(0L));
        this.manager.addToAckMsgs(getMessageId(1L));
        this.manager.addToAckMsgs(getMessageId(2L));
        this.manager.addToAckMsgs(getMessageId(6L));
        Assert.assertThat("The offset manager should not skip past offset 5 which is still pending", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(3L));
        this.manager.addToAckMsgs(getMessageId(5L));
        Assert.assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", this.manager.findNextCommitOffset(COMMIT_METADATA), CoreMatchers.is(new OffsetAndMetadata(7L, COMMIT_METADATA)));
    }

    @Test
    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
        this.manager.addToEmitMsgs(5L);
        this.manager.addToEmitMsgs(6L);
        this.manager.addToAckMsgs(getMessageId(6L));
        Assert.assertThat("The offset manager should not skip past offset 5 which is still pending", this.manager.findNextCommitOffset(COMMIT_METADATA), CoreMatchers.is(CoreMatchers.nullValue()));
        this.manager.addToAckMsgs(getMessageId(5L));
        Assert.assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", this.manager.findNextCommitOffset(COMMIT_METADATA), CoreMatchers.is(new OffsetAndMetadata(7L, COMMIT_METADATA)));
    }

    @Test
    public void testFindNextCommittedOffsetWithNoAcks() {
        Assert.assertThat("There shouldn't be a next commit offset when nothing has been acked", this.manager.findNextCommitOffset(COMMIT_METADATA), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void testFindNextCommitOffsetWithOneAck() {
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("The next commit offset should be one past the processed message offset", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(1L));
    }

    @Test
    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
        emitAndAckMessage(getMessageId(1L));
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("The next commit offset should be one past the processed message offset", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(2L));
    }

    @Test
    public void testFindNextCommitOffsetWithAckedOffsetGap() {
        emitAndAckMessage(getMessageId(2L));
        this.manager.addToEmitMsgs(1L);
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("The next commit offset should cover the sequential acked offsets", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(1L));
    }

    @Test
    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
        emitAndAckMessage(getMessageId(2L));
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(3L));
    }

    @Test
    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
        this.manager.addToEmitMsgs(1L);
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("The next commit offset should cover the contiguously acked offsets", Long.valueOf(this.manager.findNextCommitOffset(COMMIT_METADATA).offset()), CoreMatchers.is(1L));
    }

    @Test
    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
        OffsetManager offsetManager = new OffsetManager(this.testTp, 10L);
        emitAndAckMessage(getMessageId(0L));
        Assert.assertThat("Acking an offset earlier than the committed offset should have no effect", offsetManager.findNextCommitOffset(COMMIT_METADATA), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void testCommit() {
        emitAndAckMessage(getMessageId(0L));
        emitAndAckMessage(getMessageId(1L));
        emitAndAckMessage(getMessageId(2L));
        Assert.assertThat("Should have committed all messages to the left of the earliest uncommitted offset", Long.valueOf(this.manager.commit(new OffsetAndMetadata(2L))), CoreMatchers.is(2L));
        Assert.assertThat("The committed messages should not be in the acked list anymore", Boolean.valueOf(this.manager.contains(getMessageId(0L))), CoreMatchers.is(false));
        Assert.assertThat("The committed messages should not be in the emitted list anymore", Boolean.valueOf(this.manager.containsEmitted(0L)), CoreMatchers.is(false));
        Assert.assertThat("The committed messages should not be in the acked list anymore", Boolean.valueOf(this.manager.contains(getMessageId(1L))), CoreMatchers.is(false));
        Assert.assertThat("The committed messages should not be in the emitted list anymore", Boolean.valueOf(this.manager.containsEmitted(1L)), CoreMatchers.is(false));
        Assert.assertThat("The uncommitted message should still be in the acked list", Boolean.valueOf(this.manager.contains(getMessageId(2L))), CoreMatchers.is(true));
        Assert.assertThat("The uncommitted message should still be in the emitted list", Boolean.valueOf(this.manager.containsEmitted(2L)), CoreMatchers.is(true));
    }

    private KafkaSpoutMessageId getMessageId(long j) {
        return new KafkaSpoutMessageId(this.testTp, j);
    }

    private void emitAndAckMessage(KafkaSpoutMessageId kafkaSpoutMessageId) {
        this.manager.addToEmitMsgs(kafkaSpoutMessageId.offset());
        this.manager.addToAckMsgs(kafkaSpoutMessageId);
    }

    @Test
    public void testGetNthUncommittedOffsetAfterCommittedOffset() {
        this.manager.addToEmitMsgs(1L);
        this.manager.addToEmitMsgs(2L);
        this.manager.addToEmitMsgs(5L);
        this.manager.addToEmitMsgs(30L);
        Assert.assertThat("The third uncommitted offset should be 5", Long.valueOf(this.manager.getNthUncommittedOffsetAfterCommittedOffset(3)), CoreMatchers.is(5L));
        Assert.assertThat("The fourth uncommitted offset should be 30", Long.valueOf(this.manager.getNthUncommittedOffsetAfterCommittedOffset(4)), CoreMatchers.is(30L));
        this.expect.expect(NoSuchElementException.class);
        this.manager.getNthUncommittedOffsetAfterCommittedOffset(5);
    }

    @Test
    public void testCommittedFlagSetOnCommit() throws Exception {
        Assert.assertFalse(this.manager.hasCommitted());
        this.manager.commit((OffsetAndMetadata) Mockito.mock(OffsetAndMetadata.class));
        Assert.assertTrue(this.manager.hasCommitted());
    }
}
