package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Ignore
/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest.class */
public class KafkaInputOperatorTest extends KafkaOperatorTestBase {
    private int totalBrokers;
    private String partition;
    private String testName = "";
    private static CountDownLatch latch;
    public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
    private static List<String> tupleCollection = new LinkedList();
    private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap();
    private static boolean hasFailure = false;
    private static int failureTrigger = 3000;
    private static int k = 0;

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort extends DefaultInputPort<byte[]> {
        CollectorModule ownerNode;

        CollectorInputPort(CollectorModule collectorModule) {
            this.ownerNode = collectorModule;
        }

        public void process(byte[] bArr) {
            String str = new String(bArr);
            if (KafkaInputOperatorTest.hasFailure && KafkaInputOperatorTest.access$208() == KafkaInputOperatorTest.failureTrigger) {
                boolean unused = KafkaInputOperatorTest.hasFailure = false;
                throw new RuntimeException();
            }
            if (str.equals(KafkaOperatorTestBase.END_TUPLE)) {
                if (KafkaInputOperatorTest.latch != null) {
                    KafkaInputOperatorTest.latch.countDown();
                }
            } else {
                KafkaInputOperatorTest.tupleCollection.add(str);
                if (this.ownerNode.isIdempotentTest) {
                    this.ownerNode.windowTupleCollector.add(str);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        long currentWindowId;
        long operatorId;
        public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
        boolean isIdempotentTest = false;
        transient Set<String> windowTupleCollector = new HashSet();

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.operatorId = operatorContext.getId();
        }

        public void beginWindow(long j) {
            super.beginWindow(j);
            this.currentWindowId = j;
        }

        public void endWindow() {
            super.endWindow();
            if (this.isIdempotentTest) {
                String str = this.operatorId + "," + this.currentWindowId;
                Set set = (Set) KafkaInputOperatorTest.tupleCollectedInWindow.get(str);
                if (set != null) {
                    Assert.assertEquals("replay messages should be exactly same as previous window", set, this.windowTupleCollector);
                } else {
                    HashSet hashSet = new HashSet();
                    hashSet.addAll(this.windowTupleCollector);
                    KafkaInputOperatorTest.tupleCollectedInWindow.put(str, hashSet);
                }
            }
            this.windowTupleCollector.clear();
        }
    }

    @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
    public static Collection<Object[]> testScenario() {
        return Arrays.asList(new Object[]{true, false, "one_to_one"}, new Object[]{true, false, "one_to_many"}, new Object[]{true, true, "one_to_one"}, new Object[]{true, true, "one_to_many"}, new Object[]{false, true, "one_to_one"}, new Object[]{false, true, "one_to_many"}, new Object[]{false, false, "one_to_one"}, new Object[]{false, false, "one_to_many"});
    }

    @Before
    public void before() {
        FileUtils.deleteQuietly(new File(APPLICATION_PATH));
        tupleCollection.clear();
        StringBuilder append = new StringBuilder().append(KafkaOperatorTestBase.TEST_TOPIC);
        int i = testCounter;
        testCounter = i + 1;
        this.testName = append.append(i).toString();
        createTopic(0, this.testName);
        if (this.hasMultiCluster) {
            createTopic(1, this.testName);
        }
    }

    public KafkaInputOperatorTest(boolean z, boolean z2, String str) {
        this.totalBrokers = 0;
        this.partition = null;
        this.hasMultiCluster = z;
        this.hasMultiPartition = z2;
        this.totalBrokers = (1 + (z2 ? 1 : 0)) * (1 + (z ? 1 : 0));
        this.partition = str;
    }

    @Test
    public void testInputOperator() throws Exception {
        hasFailure = false;
        testInputOperator(false, false);
    }

    @Test
    public void testInputOperatorWithFailure() throws Exception {
        hasFailure = true;
        testInputOperator(true, false);
    }

    @Test
    public void testIdempotentInputOperatorWithFailure() throws Exception {
        hasFailure = true;
        testInputOperator(true, true);
    }

    public void testInputOperator(boolean z, boolean z2) throws Exception {
        latch = new CountDownLatch(this.totalBrokers);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this.testName, this.hasMultiPartition, this.hasMultiCluster);
        kafkaTestProducer.setSendCount(10000);
        Thread thread = new Thread(kafkaTestProducer);
        thread.start();
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortInputOperator kafkaSinglePortInputOperator = (KafkaSinglePortInputOperator) dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
        kafkaSinglePortInputOperator.setInitialPartitionCount(1);
        kafkaSinglePortInputOperator.setTopics(this.testName);
        kafkaSinglePortInputOperator.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        kafkaSinglePortInputOperator.setClusters(getClusterConfig());
        kafkaSinglePortInputOperator.setStrategy(this.partition);
        if (z2) {
            kafkaSinglePortInputOperator.setWindowDataManager(new FSWindowDataManager());
        }
        CollectorModule addOperator = dag.addOperator("TestMessageCollector", CollectorModule.class);
        addOperator.isIdempotentTest = z2;
        dag.addStream("Kafka message", kafkaSinglePortInputOperator.outputPort, addOperator.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        if (z) {
            setupHasFailureTest(kafkaSinglePortInputOperator, dag);
        }
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.runAsync();
        boolean await = latch.await(40000L, TimeUnit.MILLISECONDS);
        Collections.sort(tupleCollection, new Comparator<String>() { // from class: org.apache.apex.malhar.kafka.KafkaInputOperatorTest.1
            @Override // java.util.Comparator
            public int compare(String str, String str2) {
                return Integer.parseInt(str.split("_")[1]) - Integer.parseInt(str2.split("_")[1]);
            }
        });
        Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, await);
        Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is not expected", 10000 <= tupleCollection.size());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(tupleCollection.size())));
        thread.join();
        kafkaTestProducer.close();
        controller.shutdown();
    }

    private void setupHasFailureTest(KafkaSinglePortInputOperator kafkaSinglePortInputOperator, DAG dag) {
        kafkaSinglePortInputOperator.setHoldingBufferSize(5000);
        dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        kafkaSinglePortInputOperator.setMaxTuplesPerWindow(500);
    }

    private String getClusterConfig() {
        return "localhost:" + TEST_KAFKA_BROKER_PORT[0][0] + (this.hasMultiPartition ? ",localhost:" + TEST_KAFKA_BROKER_PORT[0][1] : "") + (this.hasMultiCluster ? ";localhost:" + TEST_KAFKA_BROKER_PORT[1][0] : "") + ((this.hasMultiCluster && this.hasMultiPartition) ? ",localhost:" + TEST_KAFKA_BROKER_PORT[1][1] : "");
    }

    static /* synthetic */ int access$208() {
        int i = k;
        k = i + 1;
        return i;
    }
}
