package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.class */
public class KafkaSpoutRebalanceTest {

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    private final long offsetCommitPeriodMs = 2000;
    private final Map<String, Object> conf = new HashMap();
    private TopologyContext contextMock;
    private SpoutOutputCollector collectorMock;
    private KafkaConsumer<String, String> consumerMock;
    private KafkaConsumerFactory<String, String> consumerFactoryMock;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        this.contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
        this.collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        this.consumerFactoryMock = new KafkaConsumerFactory<String, String>() { // from class: org.apache.storm.kafka.spout.KafkaSpoutRebalanceTest.1
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                return KafkaSpoutRebalanceTest.this.consumerMock;
            }
        };
    }

    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> kafkaSpout, TopicPartition topicPartition, TopicPartition topicPartition2) {
        kafkaSpout.open(this.conf, this.contextMock, this.collectorMock);
        kafkaSpout.activate();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        ((KafkaConsumer) Mockito.verify(this.consumerMock)).subscribe(Matchers.anyCollection(), (ConsumerRebalanceListener) forClass.capture());
        ConsumerRebalanceListener consumerRebalanceListener = (ConsumerRebalanceListener) forClass.getValue();
        ArrayList arrayList = new ArrayList();
        arrayList.add(topicPartition);
        arrayList.add(topicPartition2);
        consumerRebalanceListener.onPartitionsAssigned(arrayList);
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Collections.singletonList(new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), 0L, "key", "value")));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicPartition2, Collections.singletonList(new ConsumerRecord(topicPartition2.topic(), topicPartition2.partition(), 0L, "key", "value")));
        Mockito.when(this.consumerMock.poll(Matchers.anyLong())).thenReturn(new ConsumerRecords(hashMap)).thenReturn(new ConsumerRecords(hashMap2)).thenReturn(new ConsumerRecords(Collections.emptyMap()));
        kafkaSpout.nextTuple();
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(Mockito.anyString(), Mockito.anyList(), forClass2.capture());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        kafkaSpout.nextTuple();
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(Mockito.anyString(), Mockito.anyList(), forClass3.capture());
        consumerRebalanceListener.onPartitionsRevoked(arrayList);
        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition2));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(forClass2.getValue());
        arrayList2.add(forClass3.getValue());
        return arrayList2;
    }

    @Test
    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(-1).setOffsetCommitPeriodMs(2000L).build(), this.consumerFactoryMock);
            TopicPartition topicPartition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
            TopicPartition topicPartition2 = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
            List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition = emitOneMessagePerPartitionThenRevokeOnePartition(kafkaSpout, topicPartition, topicPartition2);
            kafkaSpout.ack(emitOneMessagePerPartitionThenRevokeOnePartition.get(0));
            kafkaSpout.ack(emitOneMessagePerPartitionThenRevokeOnePartition.get(1));
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock, Mockito.times(1))).commitSync((Map) this.commitCapture.capture());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertThat(map, org.hamcrest.Matchers.hasKey(topicPartition2));
            Assert.assertThat(map, CoreMatchers.not(org.hamcrest.Matchers.hasKey(topicPartition)));
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
        KafkaSpoutRetryService kafkaSpoutRetryService = (KafkaSpoutRetryService) Mockito.mock(KafkaSpoutRetryService.class);
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(-1).setOffsetCommitPeriodMs(10L).setRetry(kafkaSpoutRetryService).build(), this.consumerFactoryMock);
        TopicPartition topicPartition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
        TopicPartition topicPartition2 = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
        Mockito.when(kafkaSpoutRetryService.getMessageId((ConsumerRecord) Mockito.any(ConsumerRecord.class))).thenReturn(new KafkaSpoutMessageId(topicPartition, 0L)).thenReturn(new KafkaSpoutMessageId(topicPartition2, 0L));
        List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition = emitOneMessagePerPartitionThenRevokeOnePartition(kafkaSpout, topicPartition, topicPartition2);
        ((KafkaSpoutRetryService) Mockito.verify(kafkaSpoutRetryService, Mockito.times(2))).getMessageId((ConsumerRecord) Mockito.any(ConsumerRecord.class));
        kafkaSpout.fail(emitOneMessagePerPartitionThenRevokeOnePartition.get(0));
        kafkaSpout.fail(emitOneMessagePerPartitionThenRevokeOnePartition.get(1));
        ((KafkaSpoutRetryService) Mockito.verify(kafkaSpoutRetryService, Mockito.never())).schedule(emitOneMessagePerPartitionThenRevokeOnePartition.get(0));
        ((KafkaSpoutRetryService) Mockito.verify(kafkaSpoutRetryService)).schedule(emitOneMessagePerPartitionThenRevokeOnePartition.get(1));
    }
}
