package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * End-to-end tests for the Kafka output operators.
 *
 * <p>Each test wires a {@link StringGeneratorInputOperator} to a Kafka output operator in a
 * local-mode DAG, publishes {@code maxTuple} strings to {@code topic1}, and checks what a
 * {@link KafkaTestConsumer} listening on the same topic has collected.
 */
public class KafkaOutputOperatorTest extends KafkaOperatorTestBase {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOutputOperatorTest.class);

    /** Number of payload strings the generator produces per test run. */
    private static final int maxTuple = 20;

    /**
     * Number of payload strings queued by the generator thread so far.
     * Written only by the single generator thread and read by the JUnit thread,
     * hence {@code volatile} for visibility (one writer, so {@code ++} is safe).
     */
    private static volatile int tupleCount = 0;

    /** Latch handed to the test consumer; the tests wait on it before asserting. */
    private static CountDownLatch latch;

    /**
     * Input operator that produces the strings {@code "testString 1"} .. {@code "testString maxTuple"}
     * followed by {@code "END_TUPLE"} from a background thread, and forwards whatever has been
     * queued so far on every {@link #emitTuples()} call.
     */
    public static class StringGeneratorInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
        public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();

        /** Hand-off queue between the generator thread and {@link #emitTuples()}. */
        private final transient ArrayBlockingQueue<String> stringBuffer = new ArrayBlockingQueue<>(1024);

        /** The running generator thread; set to {@code null} by {@link #deactivate()} to stop generation. */
        private volatile Thread dataGeneratorThread;

        public void beginWindow(long windowId) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext context) {
        }

        public void teardown() {
        }

        /**
         * Starts the background thread that fills {@link #stringBuffer}.
         *
         * @param context operator context (unused)
         */
        public void activate(Context.OperatorContext context) {
            dataGeneratorThread = new Thread("String Generator") {
                @Override
                public void run() {
                    try {
                        int produced = 0;
                        // Stop early if the operator has been deactivated in the meantime.
                        while (dataGeneratorThread != null && produced < maxTuple) {
                            produced++;
                            stringBuffer.put("testString " + produced);
                            tupleCount++;
                        }
                        // The end marker is queued even after an early deactivation; only an
                        // interrupt skips it. It must stay inside the try because put() throws
                        // the checked InterruptedException, which run() cannot declare.
                        stringBuffer.put("END_TUPLE");
                    } catch (InterruptedException e) {
                        // Stop generating, but keep the interrupt status visible to the thread.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            dataGeneratorThread.start();
        }

        /** Signals the generator thread to stop by clearing the thread reference it polls. */
        public void deactivate() {
            dataGeneratorThread = null;
        }

        /**
         * Emits every string that was queued at the time of the call. The size is sampled once,
         * so strings queued while draining are left for the next invocation.
         */
        public void emitTuples() {
            for (int pending = stringBuffer.size(); pending > 0; pending--) {
                outputPort.emit(stringBuffer.poll());
            }
        }
    }

    /**
     * Publishes the generated strings through {@link KafkaSinglePortOutputOperator} and checks
     * that the consumer received as many messages as were generated, starting with
     * {@code "testString 1"}.
     *
     * <p>The operator is configured with an unreachable broker ({@code invalidhost:9092}) while
     * the launch {@link Configuration} supplies {@code localhost:9092} for the same property;
     * presumably this exercises the configuration override path (confirm against the operator).
     *
     * @throws Exception if the DAG cannot be prepared or the wait is interrupted
     */
    @Test
    public void testKafkaOutputOperator() throws Exception {
        // Reset the shared counter so the assertion does not depend on test execution order.
        tupleCount = 0;
        latch = new CountDownLatch(maxTuple);

        KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
        listener.setLatch(latch);
        new Thread(listener).start();
        try {
            LocalMode lma = LocalMode.newInstance();
            StreamingApplication app = new StreamingApplication() {
                public void populateDAG(DAG dag, Configuration configuration) {
                }
            };

            DAG dag = lma.getDAG();
            StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
            KafkaSinglePortOutputOperator producer = dag.addOperator("KafkaMessageProducer", KafkaSinglePortOutputOperator.class);

            Properties producerProps = new Properties();
            producerProps.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            producerProps.setProperty("metadata.broker.list", "invalidhost:9092");
            producerProps.setProperty("producer.type", "async");
            producerProps.setProperty("queue.buffering.max.ms", "200");
            producerProps.setProperty("queue.buffering.max.messages", "10");
            producerProps.setProperty("batch.num.messages", "5");
            producer.setConfigProperties(producerProps);
            producer.setTopic("topic1");

            dag.addStream("Kafka message", generator.outputPort, producer.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

            Configuration conf = new Configuration(false);
            conf.set("dt.operator.KafkaMessageProducer.prop.configProperties(metadata.broker.list)", "localhost:9092");
            lma.prepareDAG(app, conf);

            LocalMode.Controller controller = lma.getController();
            controller.runAsync();
            try {
                // A timeout is not checked here; the assertions below run either way.
                latch.await(15L, TimeUnit.SECONDS);
            } finally {
                controller.shutdown();
            }

            Assert.assertEquals("Number of emitted tuples", tupleCount, listener.holdingBuffer.size());
            logger.debug("Number of emitted tuples: {}", listener.holdingBuffer.size());
            Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
        } finally {
            // Always release the consumer, even when an assertion fails.
            listener.close();
        }
    }

    /**
     * Publishes the generated strings through {@link POJOKafkaOutputOperator} and checks that
     * the consumer received exactly {@code maxTuple} messages, starting with
     * {@code "testString 1"}.
     *
     * @throws Exception if the DAG cannot be prepared or the wait is interrupted
     */
    @Test
    public void testPOJOKafkaOutputOperator() throws Exception {
        tupleCount = 0;
        latch = new CountDownLatch(maxTuple);

        KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
        listener.setLatch(latch);
        new Thread(listener).start();
        try {
            LocalMode lma = LocalMode.newInstance();
            StreamingApplication app = new StreamingApplication() {
                public void populateDAG(DAG dag, Configuration configuration) {
                }
            };

            DAG dag = lma.getDAG();
            StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
            POJOKafkaOutputOperator producer = dag.addOperator("KafkaMessageProducer", POJOKafkaOutputOperator.class);

            Properties producerProps = new Properties();
            producerProps.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            producerProps.setProperty("producer.type", "async");
            producerProps.setProperty("queue.buffering.max.ms", "200");
            producerProps.setProperty("queue.buffering.max.messages", "10");
            producer.setConfigProperties(producerProps);
            producer.setTopic("topic1");
            producer.setBrokerList("localhost:9092");
            producer.setBatchSize(5);

            dag.addStream("Kafka message", generator.outputPort, producer.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

            lma.prepareDAG(app, new Configuration(false));

            LocalMode.Controller controller = lma.getController();
            controller.runAsync();
            try {
                // A timeout is not checked here; the assertions below run either way.
                latch.await(20L, TimeUnit.SECONDS);
            } finally {
                controller.shutdown();
            }

            Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size());
            logger.debug("Number of emitted tuples: {}", listener.holdingBuffer.size());
            Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
        } finally {
            // Always release the consumer, even when an assertion fails.
            listener.close();
        }
    }
}
