package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.FlowManager;
import co.cask.cdap.test.RuntimeMetrics;
import co.cask.cdap.test.RuntimeStats;
import co.cask.cdap.test.TestBase;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaConsumerFlowletTestBase.class */
public abstract class KafkaConsumerFlowletTestBase extends TestBase {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFlowletTestBase.class);

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    protected static final int PARTITIONS = 6;
    protected static InMemoryZKServer zkServer;
    protected static EmbeddedKafkaServer kafkaServer;
    protected static int kafkaPort;

    @BeforeClass
    public static void initialize() throws IOException {
        zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
        zkServer.startAndWait();
        kafkaPort = Networks.getRandomPort();
        kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), kafkaPort, TMP_FOLDER.newFolder()));
        kafkaServer.startAndWait();
    }

    @AfterClass
    public static void cleanup() {
        kafkaServer.stopAndWait();
        zkServer.stopAndWait();
    }

    @After
    public void cleanUpMetrics() throws Exception {
        RuntimeStats.resetAll();
        clear();
    }

    protected abstract Class<? extends KafkaConsumingApp> getApplication();

    protected abstract void sendMessage(String str, Map<String, String> map);

    protected abstract boolean supportBrokerList();

    private Map<String, String> getRuntimeArgs(String str, int i, boolean z) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("kafka.topic", str);
        if (!supportBrokerList() || z) {
            newHashMap.put("kafka.zookeeper", zkServer.getConnectionStr());
        } else {
            newHashMap.put("kafka.brokers", "localhost:" + kafkaPort);
        }
        newHashMap.put("kafka.partitions", Integer.toString(i));
        return newHashMap;
    }

    protected Map<String, String> getRuntimeArgs(String str, int i, boolean z, long j) {
        Map<String, String> runtimeArgs = getRuntimeArgs(str, i, z);
        runtimeArgs.put("kafka.default.offset", Long.toString(j));
        return runtimeArgs;
    }

    @Test
    public final void testFlowlet() throws Exception {
        ApplicationManager deployApplication = deployApplication(getApplication(), new File[0]);
        FlowManager startFlow = deployApplication.startFlow("KafkaConsumingFlow", getRuntimeArgs("testTopic", PARTITIONS, false));
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < 5; i++) {
            newHashMap.put(Integer.toString(i), "Message " + i);
        }
        sendMessage("testTopic", newHashMap);
        RuntimeMetrics flowletMetrics = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics.waitForProcessed(5, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals(5, flowletMetrics.getProcessed());
        startFlow.stop();
        newHashMap.clear();
        int i2 = 5 + 1;
        newHashMap.put(Integer.toString(5), "Message 5");
        newHashMap.put("Failure", "Failure");
        newHashMap.put(Integer.toString(i2), "Message " + i2);
        sendMessage("testTopic", newHashMap);
        RuntimeStats.resetAll();
        FlowManager startFlowWithRetry = startFlowWithRetry(deployApplication, "KafkaConsumingFlow", getRuntimeArgs("testTopic", PARTITIONS, true), 5);
        flowletMetrics.waitForProcessed(2L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals(2L, flowletMetrics.getProcessed());
        startFlowWithRetry.stop();
        assertDatasetCount(deployApplication, i2 + 1);
    }

    @Test
    public void testChangeInstances() throws TimeoutException, InterruptedException {
        ApplicationManager deployApplication = deployApplication(getApplication(), new File[0]);
        FlowManager startFlow = deployApplication.startFlow("KafkaConsumingFlow", getRuntimeArgs("testChangeInstances", PARTITIONS, false));
        for (int i = 0; i < 100; i++) {
            sendMessage("testChangeInstances", ImmutableMap.of(Integer.toString(i), "TestInstances " + i));
        }
        RuntimeMetrics flowletMetrics = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics.waitForProcessed(100, 10L, TimeUnit.SECONDS);
        startFlow.setFlowletInstances("KafkaSource", 3);
        for (int i2 = 0; i2 < 100; i2++) {
            sendMessage("testChangeInstances", ImmutableMap.of(Integer.toString(i2 + 100), "TestInstances " + (i2 + 100)));
        }
        int i3 = 100 * 2;
        flowletMetrics.waitForProcessed(i3, 10L, TimeUnit.SECONDS);
        startFlow.stop();
        assertDatasetCount(deployApplication, i3);
    }

    @Test
    public final void testStartOffset() throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < 5; i++) {
            newHashMap.put(Integer.toString(i), "Message " + i);
        }
        sendMessage("testStartOffset", newHashMap);
        ApplicationManager deployApplication = deployApplication(getApplication(), new File[0]);
        FlowManager startFlow = deployApplication.startFlow("KafkaConsumingFlow", getRuntimeArgs("testStartOffset", PARTITIONS, false, -1L));
        TimeUnit.SECONDS.sleep(2L);
        HashMap newHashMap2 = Maps.newHashMap();
        for (int i2 = 5; i2 < 5 + 5; i2++) {
            newHashMap2.put(Integer.toString(i2), "Message " + i2);
        }
        sendMessage("testStartOffset", newHashMap2);
        RuntimeMetrics flowletMetrics = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics.waitForProcessed(5L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals(5, flowletMetrics.getProcessed());
        startFlow.stop();
        assertDatasetCount(deployApplication, 5);
    }

    @Test
    public final void testInvalidStartOffsetLarger() throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < 5; i++) {
            newHashMap.put(Integer.toString(i), "Message " + i);
        }
        sendMessage("testInvalidStartOffsetLarger", newHashMap);
        ApplicationManager deployApplication = deployApplication(getApplication(), new File[0]);
        FlowManager startFlow = deployApplication.startFlow("KafkaConsumingFlow", getRuntimeArgs("testInvalidStartOffsetLarger", PARTITIONS, false, 12345678901234L));
        TimeUnit.SECONDS.sleep(2L);
        HashMap newHashMap2 = Maps.newHashMap();
        for (int i2 = 5; i2 < 5 + 5; i2++) {
            newHashMap2.put(Integer.toString(i2), "Message " + i2);
        }
        sendMessage("testInvalidStartOffsetLarger", newHashMap2);
        RuntimeMetrics flowletMetrics = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics.waitForProcessed(5L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals(5, flowletMetrics.getProcessed());
        startFlow.stop();
        assertDatasetCount(deployApplication, 5);
    }

    @Test
    public final void testInvalidStartOffsetSmaller() throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < 500; i++) {
            newHashMap.put(Integer.toString(i), "Test Invalid Start Offset Message " + i);
        }
        sendMessage("testInvalidStartOffsetSmaller", newHashMap);
        newHashMap.clear();
        for (int i2 = 500; i2 < 2 * 500; i2++) {
            newHashMap.put(Integer.toString(i2), "Test Invalid Start Offset Message " + i2);
        }
        sendMessage("testInvalidStartOffsetSmaller", newHashMap);
        TimeUnit.SECONDS.sleep(80L);
        FlowManager startFlow = deployApplication(getApplication(), new File[0]).startFlow("KafkaConsumingFlow", getRuntimeArgs("testInvalidStartOffsetSmaller", PARTITIONS, false, -2L));
        TimeUnit.SECONDS.sleep(2L);
        RuntimeMetrics flowletMetrics = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics.waitForProcessed(10L, 30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(10L);
        startFlow.stop();
        long processed = flowletMetrics.getProcessed();
        LOG.info("Fetched {} messages from Kafka", Long.valueOf(processed));
        Assert.assertTrue(processed > 1);
        Assert.assertTrue(processed < ((long) (2 * 500)));
        clear();
        ApplicationManager deployApplication = deployApplication(getApplication(), new File[0]);
        FlowManager startFlow2 = deployApplication.startFlow("KafkaConsumingFlow", getRuntimeArgs("testInvalidStartOffsetSmaller", PARTITIONS, false, 0L));
        TimeUnit.SECONDS.sleep(2L);
        RuntimeMetrics flowletMetrics2 = RuntimeStats.getFlowletMetrics("KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        flowletMetrics2.waitForProcessed(processed, 30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        startFlow2.stop();
        Assert.assertEquals(processed, flowletMetrics2.getProcessed());
        assertDatasetCount(deployApplication, processed);
    }

    private void assertDatasetCount(ApplicationManager applicationManager, long j) {
        CloseableIterator scan = ((KeyValueTable) applicationManager.getDataSet("counter").get()).scan((byte[]) null, (byte[]) null);
        int i = 0;
        while (scan.hasNext()) {
            try {
                Assert.assertEquals(1L, Bytes.toLong((byte[]) ((KeyValue) scan.next()).getValue()));
                i++;
            } catch (Throwable th) {
                scan.close();
                throw th;
            }
        }
        Assert.assertEquals(j, i);
        scan.close();
    }

    private FlowManager startFlowWithRetry(ApplicationManager applicationManager, String str, Map<String, String> map, int i) {
        Throwable th = null;
        do {
            if (th != null) {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    throw Throwables.propagate(e);
                } catch (Throwable th2) {
                    th = th2;
                    i--;
                }
            }
            return applicationManager.startFlow(str, map);
        } while (i > 0);
        throw Throwables.propagate(th);
    }

    private static Properties generateKafkaConfig(String str, int i, File file) {
        Properties properties = new Properties();
        properties.setProperty("log.dir", file.getAbsolutePath());
        properties.setProperty("port", Integer.toString(i));
        properties.setProperty("broker.id", "1");
        properties.setProperty("socket.send.buffer.bytes", "1048576");
        properties.setProperty("socket.receive.buffer.bytes", "1048576");
        properties.setProperty("socket.request.max.bytes", "104857600");
        properties.setProperty("num.partitions", Integer.toString(PARTITIONS));
        properties.setProperty("log.retention.hours", "24");
        properties.setProperty("log.flush.interval.messages", "10");
        properties.setProperty("log.flush.interval.ms", "1000");
        properties.setProperty("log.segment.bytes", "100");
        properties.setProperty("zookeeper.connect", str);
        properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
        properties.setProperty("default.replication.factor", "1");
        properties.setProperty("log.retention.bytes", "1000");
        properties.setProperty("log.retention.check.interval.ms", "60000");
        properties.setProperty("brokerid", "1");
        properties.setProperty("zk.connect", str);
        properties.setProperty("zk.connectiontimeout.ms", "1000000");
        properties.setProperty("log.retention.size", "1000");
        properties.setProperty("log.cleanup.interval.mins", "1");
        properties.setProperty("log.file.size", "1000");
        return properties;
    }
}
