package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.class */
public class KafkaSpoutLogCompactionSupportTest {
    private final long offsetCommitPeriodMs = 2000;
    private final TopologyContext contextMock = (TopologyContext) Mockito.mock(TopologyContext.class);
    private final SpoutOutputCollector collectorMock = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    private final Map<String, Object> conf = new HashMap();
    private final TopicPartition partition = new TopicPartition("test", 1);
    private KafkaConsumer<String, String> consumerMock;
    private KafkaSpoutConfig<String, String> spoutConfig;

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        this.spoutConfig = SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder((TopicFilter) Mockito.mock(TopicFilter.class), (ManualPartitioner) Mockito.mock(ManualPartitioner.class), -1).setOffsetCommitPeriodMs(2000L).build();
        this.consumerMock = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
    }

    @Test
    public void testCommitSuccessWithOffsetVoids() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition);
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, 0L, 5));
            arrayList.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(this.partition, 8L, 2));
            hashMap.put(this.partition, arrayList);
            Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(hashMap));
            for (int i = 0; i < arrayList.size(); i++) {
                kafkaSpout.nextTuple();
            }
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(arrayList.size()))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
            Iterator it = forClass.getAllValues().iterator();
            while (it.hasNext()) {
                kafkaSpout.ack((KafkaSpoutMessageId) it.next());
            }
            Time.advanceTime(2500L);
            Mockito.when(this.consumerMock.poll(ArgumentMatchers.anyLong())).thenReturn(new ConsumerRecords(Collections.emptyMap()));
            kafkaSpout.nextTuple();
            InOrder inOrder = Mockito.inOrder(new Object[]{this.consumerMock});
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            ((KafkaConsumer) inOrder.verify(this.consumerMock)).poll(ArgumentMatchers.anyLong());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertTrue(map.containsKey(this.partition));
            Assert.assertEquals(10L, ((OffsetAndMetadata) map.get(this.partition)).offset());
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            TopicPartition topicPartition = new TopicPartition("test", 2);
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition, topicPartition);
            List<KafkaSpoutMessageId> pollAndEmit = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 3, this.collectorMock, this.partition, 0, 1, 2);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            List<KafkaSpoutMessageId> pollAndEmit2 = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 3, this.collectorMock, topicPartition, 0, 1, 2);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            for (int i = 0; i < 3; i++) {
                kafkaSpout.fail(pollAndEmit.get(i));
                kafkaSpout.fail(pollAndEmit2.get(i));
            }
            Time.advanceTime(50L);
            HashMap hashMap = new HashMap();
            hashMap.put(this.partition, new int[]{2});
            hashMap.put(topicPartition, new int[]{0, 1, 2});
            List<KafkaSpoutMessageId> pollAndEmit3 = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 4, this.collectorMock, hashMap);
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertThat(map.keySet(), CoreMatchers.is(Collections.singleton(this.partition)));
            Assert.assertThat("The first partition should have committed up to the first retriable tuple that is not missing", Long.valueOf(((OffsetAndMetadata) map.get(this.partition)).offset()), CoreMatchers.is(2L));
            Iterator<KafkaSpoutMessageId> it = pollAndEmit3.iterator();
            while (it.hasNext()) {
                kafkaSpout.ack(it.next());
            }
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock, Mockito.times(2))).commitSync((Map) this.commitCapture.capture());
            Map map2 = (Map) this.commitCapture.getValue();
            Assert.assertThat(map2, Matchers.hasKey(this.partition));
            Assert.assertThat(map2, Matchers.hasKey(topicPartition));
            Assert.assertThat(Long.valueOf(((OffsetAndMetadata) map2.get(this.partition)).offset()), CoreMatchers.is(3L));
            Assert.assertThat(Long.valueOf(((OffsetAndMetadata) map2.get(topicPartition)).offset()), CoreMatchers.is(3L));
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition);
            List<KafkaSpoutMessageId> pollAndEmit = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 3, this.collectorMock, this.partition, 0, 1, 2);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            kafkaSpout.fail(pollAndEmit.get(0));
            kafkaSpout.fail(pollAndEmit.get(2));
            Time.advanceTime(50L);
            Iterator<KafkaSpoutMessageId> it = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 1, this.collectorMock, this.partition, 2).iterator();
            while (it.hasNext()) {
                kafkaSpout.ack(it.next());
            }
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertThat(map.keySet(), CoreMatchers.is(Collections.singleton(this.partition)));
            Assert.assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending", Long.valueOf(((OffsetAndMetadata) map.get(this.partition)).offset()), CoreMatchers.is(1L));
            kafkaSpout.ack(pollAndEmit.get(1));
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock, Mockito.times(2))).commitSync((Map) this.commitCapture.capture());
            Map map2 = (Map) this.commitCapture.getValue();
            Assert.assertThat(map2.keySet(), CoreMatchers.is(Collections.singleton(this.partition)));
            Assert.assertThat("The first partition should have committed all offsets", Long.valueOf(((OffsetAndMetadata) map2.get(this.partition)).offset()), CoreMatchers.is(3L));
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCommitTupleAfterCompactionGap() {
        Time.SimulatedTime simulatedTime = new Time.SimulatedTime();
        Throwable th = null;
        try {
            KafkaSpout kafkaSpout = SpoutWithMockedConsumerSetupHelper.setupSpout(this.spoutConfig, this.conf, this.contextMock, this.collectorMock, this.consumerMock, this.partition);
            List<KafkaSpoutMessageId> pollAndEmit = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 1, this.collectorMock, this.partition, 0);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            List<KafkaSpoutMessageId> pollAndEmit2 = SpoutWithMockedConsumerSetupHelper.pollAndEmit(kafkaSpout, this.consumerMock, 1, this.collectorMock, this.partition, 2);
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
            kafkaSpout.ack(pollAndEmit.get(0));
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            Map map = (Map) this.commitCapture.getValue();
            Assert.assertThat(map.keySet(), CoreMatchers.is(Collections.singleton(this.partition)));
            Assert.assertThat("The consumer should have committed the offset before the gap", Long.valueOf(((OffsetAndMetadata) map.get(this.partition)).offset()), CoreMatchers.is(1L));
            Mockito.reset(new KafkaConsumer[]{this.consumerMock});
            kafkaSpout.ack(pollAndEmit2.get(0));
            Time.advanceTime(2500L);
            kafkaSpout.nextTuple();
            ((KafkaConsumer) Mockito.verify(this.consumerMock)).commitSync((Map) this.commitCapture.capture());
            Map map2 = (Map) this.commitCapture.getValue();
            Assert.assertThat(map2.keySet(), CoreMatchers.is(Collections.singleton(this.partition)));
            Assert.assertThat("The consumer should have committed the offset after the gap, since offset 1 wasn't emitted and both 0 and 2 are acked", Long.valueOf(((OffsetAndMetadata) map2.get(this.partition)).offset()), CoreMatchers.is(3L));
            if (simulatedTime != null) {
                if (0 == 0) {
                    simulatedTime.close();
                    return;
                }
                try {
                    simulatedTime.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simulatedTime != null) {
                if (0 != 0) {
                    try {
                        simulatedTime.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simulatedTime.close();
                }
            }
            throw th3;
        }
    }
}
