package org.apache.storm.kafka.spout;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

/**
 * Verifies how {@link KafkaSpout} behaves when the rebalance listener it registers through the
 * {@link TopicAssigner} is told that partitions were revoked and (re)assigned.
 *
 * <p>The Kafka consumer, topic filter, partitioner, collector and topology context are all Mockito
 * mocks; the tests drive the spout by invoking the captured {@link ConsumerRebalanceListener}
 * directly.
 */
@ExtendWith(MockitoExtension.class)
public class KafkaSpoutRebalanceTest {

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    /** Offset commit period configured on the spout in the ack test. */
    private final long offsetCommitPeriodMs = 2000;

    /** Topology configuration handed to {@code KafkaSpout.open}; intentionally left empty. */
    private final Map<String, Object> conf = new HashMap<>();

    @Mock
    private TopologyContext contextMock;

    @Mock
    private SpoutOutputCollector collectorMock;

    @Mock
    private KafkaConsumer<String, String> consumerMock;

    /** Factory handed to the spout; always yields {@link #consumerMock}. */
    private ConsumerFactory<String, String> consumerFactory;

    @Mock
    private TopicFilter topicFilterMock;

    @Mock
    private ManualPartitioner partitionerMock;

    /**
     * Wires the consumer factory to the mocked consumer and makes the topic filter and the
     * partitioner answer every call with a fresh, empty, mutable set.
     */
    @BeforeEach
    public void setUp() {
        consumerFactory = consumerProps -> consumerMock;
        // A new HashSet per invocation, so no set instance is shared between calls.
        Answer<Set<TopicPartition>> emptyPartitions = invocation -> new HashSet<>();
        Mockito.doAnswer(emptyPartitions).when(topicFilterMock)
            .getAllSubscribedPartitions(ArgumentMatchers.any());
        Mockito.doAnswer(emptyPartitions).when(partitionerMock)
            .getPartitionsForThisTask(ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Wraps the records produced by
     * {@code SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0L, 1)} in a
     * {@link ConsumerRecords} keyed by that partition.
     */
    private ConsumerRecords<String, String> recordsFor(TopicPartition partition) {
        return new ConsumerRecords<>(Collections.singletonMap(
            partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0L, 1)));
    }

