package org.apache.storm.kafka.spout;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.NullRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Tests for {@link KafkaSpout} when it is configured with the {@code AT_MOST_ONCE} or
 * {@code NO_GUARANTEE} processing guarantee.
 *
 * <p>Every test drives the spout against a mocked {@link KafkaConsumer} that is assigned a single
 * partition, and checks the interactions with the consumer and the output collector.
 */
@ExtendWith({MockitoExtension.class})
public class KafkaSpoutMessagingGuaranteeTest {

    /**
     * Extra simulated time added on top of the configured offset commit period before the spout is
     * asked for the next tuple.
     * NOTE(review): presumably mirrors the spout's internal commit timer delay - confirm against KafkaSpout.
     */
    private static final long COMMIT_TIMER_SLACK_MS = 500;

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    private final TopologyContext contextMock = Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap<>();
    private final TopicPartition partition = new TopicPartition("test", 1);
    private KafkaConsumer<String, String> consumerMock;

    /** Creates a fresh consumer mock for every test. */
    @BeforeEach
    @SuppressWarnings("unchecked") // Mockito can only hand back the raw KafkaConsumer type for a class literal.
    public void setUp() {
        consumerMock = Mockito.mock(KafkaConsumer.class);
    }

    /** Returns a spout config builder backed by mocked topic filter and partitioner, using port -1. */
    private KafkaSpoutConfig.Builder<String, String> newConfigBuilder() {
        return SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(
            Mockito.mock(TopicFilter.class), Mockito.mock(ManualPartitioner.class), -1);
    }

    /** Sets up a spout for the given config, wired to this test's mocks and assigned {@link #partition}. */
    private KafkaSpout<String, String> setupSpout(KafkaSpoutConfig<String, String> spoutConfig) {
        return SpoutWithMockedConsumerSetupHelper.setupSpout(
            spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
    }

    /** Builds a poll result holding {@code count} records for {@link #partition}, starting at {@code firstOffset}. */
    private ConsumerRecords<String, String> polledRecords(long firstOffset, int count) {
        return new ConsumerRecords<>(Collections.singletonMap(partition,
            SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstOffset, count)));
    }

    /**
     * Verifies that in at-most-once mode the consumer is polled, then {@code commitSync} is called,
     * and only then the tuple is emitted; also checks the committed offset and metadata.
     */
    @Test
    public void testAtMostOnceModeCommitsBeforeEmit() {
        KafkaSpoutConfig<String, String> spoutConfig = newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .build();
        KafkaSpout<String, String> spout = setupSpout(spoutConfig);

        Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(polledRecords(0, 1));

        spout.nextTuple();

        InOrder inOrder = Mockito.inOrder(consumerMock, collectorMock);
        inOrder.verify(consumerMock).poll(ArgumentMatchers.anyLong());
        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
        inOrder.verify(collectorMock).emit(
            ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), ArgumentMatchers.anyList());

