package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.Pair;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.class */
public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase {
    private static final int maxTuple = 40;
    private static CountDownLatch latch;
    private static final Logger logger = LoggerFactory.getLogger(KafkaExactlyOnceOutputOperatorTest.class);
    private static boolean isRestarted = false;

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest$SimpleKafkaExactOnceOutputOperator.class */
    public static class SimpleKafkaExactOnceOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String> {
        protected int compareToLastMsg(Pair<String, String> pair, Pair<byte[], byte[]> pair2) {
            return Integer.parseInt((String) pair.first) - Integer.parseInt(new String((byte[]) pair2.first));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public Pair<String, String> tupleToKeyValue(String str) {
            return new Pair<>(str.split("###")[0], str.split("###")[1]);
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest$StringGeneratorInputOperator.class */
    public static class StringGeneratorInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
        public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
        private final transient ArrayBlockingQueue<String> stringBuffer = new ArrayBlockingQueue<>(1024);
        private volatile Thread dataGeneratorThread;

        public void beginWindow(long j) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }

        public void activate(Context.OperatorContext operatorContext) {
            this.dataGeneratorThread = new Thread("String Generator") { // from class: com.datatorrent.contrib.kafka.KafkaExactlyOnceOutputOperatorTest.StringGeneratorInputOperator.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    int i = 0;
                    while (StringGeneratorInputOperator.this.dataGeneratorThread != null && i < KafkaExactlyOnceOutputOperatorTest.maxTuple) {
                        try {
                            i++;
                            StringGeneratorInputOperator.this.stringBuffer.put(i + "###testString " + i);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    StringGeneratorInputOperator.this.stringBuffer.put("41###END_TUPLE");
                }
            };
            this.dataGeneratorThread.start();
        }

        public void deactivate() {
            this.dataGeneratorThread = null;
        }

        public void emitTuples() {
            int size = this.stringBuffer.size();
            while (true) {
                int i = size;
                size--;
                if (i <= 0) {
                    return;
                }
                if (size == 20 && !KafkaExactlyOnceOutputOperatorTest.isRestarted) {
                    boolean unused = KafkaExactlyOnceOutputOperatorTest.isRestarted = true;
                    throw new RuntimeException();
                }
                this.outputPort.emit(this.stringBuffer.poll());
            }
        }
    }

    @Test
    public void testKafkaExactOnceOutputOperator() throws Exception {
        latch = new CountDownLatch(maxTuple);
        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer("topic1");
        kafkaTestConsumer.setLatch(latch);
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        StringGeneratorInputOperator addOperator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
        SimpleKafkaExactOnceOutputOperator addOperator2 = dag.addOperator("Kafka message producer", SimpleKafkaExactOnceOutputOperator.class);
        Properties properties = new Properties();
        properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", "localhost:9092");
        properties.setProperty("producer.type", "async");
        properties.setProperty("queue.buffering.max.ms", "200");
        properties.setProperty("queue.buffering.max.messages", "10");
        properties.setProperty("batch.num.messages", "5");
        addOperator2.setConfigProperties(properties);
        addOperator2.setTopic("topic1");
        dag.addStream("Kafka message", addOperator.outputPort, addOperator2.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        LocalMode.Controller controller = newInstance.getController();
        controller.runAsync();
        Executors.newFixedThreadPool(1).submit(kafkaTestConsumer).get(30L, TimeUnit.SECONDS);
        controller.shutdown();
        Assert.assertEquals("Number of emitted tuples", 40L, kafkaTestConsumer.holdingBuffer.size());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(kafkaTestConsumer.holdingBuffer.size())));
        Assert.assertEquals("First tuple", "testString 1", kafkaTestConsumer.getMessage(kafkaTestConsumer.holdingBuffer.peek()));
        kafkaTestConsumer.close();
    }
}