    /**
     * Opens and activates the spout, assigns it both partitions, lets it emit one tuple per
     * partition, and then simulates a rebalance after which only {@code assignedPartition}
     * remains assigned.
     *
     * @param spout the spout under test
     * @param partitionThatWillBeRevoked partition that is dropped by the simulated rebalance
     * @param assignedPartition partition that is still assigned after the rebalance
     * @param topicAssigner mocked assigner from which the spout's rebalance listener is captured
     * @return the message ids of the two emitted tuples, in order of emission
     */
    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(
            KafkaSpout<String, String> spout,
            TopicPartition partitionThatWillBeRevoked,
            TopicPartition assignedPartition,
            TopicAssigner topicAssigner) {
        spout.open(conf, contextMock, collectorMock);
        spout.activate();

        // Grab the listener the spout passed to the assigner so the test can drive rebalances.
        ArgumentCaptor<ConsumerRebalanceListener> listenerCaptor =
            ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        Mockito.verify(topicAssigner).assignPartitions(
            ArgumentMatchers.any(), ArgumentMatchers.any(), listenerCaptor.capture());
        ConsumerRebalanceListener rebalanceListener = listenerCaptor.getValue();

        Set<TopicPartition> initialAssignment = new HashSet<>();
        initialAssignment.add(partitionThatWillBeRevoked);
        initialAssignment.add(assignedPartition);
        rebalanceListener.onPartitionsAssigned(initialAssignment);
        Mockito.when(consumerMock.assignment()).thenReturn(initialAssignment);

        // First poll serves the soon-to-be-revoked partition, the second the surviving one,
        // every later poll comes back empty.
        Mockito.when(consumerMock.poll(ArgumentMatchers.anyLong()))
            .thenReturn(recordsFor(partitionThatWillBeRevoked))
            .thenReturn(recordsFor(assignedPartition))
            .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));

        spout.nextTuple();
        ArgumentCaptor<KafkaSpoutMessageId> firstEmitted =
            ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        Mockito.verify(collectorMock).emit(
            ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), firstEmitted.capture());

        // Clear recorded interactions so the second emit can be verified on its own.
        Mockito.reset(collectorMock);
        spout.nextTuple();
        ArgumentCaptor<KafkaSpoutMessageId> secondEmitted =
            ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        Mockito.verify(collectorMock).emit(
            ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), secondEmitted.capture());

        // Rebalance: everything is revoked, then only assignedPartition is handed back.
        rebalanceListener.onPartitionsRevoked(initialAssignment);
        rebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
        // Lenient because not every caller triggers another assignment() lookup afterwards.
        Mockito.lenient()
            .doAnswer(invocation -> Collections.singleton(assignedPartition))
            .when(consumerMock).assignment();

        List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
        emittedMessageIds.add(firstEmitted.getValue());
        emittedMessageIds.add(secondEmitted.getValue());
        return emittedMessageIds;
    }

    /**
     * Acks both emitted tuples after one partition was revoked and checks that the single commit
     * issued afterwards contains the still-assigned partition but not the revoked one.
     */
    @Test
    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() {
        // The resource is only opened/closed around the test body; Time.advanceTime drives it.
        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
            TopicAssigner assignerMock = Mockito.mock(TopicAssigner.class);
            KafkaSpout<String, String> spout = new KafkaSpout<>(
                SingleTopicKafkaSpoutConfiguration
                    .createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
                    .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                    .build(),
                consumerFactory,
                assignerMock);
            TopicPartition partitionThatWillBeRevoked = new TopicPartition("test", 1);
            TopicPartition assignedPartition = new TopicPartition("test", 2);

            List<KafkaSpoutMessageId> emittedMessageIds =
                emitOneMessagePerPartitionThenRevokeOnePartition(
                    spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);

            spout.ack(emittedMessageIds.get(0));
            spout.ack(emittedMessageIds.get(1));

            // Move past the commit period; the extra 500 ms of slack keeps the total at the
            // 2500 ms the test has always advanced by.
            Time.advanceTime(offsetCommitPeriodMs + 500L);
            spout.nextTuple();

            Mockito.verify(consumerMock, Mockito.times(1)).commitSync(commitCapture.capture());
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
            MatcherAssert.assertThat(committedOffsets, Matchers.hasKey(assignedPartition));
            MatcherAssert.assertThat(
                committedOffsets, CoreMatchers.not(Matchers.hasKey(partitionThatWillBeRevoked)));
        }
    }

    /**
     * Fails both emitted tuples after one partition was revoked and checks that only the tuple
     * from the still-assigned partition is handed to the retry service.
     */
    @Test
    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() {
        TopicAssigner assignerMock = Mockito.mock(TopicAssigner.class);
        KafkaSpoutRetryService retryServiceMock = Mockito.mock(KafkaSpoutRetryService.class);
        KafkaSpout<String, String> spout = new KafkaSpout<>(
            SingleTopicKafkaSpoutConfiguration
                .createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
                .setOffsetCommitPeriodMs(10L)
                .setRetry(retryServiceMock)
                .build(),
            consumerFactory,
            assignerMock);
        TopicPartition partitionThatWillBeRevoked = new TopicPartition("test", 1);
        TopicPartition assignedPartition = new TopicPartition("test", 2);

        // The mocked retry service supplies the message ids, in emission order.
        Mockito.when(retryServiceMock.getMessageId(
                ArgumentMatchers.any(TopicPartition.class), ArgumentMatchers.anyLong()))
            .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0L))
            .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0L));

        List<KafkaSpoutMessageId> emittedMessageIds =
            emitOneMessagePerPartitionThenRevokeOnePartition(
                spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);

        Mockito.verify(retryServiceMock, Mockito.times(2)).getMessageId(
            ArgumentMatchers.any(TopicPartition.class), ArgumentMatchers.anyLong());

        spout.fail(emittedMessageIds.get(0));
        spout.fail(emittedMessageIds.get(1));

        Mockito.verify(retryServiceMock, Mockito.never()).schedule(emittedMessageIds.get(0));
        Mockito.verify(retryServiceMock).schedule(emittedMessageIds.get(1));
    }

    /**
     * Reassigns an already-owned partition together with a new one and checks that the consumer
     * is asked to seek only on the new partition, to its committed offset.
     */
    @Test
    public void testReassignPartitionSeeksForOnlyNewPartitions() {
        TopicAssigner assignerMock = Mockito.mock(TopicAssigner.class);
        KafkaSpout<String, String> spout = new KafkaSpout<>(
            SingleTopicKafkaSpoutConfiguration
                .createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build(),
            consumerFactory,
            assignerMock);
        TopicPartition assignedPartition = new TopicPartition("test", 1);
        TopicPartition newPartition = new TopicPartition("test", 2);

        spout.open(conf, contextMock, collectorMock);
        spout.activate();

        ArgumentCaptor<ConsumerRebalanceListener> listenerCaptor =
            ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        Mockito.verify(assignerMock).assignPartitions(
            ArgumentMatchers.any(), ArgumentMatchers.any(), listenerCaptor.capture());
        ConsumerRebalanceListener rebalanceListener = listenerCaptor.getValue();

        Set<TopicPartition> onePartition = new HashSet<>();
        onePartition.add(assignedPartition);
        rebalanceListener.onPartitionsAssigned(onePartition);

        // Drop stubbing and interactions from the initial assignment; only the reassignment
        // below is of interest.
        Mockito.reset(consumerMock);

        final long committedOffset = 500L;
        Answer<OffsetAndMetadata> committedAnswer =
            invocation -> new OffsetAndMetadata(committedOffset);
        // Lenient for the already-owned partition: the spout may never look up its offset.
        Mockito.lenient().doAnswer(committedAnswer).when(consumerMock).committed(assignedPartition);
        Mockito.doAnswer(committedAnswer).when(consumerMock).committed(newPartition);

        rebalanceListener.onPartitionsRevoked(onePartition);
        Set<TopicPartition> twoPartitions = new HashSet<>();
        twoPartitions.add(assignedPartition);
        twoPartitions.add(newPartition);
        rebalanceListener.onPartitionsAssigned(twoPartitions);

        Mockito.verify(consumerMock, Mockito.never())
            .seek(ArgumentMatchers.eq(assignedPartition), ArgumentMatchers.anyLong());
        Mockito.verify(consumerMock).seek(newPartition, committedOffset);
    }
}