        CommitMetadataManager metadataManager =
            new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
        Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
        MatcherAssert.assertThat(committed.get(partition).offset(), CoreMatchers.is(0L));
        MatcherAssert.assertThat(committed.get(partition).metadata(),
            CoreMatchers.is(metadataManager.getCommitMetadata()));
    }

    /**
     * Stubs two polls that each return {@code maxUncommittedOffsets} records, calls
     * {@code nextTuple()} twice that many times, and verifies that the consumer was polled twice
     * and that {@code 2 * maxUncommittedOffsets} tuples were emitted.
     *
     * @param spoutConfig the spout configuration under test
     */
    private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
        KafkaSpout<String, String> spout = setupSpout(spoutConfig);
        int maxUncommitted = spoutConfig.getMaxUncommittedOffsets();

        Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong()))
            .thenReturn(polledRecords(0, maxUncommitted))
            .thenReturn(polledRecords(maxUncommitted - 1, maxUncommitted));

        for (int call = 0; call < maxUncommitted * 2; call++) {
            spout.nextTuple();
        }

        Mockito.verify(consumerMock, Mockito.times(2)).poll(ArgumentMatchers.anyLong());
        Mockito.verify(collectorMock, Mockito.times(maxUncommitted * 2)).emit(
            ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), ArgumentMatchers.anyList());
    }

    @Test
    public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() {
        doTestModeDisregardsMaxUncommittedOffsets(newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .build());
    }

    @Test
    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() {
        doTestModeDisregardsMaxUncommittedOffsets(newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
            .build());
    }

    /**
     * Emits one tuple, fails it, and verifies that the following {@code nextTuple()} call never
     * seeks the consumer on {@link #partition}.
     *
     * @param spoutConfig the spout configuration under test
     */
    private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
        KafkaSpout<String, String> spout = setupSpout(spoutConfig);

        Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(polledRecords(0, 1));

        spout.nextTuple();

        ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        Mockito.verify(collectorMock).emit(
            ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM),
            ArgumentMatchers.anyList(),
            msgIdCaptor.capture());
        MatcherAssert.assertThat("Should have captured a message id", msgIdCaptor.getValue(),
            CoreMatchers.not(CoreMatchers.nullValue()));

        spout.fail(msgIdCaptor.getValue());

        // Drop all stubbing and recorded calls so only interactions after the failure are checked.
        Mockito.reset(consumerMock);

        spout.nextTuple();

        Mockito.verify(consumerMock, Mockito.never())
            .seek(ArgumentMatchers.eq(partition), ArgumentMatchers.anyLong());
    }

    @Test
    public void testAtMostOnceModeCannotReplayTuples() {
        doTestModeCannotReplayTuples(newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setTupleTrackingEnforced(true)
            .build());
    }

    @Test
    public void testNoGuaranteeModeCannotReplayTuples() {
        doTestModeCannotReplayTuples(newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
            .setTupleTrackingEnforced(true)
            .build());
    }

    /**
     * With tuple tracking enforced in at-most-once mode: emits one tuple, acks it, advances
     * simulated time past the commit period, polls again with no records, and then checks the
     * {@code commitSync} calls made after the first {@code nextTuple()}.
     */
    @Test
    public void testAtMostOnceModeDoesNotCommitAckedTuples() {
        KafkaSpoutConfig<String, String> spoutConfig = newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setTupleTrackingEnforced(true)
            .build();
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpout<String, String> spout = setupSpout(spoutConfig);

            Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(polledRecords(0, 1));

            spout.nextTuple();
            // Forget the consumer calls made by the first nextTuple(); stubbing is kept.
            Mockito.clearInvocations(consumerMock);

            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            Mockito.verify(collectorMock).emit(
                ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                ArgumentMatchers.anyList(),
                msgIdCaptor.capture());
            MatcherAssert.assertThat("Should have captured a message id", msgIdCaptor.getValue(),
                CoreMatchers.not(CoreMatchers.nullValue()));

            spout.ack(msgIdCaptor.getValue());

            Time.advanceTime(COMMIT_TIMER_SLACK_MS + spoutConfig.getOffsetsCommitPeriodMs());

            Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong()))
                .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));

            spout.nextTuple();

            // Fails only if commitSync was invoked with an offsets map that lacks this test's partition.
            // NOTE(review): the predicate is negated; confirm whether the intent was to match maps
            // that contain the partition instead.
            Mockito.verify(consumerMock, Mockito.never()).commitSync(ArgumentMatchers.argThat(
                (Map<TopicPartition, OffsetAndMetadata> offsets) -> !offsets.containsKey(partition)));
        }
    }

    /**
     * With tuple tracking enforced in no-guarantee mode: emits one tuple, advances simulated time
     * past the commit period, and verifies that the next {@code nextTuple()} call commits offset 1
     * for {@link #partition} through {@code commitAsync} with a null callback.
     */
    @Test
    public void testNoGuaranteeModeCommitsPolledTuples() {
        KafkaSpoutConfig<String, String> spoutConfig = newConfigBuilder()
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
            .setTupleTrackingEnforced(true)
            .build();
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            KafkaSpout<String, String> spout = setupSpout(spoutConfig);

            Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(polledRecords(0, 1));

            spout.nextTuple();

            // From here on the mocked consumer reports position 1 for the partition.
            Mockito.when(consumerMock.position(partition)).thenReturn(1L);

            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            Mockito.verify(collectorMock).emit(
                ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                ArgumentMatchers.anyList(),
                msgIdCaptor.capture());
            MatcherAssert.assertThat("Should have captured a message id", msgIdCaptor.getValue(),
                CoreMatchers.not(CoreMatchers.nullValue()));

            Time.advanceTime(COMMIT_TIMER_SLACK_MS + spoutConfig.getOffsetsCommitPeriodMs());

            spout.nextTuple();

            Mockito.verify(consumerMock).commitAsync(commitCapture.capture(), ArgumentMatchers.isNull());

            CommitMetadataManager metadataManager =
                new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
            MatcherAssert.assertThat(committed.get(partition).offset(), CoreMatchers.is(1L));
            MatcherAssert.assertThat(committed.get(partition).metadata(),
                CoreMatchers.is(metadataManager.getCommitMetadata()));
        }
    }

    /**
     * Configures the spout with a {@link NullRecordTranslator} and verifies that polling one record
     * results in no three-argument {@code emit} call on the collector.
     *
     * @param processingGuarantee the processing guarantee to configure the spout with
     */
    private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
        KafkaSpoutConfig<String, String> spoutConfig = newConfigBuilder()
            .setProcessingGuarantee(processingGuarantee)
            .setTupleTrackingEnforced(true)
            .setRecordTranslator(new NullRecordTranslator<>())
            .build();
        KafkaSpout<String, String> spout = setupSpout(spoutConfig);

        Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(polledRecords(0, 1));

        spout.nextTuple();

        Mockito.verify(collectorMock, Mockito.never())
            .emit(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    @Test
    public void testAtMostOnceModeCanFilterNullTuples() {
        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
    }

    @Test
    public void testNoGuaranteeModeCanFilterNullTuples() {
        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
    }
}
