package org.apache.storm.kafka.spout;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutEmitTest.class */
public class KafkaSpoutEmitTest {
    private final long offsetCommitPeriodMs = 2000;
    private final TopologyContext contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap();
    private final TopicPartition partition = new TopicPartition("test", 1);
    private KafkaConsumer<String, String> consumerMock;
    private KafkaSpoutConfig<String, String> spoutConfig;

    @BeforeEach
    public void setUp() {
        this.spoutConfig = SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder((TopicFilter) Mockito.mock(TopicFilter.class), (ManualPartitioner) Mockito.mock(ManualPartitioner.class), -1).setOffsetCommitPeriodMs(2000L).build();
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
    }

    @Test
    public void testNextTupleEmitsAtMostOneTuple() {
        KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition);
        HashMap hashMap = new HashMap();
        hashMap.put(this.partition, SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, 0L, 10));
        Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
        kafkaSpout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(1))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.any(KafkaSpoutMessageId.class));
    }

    @Test
    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition);
            HashMap hashMap = new HashMap();
            int maxUncommittedOffsets = this.spoutConfig.getMaxUncommittedOffsets();
            hashMap.put(this.partition, SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, 0L, maxUncommittedOffsets));
            Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
            for (int i = 0; i < maxUncommittedOffsets; i++) {
                kafkaSpout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(maxUncommittedOffsets))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
            Iterator it = forClass.getAllValues().iterator();
            while (it.hasNext()) {
                kafkaSpout.fail((KafkaSpoutMessageId) it.next());
            }
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            Time.advanceTime(50L);
            for (int i2 = 0; i2 < maxUncommittedOffsets; i2++) {
                kafkaSpout.nextTuple();
            }
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(maxUncommittedOffsets))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentCaptor.forClass(KafkaSpoutMessageId.class).capture());
            ArrayList arrayList = new ArrayList();
            Iterator it2 = forClass.getAllValues().iterator();
            while (it2.hasNext()) {
                arrayList.add(Long.valueOf(((KafkaSpoutMessageId) it2.next()).offset()));
            }
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).seek(this.partition, ((Long) arrayList.get(0)).longValue());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(ArgumentMatchers.anyLong());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            TopicPartition topicPartition = new TopicPartition("test", 2);
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition, topicPartition);
            HashMap hashMap = new HashMap();
            hashMap.put(this.partition, SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, 0L, this.spoutConfig.getMaxUncommittedOffsets()));
            hashMap.put(topicPartition, SpoutWithMockedConsumerSetupHelper.createRecords(topicPartition, 0L, this.spoutConfig.getMaxUncommittedOffsets() + 1));
            int maxUncommittedOffsets = (this.spoutConfig.getMaxUncommittedOffsets() * 2) + 1;
            Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
            for (int i = 0; i < maxUncommittedOffsets; i++) {
                kafkaSpout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(maxUncommittedOffsets))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
            Optional findAny = forClass.getAllValues().stream().filter(kafkaSpoutMessageId -> {
                return kafkaSpoutMessageId.partition() == this.partition.partition();
            }).findAny();
            kafkaSpout.fail(findAny.get());
            kafkaSpout.fail(forClass.getAllValues().stream().filter(kafkaSpoutMessageId2 -> {
                return kafkaSpoutMessageId2.partition() == topicPartition.partition();
            }).max((kafkaSpoutMessageId3, kafkaSpoutMessageId4) -> {
                return (int) (kafkaSpoutMessageId3.offset() - kafkaSpoutMessageId4.offset());
            }).get());
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            Time.advanceTime(50L);
            Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(Collections.singletonMap(this.partition, SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, ((KafkaSpoutMessageId) findAny.get()).offset(), 1))));
            kafkaSpout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(1))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.any());
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).seek(this.partition, ((KafkaSpoutMessageId) findAny.get()).offset());
            ((KafkaConsumer) inOrder.verify(this.consumerMock, Mockito.never())).seek((TopicPartition) ArgumentMatchers.eq(topicPartition), ArgumentMatchers.anyLong());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).pause(Collections.singleton(topicPartition));
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(ArgumentMatchers.anyLong());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).resume(Collections.singleton(topicPartition));
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            kafkaSpout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.any());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
