package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutCommitTest.class */
public class KafkaSpoutCommitTest {
    private final long offsetCommitPeriodMs = 2000;
    private final TopologyContext contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap();
    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    private KafkaConsumer<String, String> consumerMock;
    private KafkaSpout<String, String> spout;
    private KafkaSpoutConfig spoutConfig;

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    private void setupSpout(Set<TopicPartition> set) {
        MockitoAnnotations.initMocks(this);
        this.spoutConfig = SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(-1).setOffsetCommitPeriodMs(2000L).build();
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        this.spout = new KafkaSpout<>(this.spoutConfig, new KafkaConsumerFactory<String, String>() { // from class: org.apache.storm.kafka.spout.KafkaSpoutCommitTest.1
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                return KafkaSpoutCommitTest.this.consumerMock;
            }
        });
        this.spout.open(this.conf, this.contextMock, this.collectorMock);
        this.spout.activate();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        ((KafkaConsumer) Mockito.verify(this.consumerMock)).subscribe(Mockito.anyCollection(), (ConsumerRebalanceListener) forClass.capture());
        ((ConsumerRebalanceListener) forClass.getValue()).onPartitionsAssigned(set);
    }

    @Test
    public void testCommitSuccessWithOffsetVoids() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            setupSpout(Collections.singleton(this.partition));
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 5; i++) {
                arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i, "key", "value"));
            }
            for (int i2 = 8; i2 < 10; i2++) {
                arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i2, "key", "value"));
            }
            hashMap.put(this.partition, arrayList);
            Mockito.when(this.consumerMock.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords(hashMap));
            for (int i3 = 0; i3 < arrayList.size(); i3++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(Mockito.anyString(), Mockito.anyList(), forClass.capture());
            Iterator it = forClass.getAllValues().iterator();
            while (it.hasNext()) {
                this.spout.ack((KafkaSpoutMessageId) it.next());
            }
            Time.advanceTime(2500L);
            Map emptyMap = Collections.emptyMap();
            Mockito.when(this.consumerMock.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords(emptyMap));
            this.spout.nextTuple();
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(Mockito.anyLong());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertTrue(map.containsKey(this.partition));
            Assert.assertEquals(4L, ((OffsetAndMetadata) map.get(this.partition)).offset());
            Mockito.reset(new KafkaConsumer[]{this.consumerMock});
            Mockito.when(this.consumerMock.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords(emptyMap));
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            InOrder inOrder2 = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder2.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            ((KafkaConsumer) inOrder2.verify(this.consumerMock)).poll(Mockito.anyLong());
            Map map2 = (Map) this.commitCapture.getValue();
            Assert.assertTrue(map2.containsKey(this.partition));
            Assert.assertEquals(9L, ((OffsetAndMetadata) map2.get(this.partition)).offset());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
