package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.contrib.hbase.HBasePOJOPutOperatorTest;
import com.datatorrent.contrib.memcache.MemcachePOJOOperatorTest;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.stram.StramLocalCluster;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaInputOperatorTest.class */
public class KafkaInputOperatorTest extends KafkaOperatorTestBase {
    static CountDownLatch latch;

    @Rule
    public TestMeta testMeta = new TestMeta();
    static final Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
    static AtomicInteger tupleCount = new AtomicInteger();
    static boolean isSuicide = false;
    static int suicideTrigger = AerospikeTestUtils.PORT;

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaInputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort<T> extends DefaultInputPort<T> {
        private int k = 0;

        public CollectorInputPort(String str, Operator operator) {
        }

        public void process(T t) {
            if (KafkaInputOperatorTest.isSuicide) {
                int i = this.k;
                this.k = i + 1;
                if (i == KafkaInputOperatorTest.suicideTrigger) {
                    KafkaInputOperatorTest.isSuicide = false;
                    throw new RuntimeException();
                }
            }
            if (!t.equals("END_TUPLE")) {
                KafkaInputOperatorTest.tupleCount.incrementAndGet();
            } else if (KafkaInputOperatorTest.latch != null) {
                KafkaInputOperatorTest.latch.countDown();
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaInputOperatorTest$CollectorModule.class */
    public static class CollectorModule<T> extends BaseOperator {
        public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>("myInput", this);
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaInputOperatorTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        String baseDir;
        String recoveryDir;
        KafkaSinglePortStringInputOperator operator;
        CollectorTestSink<Object> sink;
        Context.OperatorContext context;

        protected void starting(Description description) {
            this.baseDir = "target/" + description.getClassName() + "/" + description.getMethodName();
            this.recoveryDir = "recovery";
            try {
                FileUtils.deleteDirectory(new File(this.baseDir, "recovery"));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void testKafkaInputOperator(int i, int i2, KafkaConsumer kafkaConsumer, boolean z, boolean z2) throws Exception {
        latch = new CountDownLatch(1);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaTestProducer.setSendCount(i2);
        new Thread(kafkaTestProducer).start();
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortStringInputOperator addOperator = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
        if (isSuicide) {
            dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
            dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration()));
            addOperator.setMaxTuplesPerWindow(500);
        }
        if (z2) {
            addOperator.setWindowDataManager(new FSWindowDataManager());
        }
        kafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        addOperator.setConsumer(kafkaConsumer);
        kafkaConsumer.setCacheSize(5000);
        if (z) {
            addOperator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
        }
        dag.addStream("Kafka message", addOperator.outputPort, dag.addOperator("TestMessageCollector", new CollectorModule()).inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.runAsync();
        Assert.assertTrue("TIMEOUT: 30s ", latch.await(300000L, TimeUnit.MILLISECONDS));
        Assert.assertTrue("Expected count >= " + i2 + "; Actual count " + tupleCount.intValue(), i2 <= tupleCount.intValue());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(tupleCount.intValue())));
        kafkaTestProducer.close();
        controller.shutdown();
    }

    @Test
    public void testKafkaInputOperator_Highlevel() throws Exception {
        Properties properties = new Properties();
        properties.put("group.id", "group1");
        HighlevelKafkaConsumer highlevelKafkaConsumer = new HighlevelKafkaConsumer(properties);
        highlevelKafkaConsumer.setInitialOffset("earliest");
        testKafkaInputOperator(MemcachePOJOOperatorTest.TUPLE_SIZE, 10000, highlevelKafkaConsumer, true, false);
    }

    @Test
    public void testKafkaInputOperator_Simple() throws Exception {
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setInitialOffset("earliest");
        testKafkaInputOperator(MemcachePOJOOperatorTest.TUPLE_SIZE, 10000, simpleKafkaConsumer, true, false);
    }

    @Test
    public void testKafkaInputOperator_SimpleSuicide() throws Exception {
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setInitialOffset("earliest");
        isSuicide = true;
        testKafkaInputOperator(MemcachePOJOOperatorTest.TUPLE_SIZE, 10000, simpleKafkaConsumer, true, false);
    }

    @Test
    public void testKafkaInputOperator_Simple_Idempotent() throws Exception {
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setInitialOffset("earliest");
        testKafkaInputOperator(MemcachePOJOOperatorTest.TUPLE_SIZE, 10000, simpleKafkaConsumer, true, true);
    }

    @Test
    public void testKafkaInputOperator_Invalid() throws Exception {
        try {
            testKafkaInputOperator(MemcachePOJOOperatorTest.TUPLE_SIZE, 10000, new SimpleKafkaConsumer(), false, false);
        } catch (Exception e) {
            Assert.assertEquals("Error creating local cluster", e.getMessage());
        }
    }

    @Override // com.datatorrent.contrib.kafka.KafkaOperatorTestBase
    @Before
    public void beforeTest() {
        tupleCount.set(0);
        File file = new File("target", "ck");
        File file2 = new File("target" + StramLocalCluster.class.getName());
        try {
            try {
                FileUtils.deleteQuietly(file);
                FileUtils.deleteQuietly(file2);
                super.beforeTest();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            super.beforeTest();
            throw th;
        }
    }

    @Test
    public void testRecoveryAndIdempotency() throws Exception {
        latch = new CountDownLatch(50);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaTestProducer.setSendCount(HBasePOJOPutOperatorTest.WINDOW_SIZE);
        new Thread(kafkaTestProducer).start();
        KafkaSinglePortStringInputOperator createAndDeployOperator = createAndDeployOperator(true);
        latch.await(4000L, TimeUnit.MILLISECONDS);
        createAndDeployOperator.beginWindow(1L);
        createAndDeployOperator.emitTuples();
        createAndDeployOperator.endWindow();
        createAndDeployOperator.beginWindow(2L);
        createAndDeployOperator.emitTuples();
        createAndDeployOperator.endWindow();
        this.testMeta.sink.collectedTuples.clear();
        createAndDeployOperator.teardown();
        createAndDeployOperator.deactivate();
        KafkaSinglePortStringInputOperator createAndDeployOperator2 = createAndDeployOperator(true);
        Assert.assertEquals("largest recovery window", 2L, createAndDeployOperator2.getWindowDataManager().getLargestCompletedWindow());
        createAndDeployOperator2.beginWindow(1L);
        createAndDeployOperator2.emitTuples();
        createAndDeployOperator2.endWindow();
        createAndDeployOperator2.beginWindow(2L);
        createAndDeployOperator2.emitTuples();
        createAndDeployOperator2.endWindow();
        latch.await(3000L, TimeUnit.MILLISECONDS);
        createAndDeployOperator2.beginWindow(3L);
        createAndDeployOperator2.emitTuples();
        createAndDeployOperator2.endWindow();
        Assert.assertEquals("Total messages collected ", HBasePOJOPutOperatorTest.WINDOW_SIZE, this.testMeta.sink.collectedTuples.size());
        this.testMeta.sink.collectedTuples.clear();
        createAndDeployOperator2.teardown();
        createAndDeployOperator2.deactivate();
    }

    @Test
    public void testRecoveryAndExactlyOnce() throws Exception {
        latch = new CountDownLatch(50);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaTestProducer.setSendCount(HBasePOJOPutOperatorTest.WINDOW_SIZE);
        new Thread(kafkaTestProducer).start();
        KafkaSinglePortStringInputOperator createAndDeployOperator = createAndDeployOperator(false);
        latch.await(4000L, TimeUnit.MILLISECONDS);
        createAndDeployOperator.beginWindow(1L);
        createAndDeployOperator.emitTuples();
        createAndDeployOperator.endWindow();
        createAndDeployOperator.beginWindow(2L);
        createAndDeployOperator.emitTuples();
        createAndDeployOperator.endWindow();
        createAndDeployOperator.checkpointed(2L);
        createAndDeployOperator.committed(2L);
        Map map = createAndDeployOperator.offsetStats;
        int size = this.testMeta.sink.collectedTuples.size();
        this.testMeta.sink.collectedTuples.clear();
        createAndDeployOperator.teardown();
        createAndDeployOperator.deactivate();
        KafkaSinglePortStringInputOperator createOperator = createOperator(false);
        createOperator.offsetStats = map;
        createOperator.setup(this.testMeta.context);
        createOperator.activate(this.testMeta.context);
        latch.await(4000L, TimeUnit.MILLISECONDS);
        createOperator.beginWindow(3L);
        createOperator.emitTuples();
        createOperator.endWindow();
        createOperator.beginWindow(4L);
        createOperator.emitTuples();
        createOperator.endWindow();
        latch.await(3000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals("Total messages collected ", (HBasePOJOPutOperatorTest.WINDOW_SIZE - size) + 1, this.testMeta.sink.collectedTuples.size());
        this.testMeta.sink.collectedTuples.clear();
        createOperator.teardown();
        createOperator.deactivate();
    }

    private KafkaSinglePortStringInputOperator createOperator(boolean z) {
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
        defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.testMeta.baseDir);
        this.testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
        this.testMeta.operator = new KafkaSinglePortStringInputOperator();
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        simpleKafkaConsumer.setInitialOffset("earliest");
        if (z) {
            FSWindowDataManager fSWindowDataManager = new FSWindowDataManager();
            fSWindowDataManager.setStatePath(this.testMeta.recoveryDir);
            this.testMeta.operator.setWindowDataManager(fSWindowDataManager);
        }
        this.testMeta.operator.setConsumer(simpleKafkaConsumer);
        this.testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
        this.testMeta.operator.setMaxTuplesPerWindow(500);
        Collection definePartitions = this.testMeta.operator.definePartitions(new LinkedList(), new StatelessPartitionerTest.PartitioningContextImpl((List) null, 0));
        Assert.assertEquals(1L, definePartitions.size());
        KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = (KafkaSinglePortStringInputOperator) ((Partitioner.Partition) definePartitions.iterator().next()).getPartitionedInstance();
        this.testMeta.sink = new CollectorTestSink<>();
        this.testMeta.operator.outputPort.setSink(this.testMeta.sink);
        kafkaSinglePortStringInputOperator.outputPort.setSink(this.testMeta.sink);
        return kafkaSinglePortStringInputOperator;
    }

    private KafkaSinglePortStringInputOperator createAndDeployOperator(boolean z) {
        KafkaSinglePortStringInputOperator createOperator = createOperator(z);
        createOperator.setup(this.testMeta.context);
        createOperator.activate(this.testMeta.context);
        return createOperator;
    }

    @Test
    public void testMaxTotalSize() throws InterruptedException {
        latch = new CountDownLatch(1);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaTestProducer.setSendCount(HBasePOJOPutOperatorTest.WINDOW_SIZE);
        Thread thread = new Thread(kafkaTestProducer);
        thread.start();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.testMeta.baseDir);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
        KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        simpleKafkaConsumer.setInitialOffset("earliest");
        kafkaSinglePortStringInputOperator.setConsumer(simpleKafkaConsumer);
        kafkaSinglePortStringInputOperator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
        kafkaSinglePortStringInputOperator.setMaxTotalMsgSizePerWindow(500);
        Collection definePartitions = kafkaSinglePortStringInputOperator.definePartitions(new LinkedList(), new StatelessPartitionerTest.PartitioningContextImpl((List) null, 0));
        Assert.assertEquals(1L, definePartitions.size());
        KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator2 = (KafkaSinglePortStringInputOperator) ((Partitioner.Partition) definePartitions.iterator().next()).getPartitionedInstance();
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        kafkaSinglePortStringInputOperator2.outputPort.setSink(collectorTestSink);
        kafkaSinglePortStringInputOperator2.setup(testIdOperatorContext);
        kafkaSinglePortStringInputOperator2.activate(testIdOperatorContext);
        latch.await(4000L, TimeUnit.MILLISECONDS);
        kafkaSinglePortStringInputOperator2.beginWindow(1L);
        kafkaSinglePortStringInputOperator2.emitTuples();
        kafkaSinglePortStringInputOperator2.endWindow();
        thread.join();
        kafkaSinglePortStringInputOperator2.deactivate();
        kafkaSinglePortStringInputOperator2.teardown();
        int i = 0;
        Iterator it = collectorTestSink.collectedTuples.iterator();
        while (it.hasNext()) {
            i += ((String) it.next()).getBytes().length;
        }
        Assert.assertTrue("Total emitted size comparison", i < 500);
    }

    @Test
    public void testZookeeper() throws Exception {
        latch = new CountDownLatch(50);
        this.testMeta.operator = new KafkaSinglePortStringInputOperator();
        SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer();
        simpleKafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        this.testMeta.operator.setConsumer(simpleKafkaConsumer);
        this.testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182/chroot/dir;cluster2::node4:2181");
        latch.await(500L, TimeUnit.MILLISECONDS);
        Assert.assertEquals("Total size of clusters ", 2L, this.testMeta.operator.getConsumer().zookeeperMap.size());
        Assert.assertEquals("Connection url for cluster1 ", "node0,node1,node2:2181,node3:2182/chroot/dir", this.testMeta.operator.getConsumer().zookeeperMap.get("cluster1").iterator().next());
        Assert.assertEquals("Connection url for cluster 2 ", "node4:2181", this.testMeta.operator.getConsumer().zookeeperMap.get("cluster2").iterator().next());
    }
}
