package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest.class */
public class KafkaInputOperatorTest extends KafkaOperatorTestBase {
    private int totalBrokers;
    private String partition;
    private static CountDownLatch latch;
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
    private static List<String> tupleCollection = new LinkedList();
    private static boolean hasFailure = false;
    private static int failureTrigger = 3000;
    private static int k = 0;

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort extends DefaultInputPort<byte[]> {
        public void process(byte[] bArr) {
            String str = new String(bArr);
            if (KafkaInputOperatorTest.hasFailure && KafkaInputOperatorTest.access$108() == KafkaInputOperatorTest.failureTrigger) {
                boolean unused = KafkaInputOperatorTest.hasFailure = false;
                throw new RuntimeException();
            }
            if (!str.equals(KafkaOperatorTestBase.END_TUPLE)) {
                KafkaInputOperatorTest.tupleCollection.add(str);
            } else if (KafkaInputOperatorTest.latch != null) {
                KafkaInputOperatorTest.latch.countDown();
            }
        }

        public void setConnected(boolean z) {
            if (z) {
                KafkaInputOperatorTest.tupleCollection.clear();
            }
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        public final transient CollectorInputPort inputPort = new CollectorInputPort();
    }

    @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
    public static Collection<Object[]> testScenario() {
        return Arrays.asList(new Object[]{true, false, "one_to_one"}, new Object[]{true, false, "one_to_many"}, new Object[]{true, true, "one_to_one"}, new Object[]{true, true, "one_to_many"}, new Object[]{false, true, "one_to_one"}, new Object[]{false, true, "one_to_many"}, new Object[]{false, false, "one_to_one"}, new Object[]{false, false, "one_to_many"});
    }

    public KafkaInputOperatorTest(boolean z, boolean z2, String str) {
        this.totalBrokers = 0;
        this.partition = null;
        this.hasMultiCluster = z;
        this.hasMultiPartition = z2;
        this.totalBrokers = (1 + (z2 ? 1 : 0)) * (1 + (z ? 1 : 0));
        this.partition = str;
    }

    @Test
    public void testPartitionableInputOperator() throws Exception {
        hasFailure = false;
        testInputOperator(false);
    }

    @Test
    public void testPartitionableInputOperatorWithFailure() throws Exception {
        hasFailure = true;
        testInputOperator(true);
    }

    public void testInputOperator(boolean z) throws Exception {
        latch = new CountDownLatch(this.totalBrokers);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC, this.hasMultiPartition, this.hasMultiCluster);
        kafkaTestProducer.setSendCount(10000);
        Thread thread = new Thread(kafkaTestProducer);
        thread.start();
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortInputOperator kafkaSinglePortInputOperator = (KafkaSinglePortInputOperator) dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
        kafkaSinglePortInputOperator.setInitialPartitionCount(1);
        kafkaSinglePortInputOperator.setTopics(KafkaOperatorTestBase.TEST_TOPIC);
        kafkaSinglePortInputOperator.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        kafkaSinglePortInputOperator.setClusters(getClusterConfig());
        kafkaSinglePortInputOperator.setStrategy(this.partition);
        dag.addStream("Kafka message", kafkaSinglePortInputOperator.outputPort, dag.addOperator("TestMessageCollector", new CollectorModule()).inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        if (z) {
            setupHasFailureTest(kafkaSinglePortInputOperator, dag);
        }
        controller.runAsync();
        Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, latch.await(40000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("Tuple count", 10000, tupleCollection.size());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(tupleCollection.size())));
        thread.join();
        kafkaTestProducer.close();
        controller.shutdown();
    }

    private void setupHasFailureTest(KafkaSinglePortInputOperator kafkaSinglePortInputOperator, DAG dag) {
        kafkaSinglePortInputOperator.setHoldingBufferSize(5000);
        dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        kafkaSinglePortInputOperator.setMaxTuplesPerWindow(500);
    }

    private String getClusterConfig() {
        return "localhost:" + TEST_KAFKA_BROKER_PORT[0][0] + (this.hasMultiPartition ? ",localhost:" + TEST_KAFKA_BROKER_PORT[0][1] : "") + (this.hasMultiCluster ? ";localhost:" + TEST_KAFKA_BROKER_PORT[1][0] : "") + ((this.hasMultiCluster && this.hasMultiPartition) ? ",localhost:" + TEST_KAFKA_BROKER_PORT[1][1] : "");
    }

    static /* synthetic */ int access$108() {
        int i = k;
        k = i + 1;
        return i;
    }
}
