package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutEmitTest.class */
public class KafkaSpoutEmitTest {
    private final long offsetCommitPeriodMs = 2000;
    private final TopologyContext contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap();
    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    private KafkaConsumer<String, String> consumerMock;
    private KafkaSpout<String, String> spout;
    private KafkaSpoutConfig spoutConfig;

    private void setupSpout(Set<TopicPartition> set) {
        this.spoutConfig = SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(-1).setOffsetCommitPeriodMs(2000L).build();
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        this.spout = new KafkaSpout<>(this.spoutConfig, new KafkaConsumerFactory<String, String>() { // from class: org.apache.storm.kafka.spout.KafkaSpoutEmitTest.1
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                return KafkaSpoutEmitTest.this.consumerMock;
            }
        });
        this.spout.open(this.conf, this.contextMock, this.collectorMock);
        this.spout.activate();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        ((KafkaConsumer) Mockito.verify(this.consumerMock)).subscribe(Matchers.anyCollection(), (ConsumerRebalanceListener) forClass.capture());
        ((ConsumerRebalanceListener) forClass.getValue()).onPartitionsAssigned(set);
    }

    @Test
    public void testNextTupleEmitsAtMostOneTuple() {
        setupSpout(Collections.singleton(this.partition));
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i, "key", "value"));
        }
        hashMap.put(this.partition, arrayList);
        Mockito.when(this.consumerMock.poll(Matchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(1))).emit(Matchers.anyString(), Matchers.anyList(), Matchers.anyObject());
    }

    @Test
    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            try {
                setupSpout(Collections.singleton(this.partition));
                HashMap hashMap = new HashMap();
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < this.spoutConfig.getMaxUncommittedOffsets(); i++) {
                    arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i, "key", "value"));
                }
                hashMap.put(this.partition, arrayList);
                Mockito.when(this.consumerMock.poll(Matchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
                for (int i2 = 0; i2 < arrayList.size(); i2++) {
                    this.spout.nextTuple();
                }
                ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
                ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
                Iterator it = forClass.getAllValues().iterator();
                while (it.hasNext()) {
                    this.spout.fail((KafkaSpoutMessageId) it.next());
                }
                Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
                Time.advanceTime(50L);
                for (int i3 = 0; i3 < arrayList.size(); i3++) {
                    this.spout.nextTuple();
                }
                ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(Matchers.anyString(), Matchers.anyList(), ArgumentCaptor.forClass(KafkaSpoutMessageId.class).capture());
                ArrayList arrayList2 = new ArrayList();
                Iterator it2 = forClass.getAllValues().iterator();
                while (it2.hasNext()) {
                    arrayList2.add(Long.valueOf(((KafkaSpoutMessageId) it2.next()).offset()));
                }
                InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
                ((KafkaConsumer) inOrder.verify(this.consumerMock)).seek(this.partition, ((Long) arrayList2.get(0)).longValue());
                ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(Matchers.anyLong());
                if (simulatedTime != null) {
                    if (0 == 0) {
                        simulatedTime.close();
                        return;
                    }
                    try {
                        simulatedTime.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (simulatedTime != null) {
                if (th != null) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            setupSpout(Collections.singleton(this.partition));
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.spoutConfig.getMaxUncommittedOffsets(); i++) {
                arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i, "key", "value"));
            }
            hashMap.put(this.partition, arrayList);
            HashMap hashMap2 = new HashMap();
            ArrayList arrayList2 = new ArrayList();
            for (int i2 = 0; i2 < 5; i2++) {
                arrayList2.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), this.spoutConfig.getMaxUncommittedOffsets() + i2, "key", "value"));
            }
            hashMap2.put(this.partition, arrayList2);
            Mockito.when(this.consumerMock.poll(Matchers.anyLong())).thenReturn(new ConsumerRecords(hashMap)).thenReturn(new ConsumerRecords(hashMap2));
            for (int i3 = 0; i3 < this.spoutConfig.getMaxUncommittedOffsets() + 5; i3++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
            KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) forClass.getAllValues().get(forClass.getAllValues().size() - 1);
            this.spout.fail(kafkaSpoutMessageId);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            Time.advanceTime(50L);
            for (int i4 = 0; i4 < arrayList.size() + 5; i4++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(5))).emit(Matchers.anyString(), Matchers.anyList(), forClass2.capture());
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).seek(this.partition, kafkaSpoutMessageId.offset());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(Matchers.anyLong());
            Time.advanceTime(50L);
            List allValues = forClass2.getAllValues();
            KafkaSpoutMessageId kafkaSpoutMessageId2 = (KafkaSpoutMessageId) allValues.remove(0);
            Iterator it = allValues.iterator();
            while (it.hasNext()) {
                this.spout.fail((KafkaSpoutMessageId) it.next());
            }
            for (int i5 = 0; i5 < arrayList.size() + 5; i5++) {
                this.spout.nextTuple();
            }
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(Matchers.anyString(), Matchers.anyList(), Matchers.anyObject());
            this.spout.fail(kafkaSpoutMessageId2);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(1))).emit(Matchers.anyString(), Matchers.anyList(), Matchers.anyObject());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
