/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.app;

import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Unit tests that drive {@link StreamingEntry} through a Mockito spy.
 *
 * <p>The tests cover {@code execute}, {@code systemExit}, {@code isJobOnCluster},
 * {@code closeSparkSession} and {@code startJobExecutionIdCheckThread}. Test metadata
 * is created before and cleaned up after every test.
 */
public class StreamingApplicationTest
extends StreamingTestCase {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingApplicationTest.class);
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
    /** System property the tests toggle before querying local-mode dependent behaviour. */
    private static final String LOCAL_MODE_PROP = "streaming.local";
    /** Config key the tests overwrite with {@code PROD} to leave the unit-test environment. */
    private static final String ENV_PROP = "kylin.env";
    /** Upper bound, in seconds, for waiting on the Spark context state. */
    private static final long SPARK_STOP_TIMEOUT_SECONDS = 30L;
    /** Upper bound, in seconds, for waiting on the running flag. */
    private static final long RUNNING_CHECK_TIMEOUT_SECONDS = 3L;

    /** Creates the test metadata (no overlay arguments) before each test. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    /** Removes the test metadata after each test. */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /** Runs {@code execute} with session creation, {@code doExecute} and reporting stubbed out. */
    @Test
    public void testExecute_PrepareBeforeExecute() {
        executeWithStubbedCollaborators();
    }

    /** Runs {@code execute} with session creation, {@code doExecute} and reporting stubbed out. */
    @Test
    public void testExecute_DoExecute() {
        executeWithStubbedCollaborators();
    }

    /** Calls {@code systemExit(0)} while {@code isJobOnCluster} is stubbed to {@code false}. */
    @Test
    public void testSystemExit_False() {
        setLocalMode("true");
        StreamingEntry spied = spyEntry();
        Mockito.doReturn(false).when(spied).isJobOnCluster();
        spied.systemExit(0);
    }

    /** Outside both the UT environment and local mode, the job is reported as on-cluster. */
    @Test
    public void testIsJobOnCluster_True() {
        setLocalMode("false");
        KylinConfig config = getTestConfig();
        config.setProperty(ENV_PROP, "PROD");
        StreamingEntry spied = spyEntry();

        Assert.assertFalse(getTestConfig().isUTEnv());
        Assert.assertFalse(StreamingUtils.isLocalMode());
        Assert.assertTrue(spied.isJobOnCluster());
    }

    /** In the UT environment, or in local mode, the job is not reported as on-cluster. */
    @Test
    public void testIsJobOnCluster_False() {
        // Scenario 1: UT environment, local mode switched off.
        setLocalMode("false");
        StreamingEntry utEnvEntry = spyEntry();
        Assert.assertTrue(getTestConfig().isUTEnv());
        Assert.assertFalse(StreamingUtils.isLocalMode());
        Assert.assertFalse(utEnvEntry.isJobOnCluster());

        // Scenario 2: PROD environment, local mode switched on.
        setLocalMode("true");
        StreamingEntry localModeEntry = spyEntry();
        getTestConfig().setProperty(ENV_PROP, "PROD");
        Assert.assertFalse(getTestConfig().isUTEnv());
        Assert.assertTrue(StreamingUtils.isLocalMode());
        Assert.assertFalse(localModeEntry.isJobOnCluster());
    }

    /** With local mode off, {@code closeSparkSession} leaves the Spark context stopped. */
    @Test
    public void testCloseSparkSession_True() {
        setLocalMode("false");
        StreamingEntry spied = spyEntry();
        spied.setSparkSession(createSparkSession());
        Assert.assertFalse(spied.getSparkSession().sparkContext().isStopped());

        spied.closeSparkSession();

        Awaitility.await()
                .atMost(SPARK_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> spied.getSparkSession().sparkContext().isStopped());
    }

    /** With local mode on, the Spark context is still running after {@code closeSparkSession}. */
    @Test
    public void testCloseSparkSession_False() {
        setLocalMode("true");
        StreamingEntry spied = spyEntry();
        spied.setSparkSession(createSparkSession());
        Assert.assertFalse(spied.getSparkSession().sparkContext().isStopped());

        spied.closeSparkSession();

        Awaitility.await()
                .atMost(SPARK_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> !spied.getSparkSession().sparkContext().isStopped());
    }

    /** Calling {@code closeSparkSession} on an already stopped session keeps it stopped. */
    @Test
    public void testCloseSparkSession_AlreadyStop() {
        setLocalMode("false");
        StreamingEntry spied = spyEntry();
        SparkSession session = createSparkSession();
        spied.setSparkSession(session);
        session.stop();
        Assert.assertTrue(spied.getSparkSession().sparkContext().isStopped());

        spied.closeSparkSession();

        Awaitility.await()
                .atMost(SPARK_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> spied.getSparkSession().sparkContext().isStopped());
    }

    /** Starts the execution-id check thread while {@code isRunning} is stubbed to {@code false}. */
    @Test
    public void testStartJobExecutionIdCheckThread_NotRunning() {
        StreamingEntry spied = spyEntry();
        Mockito.doReturn(false).when(spied).isRunning();
        Assert.assertFalse(spied.isRunning());

        spied.startJobExecutionIdCheckThread();

        Awaitility.waitAtMost(RUNNING_CHECK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> !spied.isRunning());
    }

    /**
     * Starts the execution-id check thread while {@code isRunning} is stubbed to {@code true}.
     * NOTE(review): the method name suggests the thread hits an exception path; that is not
     * observable from this test alone - confirm against StreamingEntry.
     */
    @Test
    public void testStartJobExecutionIdCheckThread_Exception() {
        StreamingEntry spied = spyEntry();
        Mockito.doReturn(true).when(spied).isRunning();
        Assert.assertTrue(spied.isRunning());

        spied.startJobExecutionIdCheckThread();

        Awaitility.waitAtMost(RUNNING_CHECK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> spied.isRunning());
    }

    /** Returns a Mockito spy wrapping a freshly constructed {@link StreamingEntry}. */
    private static StreamingEntry spyEntry() {
        return Mockito.spy(new StreamingEntry());
    }

    /** Overwrites the {@code streaming.local} system property with the given value. */
    private void setLocalMode(String value) {
        overwriteSystemProp(LOCAL_MODE_PROP, value);
    }

    /**
     * Shared body of the {@code execute} tests: stubs Spark session creation,
     * {@code doExecute} and {@code reportApplicationInfo} on a spy, then invokes
     * {@code execute} with the fixed argument array.
     */
    private void executeWithStubbedCollaborators() {
        StreamingEntry spied = spyEntry();
        String[] jobArgs = {PROJECT, DATAFLOW_ID, "1", "", "xx"};
        KylinConfig config = getTestConfig();
        SparkConf conf = KylinBuildEnv.getOrCreate(config).sparkConf();

        Mockito.doNothing().when(spied).getOrCreateSparkSession(conf);
        Mockito.doNothing().when(spied).doExecute();
        Mockito.doReturn(123).when(spied).reportApplicationInfo();

        spied.execute(jobArgs);
    }
}

