package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.class */
public class KafkaSpoutRetryLimitTest {
    private final long offsetCommitPeriodMs = 2000;
    private final TopologyContext contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap();
    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    private KafkaConsumer<String, String> consumerMock;
    private KafkaSpout<String, String> spout;
    private KafkaSpoutConfig spoutConfig;
    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));

    private void setupSpoutWithNoRetry(Set<TopicPartition> set) {
        this.spoutConfig = SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(-1).setOffsetCommitPeriodMs(2000L).setRetry(ZERO_RETRIES_RETRY_SERVICE).build();
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        this.spout = new KafkaSpout<>(this.spoutConfig, new KafkaConsumerFactory<String, String>() { // from class: org.apache.storm.kafka.spout.KafkaSpoutRetryLimitTest.1
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                return KafkaSpoutRetryLimitTest.this.consumerMock;
            }
        });
        this.spout.open(this.conf, this.contextMock, this.collectorMock);
        this.spout.activate();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        ((KafkaConsumer) Mockito.verify(this.consumerMock)).subscribe(Matchers.anyCollection(), (ConsumerRebalanceListener) forClass.capture());
        ((ConsumerRebalanceListener) forClass.getValue()).onPartitionsAssigned(set);
    }

    @Test
    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            setupSpoutWithNoRetry(Collections.singleton(this.partition));
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i <= 3; i++) {
                arrayList.add(new ConsumerRecord(this.partition.topic(), this.partition.partition(), i, "key", "value"));
            }
            hashMap.put(this.partition, arrayList);
            Mockito.when(this.consumerMock.poll(Matchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
            for (int i2 = 0; i2 < arrayList.size(); i2++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
            Iterator it = forClass.getAllValues().iterator();
            while (it.hasNext()) {
                this.spout.fail((KafkaSpoutMessageId) it.next());
            }
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Map.class);
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).commitSync((Map) forClass2.capture());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(Matchers.anyLong());
            Assert.assertTrue(((Map) forClass2.getValue()).containsKey(this.partition));
            Assert.assertEquals(3, ((OffsetAndMetadata) ((Map) forClass2.getValue()).get(this.partition)).offset());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
