package org.apache.storm.kafka.spout;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.class */
public class SingleTopicKafkaSpoutTest {

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    private KafkaConsumer<String, String> consumerSpy;
    private KafkaConsumerFactory<String, String> consumerFactory;
    private KafkaSpout<String, String> spout;

    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    private final TopologyContext topologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final Map<String, Object> conf = new HashMap();
    private final SpoutOutputCollector collector = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final long commitOffsetPeriodMs = 2000;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig(this.kafkaUnitRule.getKafkaUnit().getKafkaPort(), 2000L);
        this.consumerSpy = (KafkaConsumer) Mockito.spy(new KafkaConsumerFactoryDefault().createConsumer(kafkaSpoutConfig));
        this.consumerFactory = new KafkaConsumerFactory<String, String>() { // from class: org.apache.storm.kafka.spout.SingleTopicKafkaSpoutTest.1
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig2) {
                return SingleTopicKafkaSpoutTest.this.consumerSpy;
            }
        };
        this.spout = new KafkaSpout<>(kafkaSpoutConfig, this.consumerFactory);
    }

    void populateTopicData(String str, int i) throws InterruptedException, ExecutionException, TimeoutException {
        this.kafkaUnitRule.getKafkaUnit().createTopic(str);
        for (int i2 = 0; i2 < i; i2++) {
            this.kafkaUnitRule.getKafkaUnit().sendMessage(new ProducerRecord(str, Integer.toString(i2), Integer.toString(i2)));
        }
    }

    private void initializeSpout(int i) throws InterruptedException, ExecutionException, TimeoutException {
        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, i);
        this.spout.open(this.conf, this.topologyContext, this.collector);
        this.spout.activate();
    }

    private void verifyAllMessagesCommitted(long j) {
        ((KafkaConsumer) Mockito.verify(this.consumerSpy, Mockito.times(1))).commitSync((Map) this.commitCapture.capture());
        Map map = (Map) this.commitCapture.getValue();
        MatcherAssert.assertThat("Expected commits for only one topic partition", Integer.valueOf(map.entrySet().size()), CoreMatchers.is(1));
        MatcherAssert.assertThat("Expected committed offset to cover all emitted messages", Long.valueOf(((OffsetAndMetadata) ((Map.Entry) map.entrySet().iterator().next()).getValue()).offset()), CoreMatchers.is(Long.valueOf(j - 1)));
    }

    @Test
    public void shouldContinueWithSlowDoubleAcks() throws Exception {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            initializeSpout(20);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collector)).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
            this.spout.ack(forClass.getValue());
            for (int i = 0; i < 20 / 2; i++) {
                this.spout.nextTuple();
            }
            this.spout.ack(forClass.getValue());
            for (int i2 = 0; i2 < 20; i2++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
            ((SpoutOutputCollector) Mockito.verify(this.collector, Mockito.times(20))).emit((String) Matchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), Matchers.anyList(), forClass2.capture());
            Iterator it = forClass2.getAllValues().iterator();
            while (it.hasNext()) {
                this.spout.ack(it.next());
            }
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            verifyAllMessagesCommitted(20);
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldEmitAllMessages() throws Exception {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            initializeSpout(10);
            for (int i = 0; i < 10; i++) {
                this.spout.nextTuple();
                ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
                ((SpoutOutputCollector) Mockito.verify(this.collector)).emit((String) Matchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), (List) Matchers.eq(new Values(new Object[]{SingleTopicKafkaSpoutConfiguration.TOPIC, Integer.toString(i), Integer.toString(i)})), forClass.capture());
                this.spout.ack(forClass.getValue());
                Mockito.reset(new SpoutOutputCollector[]{this.collector});
            }
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            verifyAllMessagesCommitted(10);
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldReplayInOrderFailedMessages() throws Exception {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            initializeSpout(10);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collector)).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
            this.spout.ack(forClass.getValue());
            Mockito.reset(new SpoutOutputCollector[]{this.collector});
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collector)).emit(Matchers.anyString(), Matchers.anyList(), forClass2.capture());
            this.spout.fail(forClass2.getValue());
            Mockito.reset(new SpoutOutputCollector[]{this.collector});
            for (int i = 0; i < 10; i++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
            ((SpoutOutputCollector) Mockito.verify(this.collector, Mockito.times(10 - 1))).emit((String) Matchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), Matchers.anyList(), forClass3.capture());
            Iterator it = forClass3.getAllValues().iterator();
            while (it.hasNext()) {
                this.spout.ack(it.next());
            }
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            verifyAllMessagesCommitted(10);
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            initializeSpout(10);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collector)).emit(Matchers.anyString(), Matchers.anyList(), forClass.capture());
            Mockito.reset(new SpoutOutputCollector[]{this.collector});
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collector)).emit(Matchers.anyString(), Matchers.anyList(), forClass2.capture());
            Mockito.reset(new SpoutOutputCollector[]{this.collector});
            this.spout.ack(forClass2.getValue());
            this.spout.fail(forClass.getValue());
            for (int i = 0; i < 10; i++) {
                this.spout.nextTuple();
            }
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
            ((SpoutOutputCollector) Mockito.verify(this.collector, Mockito.times(10 - 1))).emit((String) Matchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), Matchers.anyList(), forClass3.capture());
            Iterator it = forClass3.getAllValues().iterator();
            while (it.hasNext()) {
                this.spout.ack(it.next());
            }
            Time.advanceTime(2500L);
            this.spout.nextTuple();
            verifyAllMessagesCommitted(10);
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
