package org.apache.storm.kafka.bolt;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.storm.Testing;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MkTupleParam;
import org.apache.storm.tuple.Tuple;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/bolt/KafkaBoltTest.class */
public class KafkaBoltTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);

    private <K, V> KafkaBolt<K, V> makeBolt(final Producer<K, V> producer) {
        KafkaBolt<K, V> kafkaBolt = new KafkaBolt<K, V>() { // from class: org.apache.storm.kafka.bolt.KafkaBoltTest.1
            protected Producer<K, V> mkProducer(Properties properties) {
                return producer;
            }
        };
        kafkaBolt.withTopicSelector("MY_TOPIC");
        return kafkaBolt;
    }

    private Tuple createTestTuple(String... strArr) {
        MkTupleParam mkTupleParam = new MkTupleParam();
        mkTupleParam.setFields(new String[]{"key", "message"});
        return Testing.testTuple(Arrays.asList(strArr), mkTupleParam);
    }

    @Test
    public void testSimple() {
        MockProducer mockProducer = new MockProducer(Cluster.empty(), false, (Partitioner) null, (Serializer) null, (Serializer) null);
        KafkaBolt makeBolt = makeBolt(mockProducer);
        OutputCollector outputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        makeBolt.prepare(new HashMap(), (TopologyContext) Mockito.mock(TopologyContext.class), outputCollector);
        Tuple createTestTuple = createTestTuple("KEY", "VALUE");
        makeBolt.execute(createTestTuple);
        MatcherAssert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.is(1));
        ProducerRecord producerRecord = (ProducerRecord) mockProducer.history().get(0);
        LOG.info("GOT {} ->", producerRecord);
        LOG.info("{}, {}, {}", new Object[]{producerRecord.topic(), producerRecord.key(), producerRecord.value()});
        MatcherAssert.assertThat(producerRecord.topic(), CoreMatchers.is("MY_TOPIC"));
        MatcherAssert.assertThat(producerRecord.key(), CoreMatchers.is("KEY"));
        MatcherAssert.assertThat(producerRecord.value(), CoreMatchers.is("VALUE"));
        mockProducer.completeNext();
        ((OutputCollector) Mockito.verify(outputCollector)).ack(createTestTuple);
    }

    @Test
    public void testSimpleWithError() {
        MockProducer mockProducer = new MockProducer(Cluster.empty(), false, (Partitioner) null, (Serializer) null, (Serializer) null);
        KafkaBolt makeBolt = makeBolt(mockProducer);
        OutputCollector outputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        makeBolt.prepare(new HashMap(), (TopologyContext) Mockito.mock(TopologyContext.class), outputCollector);
        Tuple createTestTuple = createTestTuple("KEY", "VALUE");
        makeBolt.execute(createTestTuple);
        MatcherAssert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.is(1));
        ProducerRecord producerRecord = (ProducerRecord) mockProducer.history().get(0);
        LOG.info("GOT {} ->", producerRecord);
        LOG.info("{}, {}, {}", new Object[]{producerRecord.topic(), producerRecord.key(), producerRecord.value()});
        MatcherAssert.assertThat(producerRecord.topic(), CoreMatchers.is("MY_TOPIC"));
        MatcherAssert.assertThat(producerRecord.key(), CoreMatchers.is("KEY"));
        MatcherAssert.assertThat(producerRecord.value(), CoreMatchers.is("VALUE"));
        KafkaException kafkaException = new KafkaException();
        mockProducer.errorNext(kafkaException);
        ((OutputCollector) Mockito.verify(outputCollector)).reportError(kafkaException);
        ((OutputCollector) Mockito.verify(outputCollector)).fail(createTestTuple);
    }

    @Test
    public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
        MockProducer mockProducer = new MockProducer(Cluster.empty(), false, (Partitioner) null, (Serializer) null, (Serializer) null);
        KafkaBolt makeBolt = makeBolt(mockProducer);
        PreparableCallback preparableCallback = (PreparableCallback) Mockito.mock(PreparableCallback.class);
        makeBolt.withProducerCallback(preparableCallback);
        OutputCollector outputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        TopologyContext topologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        HashMap hashMap = new HashMap();
        makeBolt.prepare(hashMap, topologyContext, outputCollector);
        ((PreparableCallback) Mockito.verify(preparableCallback)).prepare(hashMap, topologyContext);
        Tuple createTestTuple = createTestTuple("KEY", "VALUE");
        makeBolt.execute(createTestTuple);
        MatcherAssert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.is(1));
        ProducerRecord producerRecord = (ProducerRecord) mockProducer.history().get(0);
        LOG.info("GOT {} ->", producerRecord);
        LOG.info("{}, {}, {}", new Object[]{producerRecord.topic(), producerRecord.key(), producerRecord.value()});
        MatcherAssert.assertThat(producerRecord.topic(), CoreMatchers.is("MY_TOPIC"));
        MatcherAssert.assertThat(producerRecord.key(), CoreMatchers.is("KEY"));
        MatcherAssert.assertThat(producerRecord.value(), CoreMatchers.is("VALUE"));
        KafkaException kafkaException = new KafkaException();
        mockProducer.errorNext(kafkaException);
        ((PreparableCallback) Mockito.verify(preparableCallback)).onCompletion((RecordMetadata) ArgumentMatchers.any(), (Exception) ArgumentMatchers.eq(kafkaException));
        ((OutputCollector) Mockito.verify(outputCollector)).reportError(kafkaException);
        ((OutputCollector) Mockito.verify(outputCollector)).fail(createTestTuple);
    }
}
