package com.datatorrent.contrib.kafka;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.class */
public class KafkaPartitionableInputOperatorTest extends KafkaOperatorTestBase {
    private int totalBrokers;
    static final Logger logger = LoggerFactory.getLogger(KafkaPartitionableInputOperatorTest.class);
    static HashMap<String, List<?>> collections = new HashMap<>();
    static AtomicInteger tupleCount = new AtomicInteger();
    static CountDownLatch latch;

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort<T> extends DefaultInputPort<T> {
        ArrayList<T> list;
        final String id;

        public CollectorInputPort(String str, Operator operator) {
            this.id = str;
        }

        public void process(T t) {
            if (!t.equals("END_TUPLE")) {
                this.list.add(t);
                KafkaPartitionableInputOperatorTest.tupleCount.incrementAndGet();
            } else if (KafkaPartitionableInputOperatorTest.latch != null) {
                KafkaPartitionableInputOperatorTest.latch.countDown();
            }
        }

        public void setConnected(boolean z) {
            if (z) {
                HashMap<String, List<?>> hashMap = KafkaPartitionableInputOperatorTest.collections;
                String str = this.id;
                ArrayList<T> arrayList = new ArrayList<>();
                this.list = arrayList;
                hashMap.put(str, arrayList);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest$CollectorModule.class */
    public static class CollectorModule<T> extends BaseOperator {
        public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>("myInput", this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
    public static Collection<Boolean[]> testScenario() {
        return Arrays.asList(new Boolean[]{true, false}, new Boolean[]{true, true}, new Boolean[]{false, true}, new Boolean[]{false, false});
    }

    public KafkaPartitionableInputOperatorTest(boolean z, boolean z2) {
        this.totalBrokers = 0;
        this.hasMultiCluster = z;
        this.hasMultiPartition = z2;
        this.totalBrokers = (1 + (z2 ? 1 : 0)) * (1 + (z ? 1 : 0));
    }

    @Test
    public void testPartitionableSimpleConsumerInputOperator() throws Exception {
        testPartitionableInputOperator(new SimpleKafkaConsumer());
    }

    @Test
    public void testPartitionableHighlevelConsumerInputOperator() throws Exception {
        Properties properties = new Properties();
        properties.put("group.id", "main_group");
        testPartitionableInputOperator(new HighlevelKafkaConsumer(properties));
    }

    public void testPartitionableInputOperator(KafkaConsumer kafkaConsumer) throws Exception {
        latch = new CountDownLatch(this.totalBrokers);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC, this.hasMultiPartition, this.hasMultiCluster);
        kafkaTestProducer.setSendCount(10000);
        new Thread(kafkaTestProducer).start();
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortStringInputOperator addOperator = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
        addOperator.setInitialPartitionCount(1);
        kafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaConsumer.setInitialOffset("earliest");
        addOperator.setConsumer(kafkaConsumer);
        addOperator.setZookeeper("cluster1::localhost:" + TEST_ZOOKEEPER_PORT[0] + (this.hasMultiCluster ? ";cluster2::localhost:" + TEST_ZOOKEEPER_PORT[1] : ""));
        CollectorModule addOperator2 = dag.addOperator("TestMessageCollector", new CollectorModule());
        dag.addStream("Kafka message", addOperator.outputPort, addOperator2.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.runAsync();
        Assert.assertTrue("TIMEOUT: 40s ", latch.await(40000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("Collections size", 1L, collections.size());
        Assert.assertEquals("Tuple count", 10000, collections.get(addOperator2.inputPort.id).size());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(collections.get(addOperator2.inputPort.id).size())));
        kafkaTestProducer.close();
        controller.shutdown();
        Thread.sleep(5000L);
    }
}
