package org.apache.storm.kafka.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitExtension;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Tests that a {@link KafkaSpout} keeps emitting and committing correctly when it is deactivated and
 * then activated again, and that its offset metric can still be read while it is deactivated.
 *
 * <p>Each test runs against the Kafka instance provided by {@link KafkaUnitExtension}. The spout is
 * handed a Mockito spy wrapping a real consumer so that commits can be verified.
 */
@ExtendWith(MockitoExtension.class)
public class KafkaSpoutReactivationTest {

    /** Name of the topic that is populated with test data and consumed by the spout. */
    private static final String TOPIC = "test";

    /**
     * Extra simulated time advanced on top of the commit period before expecting a commit.
     * NOTE(review): presumably covers the spout's internal commit-timer delay - confirm against KafkaSpout.
     */
    private static final long COMMIT_TIMER_SLACK_MS = 500;

    @RegisterExtension
    public KafkaUnitExtension kafkaUnitExtension = new KafkaUnitExtension();

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    private final TopologyContext topologyContext = Mockito.mock(TopologyContext.class);
    private final Map<String, Object> conf = new HashMap<>();
    private final SpoutOutputCollector collector = Mockito.mock(SpoutOutputCollector.class);
    private final long commitOffsetPeriodMs = 2000;
    private final int maxPollRecords = 10;
    private Consumer<String, String> consumerSpy;
    private KafkaSpout<String, String> spout;

    /**
     * Builds the spout under test, fills the test topic and initializes the spout.
     *
     * <p>The consumer handed to the spout is a Mockito spy around a real consumer created by
     * {@link ConsumerFactoryDefault}; it is kept in {@code consumerSpy} so tests can verify commits.
     *
     * @param i value passed to the topic population helper (the tests pass the number of messages
     *     they expect to consume)
     * @param firstPollOffsetStrategy the first-poll offset strategy to configure on the spout
     * @throws Exception if populating the topic or initializing the spout fails
     */
    public void prepareSpout(int i, FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
        String bootstrapServers = "127.0.0.1:" + this.kafkaUnitExtension.getKafkaUnit().getKafkaPort();
        KafkaSpoutConfig<String, String> spoutConfig =
            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(KafkaSpoutConfig.builder(bootstrapServers, TOPIC))
                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
                .setOffsetCommitPeriodMs(this.commitOffsetPeriodMs)
                .setProp("max.poll.records", this.maxPollRecords)
                .build();

        // Spy on a real consumer so the tests can inspect the commits the spout performs.
        ConsumerFactory<String, String> realConsumerFactory = new ConsumerFactoryDefault<>();
        this.consumerSpy = Mockito.spy(realConsumerFactory.createConsumer(spoutConfig.getKafkaProps()));

        // Mockito can only produce a raw mock for a generic interface, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        ConsumerFactory<String, String> consumerFactoryMock = Mockito.mock(ConsumerFactory.class);
        Mockito.when(consumerFactoryMock.createConsumer(ArgumentMatchers.any())).thenReturn(this.consumerSpy);

        this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner());
        SingleTopicKafkaUnitSetupHelper.populateTopicData(this.kafkaUnitExtension.getKafkaUnit(), TOPIC, i);
        SingleTopicKafkaUnitSetupHelper.initializeSpout(this.spout, this.conf, this.topologyContext, this.collector);
    }

    /**
     * Calls {@code nextTuple()} once, verifies that exactly one tuple was emitted to the collector and
     * clears the collector's recorded invocations so the next call starts from a clean slate.
     *
     * @return the message id the tuple was emitted with
     */
    private KafkaSpoutMessageId emitOne() {
        ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        this.spout.nextTuple();
        Mockito.verify(this.collector).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), messageId.capture());
        Mockito.clearInvocations(this.collector);
        return messageId.getValue();
    }

    /**
     * Emits and acks part of the topic, cycles the spout through deactivate/activate, then emits and
     * acks the remainder and verifies that everything was committed and nothing is emitted again.
     *
     * @param firstPollOffsetStrategy the first-poll offset strategy to run the scenario with
     * @throws Exception if spout preparation or any verification fails
     */
    private void doReactivationTest(FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            int messageCount = this.maxPollRecords * 2;
            prepareSpout(messageCount, firstPollOffsetStrategy);

            // Stay below maxPollRecords before the deactivation.
            int beforeReactivationEmits = this.maxPollRecords - 3;
            for (int i = 0; i < beforeReactivationEmits - 1; i++) {
                this.spout.ack(emitOne());
            }

            // This tuple is emitted before the deactivation but only acked afterwards.
            KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();

            this.spout.deactivate();
            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(
                this.consumerSpy, this.commitCapture, beforeReactivationEmits - 1);
            Mockito.clearInvocations(this.consumerSpy);

            // The spout must cope with an ack that arrives while it is deactivated.
            this.spout.ack(ackAfterDeactivateMessageId);
            this.spout.activate();

            // Emit and ack the rest of the topic.
            for (int i = beforeReactivationEmits; i < messageCount; i++) {
                this.spout.ack(emitOne());
            }

            // Move simulated time past the commit period, then call nextTuple() before checking commits.
            Time.advanceTime(this.commitOffsetPeriodMs + COMMIT_TIMER_SLACK_MS);
            this.spout.nextTuple();
            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(this.consumerSpy, this.commitCapture, messageCount);

            // Everything has been consumed, so a further call must not emit anything.
            Mockito.clearInvocations(this.collector);
            this.spout.nextTuple();
            Mockito.verify(this.collector, Mockito.never())
                .emit(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        }
    }

    /** Runs the reactivation scenario with {@link FirstPollOffsetStrategy#UNCOMMITTED_EARLIEST}. */
    @Test
    public void testSpoutShouldResumeWhereItLeftOffWithUncommittedEarliestStrategy() throws Exception {
        doReactivationTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
    }

    /** Runs the reactivation scenario with {@link FirstPollOffsetStrategy#EARLIEST}. */
    @Test
    public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exception {
        doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
    }

    /**
     * Reads the Kafka offset metric after the spout has been deactivated and checks the reported
     * total lag: 10 messages were requested from the population helper and 5 were emitted and acked.
     */
    @Test
    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);

        for (int i = 0; i < 5; i++) {
            this.spout.ack(emitOne());
        }

        this.spout.deactivate();

        // The metric value is exposed as Object; the cast below mirrors how the test reads it by key.
        @SuppressWarnings("unchecked")
        Map<String, Long> offsetMetric = (Map<String, Long>) this.spout.getKafkaOffsetMetric().getValueAndReset();
        Assert.assertThat(offsetMetric.get(TOPIC + "/totalSpoutLag"), CoreMatchers.is(5L));
    }
}
