package org.apache.storm.kafka.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.class */
/**
 * Base class for {@link KafkaSpout} tests that run against the Kafka instance supplied by
 * {@link KafkaUnitRule}.
 *
 * <p>Before each test it builds the spout configuration (supplied by the subclass), wraps a real
 * consumer in a Mockito spy, hands that spy to the spout through a custom consumer factory, and
 * switches Storm's {@link Time} into simulated mode. Simulated time is closed again after each
 * test.
 */
public abstract class KafkaSpoutAbstractTest {

    /** Extra simulated time, beyond the commit period, advanced before a commit is expected. */
    private static final long COMMIT_PERIOD_SLACK_MS = 500L;

    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    final TopologyContext topologyContext = Mockito.mock(TopologyContext.class);
    final Map<String, Object> conf = new HashMap<>();
    final SpoutOutputCollector collectorMock = Mockito.mock(SpoutOutputCollector.class);
    final long commitOffsetPeriodMs;
    KafkaConsumer<String, String> consumerSpy;
    KafkaSpout<String, String> spout;

    @Captor
    ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
    private Time.SimulatedTime simulatedTime;
    private KafkaSpoutConfig<String, String> spoutConfig;

    /**
     * Creates the test base.
     *
     * @param j the offset commit period in milliseconds; stored in {@link #commitOffsetPeriodMs}
     *          and used by {@link #commitAndVerifyAllMessagesCommitted(long)} to decide how far
     *          to advance simulated time
     */
    public KafkaSpoutAbstractTest(long j) {
        this.commitOffsetPeriodMs = j;
    }

    /**
     * Initializes the Mockito annotations, then builds the spout configuration, the consumer spy
     * and the spout (in that order), and finally enables simulated time.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        spoutConfig = createSpoutConfig();
        consumerSpy = createConsumerSpy();
        spout = new KafkaSpout<>(spoutConfig, createConsumerFactory());
        simulatedTime = new Time.SimulatedTime();
    }

    /**
     * Returns a consumer factory that always hands out {@link #consumerSpy}, so that the tests
     * can verify the calls the spout makes on its consumer.
     */
    private KafkaConsumerFactory<String, String> createConsumerFactory() {
        return new KafkaConsumerFactory<String, String>() {
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                // The passed-in config is ignored on purpose: the spy was already built in setUp().
                return KafkaSpoutAbstractTest.this.consumerSpy;
            }
        };
    }

    /**
     * Builds the consumer used by the spout under test: a consumer created by the default
     * factory from the current spout configuration, wrapped in a Mockito spy.
     *
     * @return the spied consumer
     */
    public KafkaConsumer<String, String> createConsumerSpy() {
        KafkaConsumer<String, String> realConsumer =
            new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
        return Mockito.spy(realConsumer);
    }

    /**
     * Leaves simulated time.
     *
     * <p>JUnit runs {@code @After} methods even when {@code @Before} failed. Simulated time is
     * the last thing {@link #setUp()} creates, so it may still be {@code null} here; guarding
     * against that keeps the original set-up failure from being joined by a
     * {@link NullPointerException}.
     *
     * @throws Exception if closing simulated time fails
     */
    @After
    public void tearDown() throws Exception {
        if (simulatedTime != null) {
            simulatedTime.close();
            simulatedTime = null;
        }
    }

    /**
     * Supplies the spout configuration for the concrete test.
     *
     * @return the configuration used to create both the consumer spy and the spout
     */
    abstract KafkaSpoutConfig<String, String> createSpoutConfig();

    /**
     * Populates the test topic and initializes the spout with this test's configuration map,
     * topology context mock and collector mock.
     *
     * @param i value forwarded to {@code SingleTopicKafkaUnitSetupHelper.populateTopicData};
     *          presumably the number of messages to write to the topic
     * @throws Exception if populating the topic or initializing the spout fails
     */
    public void prepareSpout(int i) throws Exception {
        SingleTopicKafkaUnitSetupHelper.populateTopicData(
            kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, i);
        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
    }

    /**
     * Calls {@code nextTuple()} once, verifies that the expected message was emitted, acks it
     * and resets the collector mock so that later verifications start from a clean slate.
     *
     * @param i the number whose string form is expected as both key and value of the emitted tuple
     * @return the captor holding the message id of the emitted tuple
     */
    public ArgumentCaptor<Object> nextTuple_verifyEmitted_ack_resetCollector(int i) {
        spout.nextTuple();
        ArgumentCaptor<Object> messageId = verifyMessageEmitted(i);
        spout.ack(messageId.getValue());
        Mockito.reset(collectorMock);
        return messageId;
    }

    /**
     * Verifies that the collector received exactly one emit on the test stream whose tuple is
     * {@code (TOPIC, Integer.toString(i), Integer.toString(i))}.
     *
     * @param i the number whose string form is expected as both key and value of the emitted tuple
     * @return the captor holding the message id passed to the collector
     */
    ArgumentCaptor<Object> verifyMessageEmitted(int i) {
        ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
        String expectedKeyAndValue = Integer.toString(i);
        List<Object> expectedTuple =
            new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, expectedKeyAndValue, expectedKeyAndValue);
        Mockito.verify(collectorMock).emit(
            Matchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM),
            Matchers.eq(expectedTuple),
            messageId.capture());
        return messageId;
    }

    /**
     * Advances simulated time past the commit period, calls {@code nextTuple()} (which is
     * expected to make the spout commit) and then verifies the committed offset.
     *
     * @param j the offset expected to have been committed
     */
    public void commitAndVerifyAllMessagesCommitted(long j) {
        Time.advanceTime(commitOffsetPeriodMs + COMMIT_PERIOD_SLACK_MS);
        spout.nextTuple();
        verifyAllMessagesCommitted(j);
    }

    /**
     * Verifies that {@code commitSync} was called exactly once on the consumer spy, that the
     * commit covered a single topic partition, and that the committed offset equals the given
     * value. The consumer spy is reset afterwards.
     *
     * @param j the offset expected to have been committed
     */
    public void verifyAllMessagesCommitted(long j) {
        Mockito.verify(consumerSpy).commitSync(commitCapture.capture());
        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
        MatcherAssert.assertThat("Expected commits for only one topic partition",
            commits.size(), CoreMatchers.is(1));
        // Reading the first value is only reached once the size assertion above has passed.
        OffsetAndMetadata committed = commits.values().iterator().next();
        MatcherAssert.assertThat("Expected committed offset to cover all emitted messages",
            committed.offset(), CoreMatchers.is(j));
        Mockito.reset(consumerSpy);
    }
}
