package org.apache.storm.kafka.spout;

import java.util.regex.Pattern;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.class */
public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
    public KafkaSpoutTopologyDeployActivateDeactivateTest() {
        super(2000L);
    }

    @Override // org.apache.storm.kafka.spout.KafkaSpoutAbstractTest
    KafkaSpoutConfig<String, String> createSpoutConfig() {
        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + this.kafkaUnitExtension.getKafkaUnit().getKafkaPort(), Pattern.compile("test"))).setOffsetCommitPeriodMs(this.commitOffsetPeriodMs).setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST).build();
    }

    @Test
    public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
        prepareSpout(2);
        nextTuple_verifyEmitted_ack_resetCollector(0);
        this.spout.deactivate();
        verifyAllMessagesCommitted(1L);
        this.spout.activate();
        nextTuple_verifyEmitted_ack_resetCollector(1);
        commitAndVerifyAllMessagesCommitted(2L);
    }

    @Test
    public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
        Mockito.when(this.topologyContext.getStormId()).thenReturn("topology-1");
        prepareSpout(2);
        nextTuple_verifyEmitted_ack_resetCollector(0);
        this.spout.deactivate();
        verifyAllMessagesCommitted(1L);
        setUp();
        SingleTopicKafkaUnitSetupHelper.initializeSpout(this.spout, this.conf, this.topologyContext, this.collectorMock);
        nextTuple_verifyEmitted_ack_resetCollector(1);
        commitAndVerifyAllMessagesCommitted(2L);
    }

    @Test
    public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
        Mockito.when(this.topologyContext.getStormId()).thenReturn("topology-1");
        prepareSpout(2);
        nextTuple_verifyEmitted_ack_resetCollector(0);
        this.spout.deactivate();
        verifyAllMessagesCommitted(1L);
        setUp();
        Mockito.when(this.topologyContext.getStormId()).thenReturn("topology-2");
        SingleTopicKafkaUnitSetupHelper.initializeSpout(this.spout, this.conf, this.topologyContext, this.collectorMock);
        for (int i = 0; i < 2; i++) {
            nextTuple_verifyEmitted_ack_resetCollector(i);
        }
        commitAndVerifyAllMessagesCommitted(2L);
    }
}
