package org.apache.storm.kafka.spout.trident;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Tests for the batch emit and re-emit paths of {@link KafkaTridentSpoutEmitter}.
 *
 * <p>Most tests run the emitter against a {@link MockConsumer} that is assigned a single partition
 * preloaded with {@link #RECORDS_IN_KAFKA} records at offsets {@link #FIRST_OFFSET_IN_KAFKA} through
 * {@link #LAST_OFFSET_IN_KAFKA}. The record translator configured in
 * {@link #createEmitter(Consumer, FirstPollOffsetStrategy)} emits one tuple per record whose only
 * field is the record offset, so assertions can inspect emitted offsets directly.
 */
@ExtendWith(MockitoExtension.class)
public class KafkaTridentSpoutEmitterEmitTest {

    /** Topology id reported by the mocked {@link TopologyContext}. */
    private static final String TOPOLOGY_ID = "topologyId";
    /** Offset of the first record preloaded into the mock consumer. */
    private static final long FIRST_OFFSET_IN_KAFKA = 0;
    /** Number of records preloaded into the mock consumer. */
    private static final int RECORDS_IN_KAFKA = 100;
    /** Offset of the last record preloaded into the mock consumer. */
    private static final long LAST_OFFSET_IN_KAFKA = 99;
    /** Start timestamp handed to the spout configuration. */
    private static final long START_TIMESTAMP = 1557214606103L;
    /** Poll timeout handed to the spout configuration. */
    private static final long POLL_TIMEOUT_MS = 1L;

    @Captor
    public ArgumentCaptor<List<Object>> emitCaptor;

    @Mock
    public TopologyContext topologyContextMock;

    // Populated by MockitoExtension; no manual Mockito.mock(...) initializer is needed.
    @Mock
    public TridentCollector collectorMock;

    private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
    private final TopicPartition partition = new TopicPartition("test", 0);

    /** Assigns the test partition to the mock consumer and preloads it with the initial records. */
    @BeforeEach
    public void setUp() {
        Mockito.when(topologyContextMock.getStormId()).thenReturn(TOPOLOGY_ID);
        consumer.assign(Collections.singleton(partition));
        consumer.updateBeginningOffsets(Collections.singletonMap(partition, FIRST_OFFSET_IN_KAFKA));
        consumer.updateEndOffsets(Collections.singletonMap(partition, LAST_OFFSET_IN_KAFKA + 1));
        SpoutWithMockedConsumerSetupHelper.createRecords(partition, FIRST_OFFSET_IN_KAFKA, RECORDS_IN_KAFKA)
            .forEach(record -> consumer.addRecord(record));
    }

    /**
     * Builds an emitter that always uses the supplied consumer.
     *
     * @param consumer the consumer returned by the emitter's consumer factory
     * @param firstPollOffsetStrategy the first poll offset strategy to configure
     * @return an emitter whose tuples contain a single "offset" field
     */
    private KafkaTridentSpoutEmitter<String, String> createEmitter(Consumer<String, String> consumer, FirstPollOffsetStrategy firstPollOffsetStrategy) {
        return new KafkaTridentSpoutEmitter<>(
            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
                .setRecordTranslator(record -> new Values(record.offset()), new Fields("offset"))
                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
                .setPollTimeoutMs(POLL_TIMEOUT_MS)
                .setStartTimeStamp(START_TIMESTAMP)
                .build(),
            topologyContextMock,
            config -> consumer,
            new TopicAssigner());
    }

    /** Builds an emitter backed by the shared {@link MockConsumer}. */
    private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetStrategy firstPollOffsetStrategy) {
        return createEmitter(consumer, firstPollOffsetStrategy);
    }

    /**
     * Creates a fresh emitter and emits one new batch with transaction id 10.
     *
     * @return the batch metadata map returned by the emitter
     */
    private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector tridentCollector, TopicPartition topicPartition, Map<String, Object> map) {
        return createEmitter(firstPollOffsetStrategy)
            .emitPartitionBatchNew(new TransactionAttempt(10L, 0), tridentCollector, new KafkaTridentSpoutTopicPartition(topicPartition), map);
    }

    /**
     * Verifies that exactly {@code expectedEmits} tuples were emitted and that the first and last
     * captured tuples carry the expected offsets.
     */
    private void verifyEmittedOffsets(int expectedEmits, long expectedFirstOffset, long expectedLastOffset) {
        Mockito.verify(collectorMock, Mockito.times(expectedEmits)).emit(emitCaptor.capture());
        List<List<Object>> emits = emitCaptor.getAllValues();
        Assert.assertThat(emits.get(0).get(0), Matchers.is(expectedFirstOffset));
        Assert.assertThat(emits.get(emits.size() - 1).get(0), Matchers.is(expectedLastOffset));
    }

    /** Asserts the first and last offsets recorded in the batch metadata map. */
    private static void assertBatchBounds(Map<String, Object> batchMeta, long expectedFirstOffset, long expectedLastOffset) {
        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
        Assert.assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), Matchers.is(expectedFirstOffset));
        Assert.assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), Matchers.is(expectedLastOffset));
    }

    /** With no previous metadata and UNCOMMITTED_EARLIEST, every preloaded record is emitted. */
    @Test
    public void testEmitNewBatchWithNullMetaUncommittedEarliest() {
        Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, collectorMock, partition, null);

        verifyEmittedOffsets(RECORDS_IN_KAFKA, FIRST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA);
        assertBatchBounds(batchMeta, FIRST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA);
    }

    /** With no previous metadata and UNCOMMITTED_LATEST, nothing is emitted and the metadata points at the last offset. */
    @Test
    public void testEmitNewBatchWithNullMetaUncommittedLatest() {
        Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_LATEST, collectorMock, partition, null);

        Mockito.verify(collectorMock, Mockito.never()).emit(ArgumentMatchers.anyList());
        assertBatchBounds(batchMeta, LAST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA);
    }

    /** When previous metadata from the same topology covers offsets 0-49, the next batch is expected to cover 50-99. */
    @ParameterizedTest
    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST", "TIMESTAMP"})
    public void testEmitNewBatchWithPreviousMeta(FirstPollOffsetStrategy firstPollOffsetStrategy) {
        long previousBatchLastOffset = FIRST_OFFSET_IN_KAFKA + RECORDS_IN_KAFKA / 2 - 1;
        long expectedFirstOffset = previousBatchLastOffset + 1;
        int expectedEmits = (int) (LAST_OFFSET_IN_KAFKA - previousBatchLastOffset);
        Map<String, Object> previousMeta = new KafkaTridentSpoutBatchMetadata(FIRST_OFFSET_IN_KAFKA, previousBatchLastOffset, TOPOLOGY_ID).toMap();

        Map<String, Object> batchMeta = doEmitNewBatchTest(firstPollOffsetStrategy, collectorMock, partition, previousMeta);

        verifyEmittedOffsets(expectedEmits, expectedFirstOffset, LAST_OFFSET_IN_KAFKA);
        assertBatchBounds(batchMeta, expectedFirstOffset, LAST_OFFSET_IN_KAFKA);
    }

    /**
     * Emits ten consecutive batches with LATEST, asserting each one's metadata still points at the last
     * preloaded offset, then checks that records added afterwards are emitted in the next batch.
     */
    @Test
    public void testEmitEmptyBatches() throws Exception {
        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
        Map<String, Object> lastBatchMeta = null;
        for (long txid = 0; txid < 10; txid++) {
            lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(txid, 0), collectorMock, kttp, lastBatchMeta);
            KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
            Assert.assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), Matchers.is(LAST_OFFSET_IN_KAFKA));
            Assert.assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), Matchers.is(LAST_OFFSET_IN_KAFKA));
        }

        long firstNewOffset = LAST_OFFSET_IN_KAFKA + 1;
        int numNewRecords = 10;
        long lastNewOffset = firstNewOffset + numNewRecords - 1;
        SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewOffset, numNewRecords)
            .forEach(consumer::addRecord);

        Map<String, Object> batchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);

        verifyEmittedOffsets(numNewRecords, firstNewOffset, lastNewOffset);
        assertBatchBounds(batchMeta, firstNewOffset, lastNewOffset);
    }

    /** Re-emitting a batch covering offsets 50-59 from the same topology emits exactly those ten records. */
    @Test
    public void testReEmitBatch() {
        long firstReEmittedOffset = 50;
        int numReEmittedRecords = 10;
        long lastReEmittedOffset = firstReEmittedOffset + numReEmittedRecords - 1;
        Map<String, Object> batchMeta = new KafkaTridentSpoutBatchMetadata(firstReEmittedOffset, lastReEmittedOffset, TOPOLOGY_ID).toMap();

        createEmitter(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .reEmitPartitionBatch(new TransactionAttempt(10L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), batchMeta);

        verifyEmittedOffsets(numReEmittedRecords, firstReEmittedOffset, lastReEmittedOffset);
    }

    /** With EARLIEST, re-emitting a batch whose metadata carries a different topology id emits nothing. */
    @Test
    public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
        Map<String, Object> batchMeta = new KafkaTridentSpoutBatchMetadata(FIRST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA, "a new storm id").toMap();

        createEmitter(FirstPollOffsetStrategy.EARLIEST)
            .reEmitPartitionBatch(new TransactionAttempt(10L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), batchMeta);

        Mockito.verify(collectorMock, Mockito.never()).emit(ArgumentMatchers.anyList());
    }

    /**
     * With LATEST and metadata from a different topology, the first batch is empty and its metadata points
     * at the last preloaded offset; records added afterwards are emitted by the following batch.
     */
    @Test
    public void testEmitEmptyFirstBatch() {
        long oldBatchFirstOffset = 50;
        int numNewRecords = 10;
        KafkaTridentSpoutBatchMetadata preRedeployLastMeta =
            new KafkaTridentSpoutBatchMetadata(oldBatchFirstOffset, oldBatchFirstOffset + numNewRecords - 1, "an old topology");
        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
        TransactionAttempt txid = new TransactionAttempt(0L, 0);
        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);

        Map<String, Object> firstBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());

        Mockito.verify(collectorMock, Mockito.never()).emit(ArgumentMatchers.anyList());
        KafkaTridentSpoutBatchMetadata deserializedFirstMeta = KafkaTridentSpoutBatchMetadata.fromMap(firstBatchMeta);
        Assert.assertThat(deserializedFirstMeta.getFirstOffset(), Matchers.is(LAST_OFFSET_IN_KAFKA));
        Assert.assertThat(deserializedFirstMeta.getLastOffset(), Matchers.is(LAST_OFFSET_IN_KAFKA));

        long firstNewOffset = LAST_OFFSET_IN_KAFKA + 1;
        long lastNewOffset = firstNewOffset + numNewRecords - 1;
        SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewOffset, numNewRecords)
            .forEach(consumer::addRecord);

        Map<String, Object> secondBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, firstBatchMeta);

        verifyEmittedOffsets(numNewRecords, firstNewOffset, lastNewOffset);
        assertBatchBounds(secondBatchMeta, firstNewOffset, lastNewOffset);
    }

    /** With metadata from the same topology ending at offset 29, a new emitter is expected to resume at offset 30. */
    @ParameterizedTest
    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST", "TIMESTAMP"})
    public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetStrategy firstPollOffsetStrategy) {
        long preRestartFirstOffset = 20;
        int preRestartEmittedRecords = 10;
        long expectedFirstOffset = preRestartFirstOffset + preRestartEmittedRecords;
        int expectedEmits = (int) (RECORDS_IN_KAFKA - expectedFirstOffset);
        Map<String, Object> preRestartMeta =
            new KafkaTridentSpoutBatchMetadata(preRestartFirstOffset, expectedFirstOffset - 1, TOPOLOGY_ID).toMap();

        Map<String, Object> batchMeta = createEmitter(firstPollOffsetStrategy)
            .emitPartitionBatchNew(new TransactionAttempt(0L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), preRestartMeta);

        verifyEmittedOffsets(expectedEmits, expectedFirstOffset, LAST_OFFSET_IN_KAFKA);
        assertBatchBounds(batchMeta, expectedFirstOffset, LAST_OFFSET_IN_KAFKA);
    }

    /** With EARLIEST and metadata from a different topology, all preloaded records are emitted again. */
    @Test
    public void testEarliestStrategyWhenTopologyIsRedeployed() {
        Map<String, Object> preRedeployMeta = new KafkaTridentSpoutBatchMetadata(20L, (20 + 10) - 1, "Some older topology").toMap();

        Map<String, Object> batchMeta = createEmitter(FirstPollOffsetStrategy.EARLIEST)
            .emitPartitionBatchNew(new TransactionAttempt(0L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), preRedeployMeta);

        verifyEmittedOffsets(RECORDS_IN_KAFKA, FIRST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA);
        assertBatchBounds(batchMeta, FIRST_OFFSET_IN_KAFKA, LAST_OFFSET_IN_KAFKA);
    }

    /** With LATEST and metadata from a different topology, nothing is emitted. */
    @Test
    public void testLatestStrategyWhenTopologyIsRedeployed() {
        Map<String, Object> preRedeployMeta = new KafkaTridentSpoutBatchMetadata(20L, (20 + 10) - 1, "Some older topology").toMap();

        createEmitter(FirstPollOffsetStrategy.LATEST)
            .emitPartitionBatchNew(new TransactionAttempt(0L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), preRedeployMeta);

        Mockito.verify(collectorMock, Mockito.never()).emit(ArgumentMatchers.anyList());
    }

    /**
     * With TIMESTAMP and metadata from a different topology, the emitter is expected to seek once to the
     * offset that the mocked consumer reports for the start timestamp and to emit from that offset.
     */
    @Test
    public void testTimeStampStrategyWhenTopologyIsRedeployed() {
        long timeStampStartOffset = 2L;
        KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(20L, (20 + 10) - 1, "Some older topology");

        @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot carry the generic parameters.
        KafkaConsumer<String, String> kafkaConsumer = Mockito.mock(KafkaConsumer.class);
        Mockito.when(kafkaConsumer.assignment()).thenReturn(Collections.singleton(partition));
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
        offsetsForTimes.put(partition, new OffsetAndTimestamp(timeStampStartOffset, START_TIMESTAMP));
        Mockito.when(kafkaConsumer.offsetsForTimes(Collections.singletonMap(partition, START_TIMESTAMP))).thenReturn(offsetsForTimes);
        Mockito.when(kafkaConsumer.poll(POLL_TIMEOUT_MS)).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
            partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, timeStampStartOffset, RECORDS_IN_KAFKA))));

        Map<String, Object> batchMeta = createEmitter(kafkaConsumer, FirstPollOffsetStrategy.TIMESTAMP)
            .emitPartitionBatchNew(new TransactionAttempt(0L, 0), collectorMock, new KafkaTridentSpoutTopicPartition(partition), preRedeployLastMeta.toMap());

        Mockito.verify(collectorMock, Mockito.times(RECORDS_IN_KAFKA)).emit(emitCaptor.capture());
        Mockito.verify(kafkaConsumer, Mockito.times(1)).seek(partition, timeStampStartOffset);
        List<List<Object>> emits = emitCaptor.getAllValues();
        Assert.assertThat(emits.get(0).get(0), Matchers.is(timeStampStartOffset));
        Assert.assertThat("The batch should start at the first offset for startTimestamp", KafkaTridentSpoutBatchMetadata.fromMap(batchMeta).getFirstOffset(), Matchers.is(timeStampStartOffset));
    }
}
