package co.cask.cdap.spark;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.TimeseriesTable;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.kafka.KafkaTester;
import co.cask.cdap.spark.app.KafkaSparkStreaming;
import co.cask.cdap.spark.app.TestSparkApp;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.SparkManager;
import co.cask.cdap.test.TestConfiguration;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/spark/SparkStreamingTestRun.class */
public class SparkStreamingTestRun extends TestFrameworkTestBase {

    @ClassRule
    public static final KafkaTester KAFKA_TESTER = new KafkaTester();

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration(new Object[]{"explore.enabled", false});

    @Test
    public void test() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        KafkaPublisher publisher = KAFKA_TESTER.getKafkaClient().getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED, Compression.NONE);
        ApplicationManager deployApplication = deployApplication(TestSparkApp.class, new File[0]);
        ImmutableMap of = ImmutableMap.of("checkpoint.path", newFolder.getAbsolutePath(), "kafka.brokers", KAFKA_TESTER.getBrokerService().getBrokerList(), "kafka.topics", "testtopic", "result.dataset", "TimeSeriesResult");
        SparkManager sparkManager = deployApplication.getSparkManager(KafkaSparkStreaming.class.getSimpleName());
        sparkManager.start(of);
        for (int i = 0; i < 100; i++) {
            publisher.prepare("testtopic").add(Charsets.UTF_8.encode("Message " + i), "1").send();
            TimeUnit.MILLISECONDS.sleep(50L);
        }
        final DataSetManager dataset = getDataset("TimeSeriesResult");
        final TimeseriesTable timeseriesTable = (TimeseriesTable) dataset.get();
        Tasks.waitFor(100L, new Callable<Long>() { // from class: co.cask.cdap.spark.SparkStreamingTestRun.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Long call() throws Exception {
                dataset.flush();
                return Long.valueOf(SparkStreamingTestRun.this.getCounts("Message", timeseriesTable));
            }
        }, 1L, TimeUnit.MINUTES, 1L, TimeUnit.SECONDS);
        for (int i2 = 0; i2 < 100; i2++) {
            final int i3 = i2;
            Tasks.waitFor(1L, new Callable<Long>() { // from class: co.cask.cdap.spark.SparkStreamingTestRun.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Long call() throws Exception {
                    dataset.flush();
                    return Long.valueOf(SparkStreamingTestRun.this.getCounts(Integer.toString(i3), timeseriesTable));
                }
            }, 1L, TimeUnit.MINUTES, 1L, TimeUnit.SECONDS);
        }
        sparkManager.stop();
        sparkManager.waitForFinish(10L, TimeUnit.SECONDS);
        for (int i4 = 100; i4 < 200; i4++) {
            publisher.prepare("testtopic").add(Charsets.UTF_8.encode("Message " + i4), "1").send();
        }
        sparkManager.start(of);
        Tasks.waitFor(100L, new Callable<Long>() { // from class: co.cask.cdap.spark.SparkStreamingTestRun.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Long call() throws Exception {
                dataset.flush();
                return Long.valueOf(SparkStreamingTestRun.this.getCounts("Message", timeseriesTable));
            }
        }, 1L, TimeUnit.MINUTES, 1L, TimeUnit.SECONDS);
        for (int i5 = 0; i5 < 200; i5++) {
            final int i6 = i5;
            Tasks.waitFor(1L, new Callable<Long>() { // from class: co.cask.cdap.spark.SparkStreamingTestRun.4
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Long call() throws Exception {
                    dataset.flush();
                    return Long.valueOf(SparkStreamingTestRun.this.getCounts(Integer.toString(i6), timeseriesTable));
                }
            }, 1L, TimeUnit.MINUTES, 1L, TimeUnit.SECONDS);
        }
        sparkManager.stop();
        sparkManager.waitForFinish(10L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r4v1, types: [byte[], byte[][]] */
    public long getCounts(String str, TimeseriesTable timeseriesTable) {
        long j = 0;
        Iterator read = timeseriesTable.read(Bytes.toBytes(str), 0L, Long.MAX_VALUE, (byte[][]) new byte[0]);
        while (read.hasNext()) {
            j += Bytes.toLong(((TimeseriesTable.Entry) read.next()).getValue());
        }
        return j;
    }
}
