package org.apache.kylin.streaming.app;

import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/app/StreamingApplicationTest.class */
/**
 * Unit tests exercising {@link StreamingEntry} through Mockito spies.
 *
 * <p>Each test gets a fresh test metadata environment ({@link #setUp()}) which is
 * discarded afterwards ({@link #tearDown()}). The tests cover the {@code execute}
 * entry point with its internals stubbed out, the {@code isJobOnCluster} decision
 * under different environment settings, Spark session shutdown, and starting the
 * job-execution-id check thread.
 */
public class StreamingApplicationTest extends StreamingTestCase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingApplicationTest.class);
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    /** System property toggled by the tests before checking {@code StreamingUtils.isLocalMode()}. */
    private static final String STREAMING_LOCAL_PROP = "streaming.local";
    /** Config key overridden by the tests before checking {@code isUTEnv()}. */
    private static final String KYLIN_ENV_KEY = "kylin.env";

    /** Creates the test metadata environment before every test. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    /** Removes the test metadata environment after every test. */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Runs {@code execute} with Spark session creation, {@code doExecute} and
     * {@code reportApplicationInfo} stubbed; passes if no exception escapes.
     */
    @Test
    public void testExecute_PrepareBeforeExecute() {
        executeWithStubbedInternals();
    }

    /**
     * Same scenario as {@link #testExecute_PrepareBeforeExecute()}: {@code execute}
     * is driven with its internals stubbed and must complete without throwing.
     */
    @Test
    public void testExecute_DoExecute() {
        executeWithStubbedInternals();
    }

    /** Calls {@code systemExit(0)} while {@code isJobOnCluster} is stubbed to return false. */
    @Test
    public void testSystemExit_False() {
        overwriteSystemProp(STREAMING_LOCAL_PROP, "true");
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        Mockito.doReturn(false).when(entry).isJobOnCluster();
        entry.systemExit(0);
    }

    /**
     * With {@code streaming.local=false} and {@code kylin.env=PROD}, neither UT env nor
     * local mode is reported and {@code isJobOnCluster} must be true.
     */
    @Test
    public void testIsJobOnCluster_True() {
        overwriteSystemProp(STREAMING_LOCAL_PROP, "false");
        getTestConfig().setProperty(KYLIN_ENV_KEY, "PROD");
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        Assert.assertFalse(getTestConfig().isUTEnv());
        Assert.assertFalse(StreamingUtils.isLocalMode());
        Assert.assertTrue(entry.isJobOnCluster());
    }

    /**
     * {@code isJobOnCluster} must be false in two situations: when the config still
     * reports a UT env (local mode off), and when local mode is on (UT env off).
     */
    @Test
    public void testIsJobOnCluster_False() {
        // Case 1: UT env reported, local mode disabled.
        overwriteSystemProp(STREAMING_LOCAL_PROP, "false");
        StreamingEntry utEnvEntry = Mockito.spy(new StreamingEntry());
        Assert.assertTrue(getTestConfig().isUTEnv());
        Assert.assertFalse(StreamingUtils.isLocalMode());
        Assert.assertFalse(utEnvEntry.isJobOnCluster());

        // Case 2: env switched to PROD, local mode enabled.
        overwriteSystemProp(STREAMING_LOCAL_PROP, "true");
        StreamingEntry localModeEntry = Mockito.spy(new StreamingEntry());
        getTestConfig().setProperty(KYLIN_ENV_KEY, "PROD");
        Assert.assertFalse(getTestConfig().isUTEnv());
        Assert.assertTrue(StreamingUtils.isLocalMode());
        Assert.assertFalse(localModeEntry.isJobOnCluster());
    }

    /**
     * With {@code streaming.local=false}, a running session must be observed as
     * stopped within 30 seconds after {@code closeSparkSession}.
     */
    @Test
    public void testCloseSparkSession_True() {
        overwriteSystemProp(STREAMING_LOCAL_PROP, "false");
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        entry.setSparkSession(createSparkSession());
        Assert.assertFalse(entry.getSparkSession().sparkContext().isStopped());
        entry.closeSparkSession();
        Awaitility.await().atMost(30L, TimeUnit.SECONDS)
                .until(() -> entry.getSparkSession().sparkContext().isStopped());
    }

    /**
     * With {@code streaming.local=true}, the session must be observed as not stopped
     * after {@code closeSparkSession} has been called.
     */
    @Test
    public void testCloseSparkSession_False() {
        overwriteSystemProp(STREAMING_LOCAL_PROP, "true");
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        entry.setSparkSession(createSparkSession());
        Assert.assertFalse(entry.getSparkSession().sparkContext().isStopped());
        entry.closeSparkSession();
        Awaitility.await().atMost(30L, TimeUnit.SECONDS)
                .until(() -> !entry.getSparkSession().sparkContext().isStopped());
    }

    /**
     * Calling {@code closeSparkSession} on a session that was already stopped must not
     * throw, and the session must still be reported as stopped.
     */
    @Test
    public void testCloseSparkSession_AlreadyStop() {
        overwriteSystemProp(STREAMING_LOCAL_PROP, "false");
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        SparkSession session = createSparkSession();
        entry.setSparkSession(session);
        // Stop the session up front so closeSparkSession sees an already-stopped context.
        session.stop();
        Assert.assertTrue(entry.getSparkSession().sparkContext().isStopped());
        entry.closeSparkSession();
        Awaitility.await().atMost(30L, TimeUnit.SECONDS)
                .until(() -> entry.getSparkSession().sparkContext().isStopped());
    }

    /**
     * Starts the job-execution-id check thread while {@code isRunning} is stubbed to
     * false, then waits (at most 3 seconds) for {@code isRunning} to be false.
     */
    @Test
    public void testStartJobExecutionIdCheckThread_NotRunning() {
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        Mockito.doReturn(false).when(entry).isRunning();
        Assert.assertFalse(entry.isRunning());
        entry.startJobExecutionIdCheckThread();
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).until(() -> !entry.isRunning());
    }

    /**
     * Starts the job-execution-id check thread while {@code isRunning} is stubbed to
     * true, then waits (at most 3 seconds) for {@code isRunning} to be true.
     */
    @Test
    public void testStartJobExecutionIdCheckThread_Exception() {
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        Mockito.doReturn(true).when(entry).isRunning();
        Assert.assertTrue(entry.isRunning());
        entry.startJobExecutionIdCheckThread();
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).until(entry::isRunning);
    }

    /**
     * Shared body of the {@code testExecute_*} tests: spies a {@link StreamingEntry},
     * stubs Spark session creation, {@code doExecute} and {@code reportApplicationInfo},
     * then invokes {@code execute} with a fixed argument array.
     */
    private void executeWithStubbedInternals() {
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        String[] args = {PROJECT, DATAFLOW_ID, "1", "", "xx"};
        Mockito.doNothing().when(entry)
                .getOrCreateSparkSession(KylinBuildEnv.getOrCreate(getTestConfig()).sparkConf());
        Mockito.doNothing().when(entry).doExecute();
        Mockito.doReturn(123).when(entry).reportApplicationInfo();
        entry.execute(args);
    }
}
