package org.apache.kylin.streaming.app;

import java.io.File;
import java.util.HashMap;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.jobs.StreamingJobUtils;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.AwaitUtils;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.streaming.Trigger;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple3;

/* loaded from: input_file:org/apache/kylin/streaming/app/StreamingEntryTest.class */
public class StreamingEntryTest extends StreamingTestCase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingEntryTest.class);
    private static String PROJECT = "streaming_test";
    private static String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    @Test
    public void testBuild() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(instanceFromEnv);
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(instanceFromEnv, PROJECT);
        NDataflow dataflow = nDataflowManager.getDataflow(DATAFLOW_ID);
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToRemoveSegsWithArray((NDataSegment[]) dataflow.getSegments().toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
        NDataflow dataflow2 = nDataflowManager.getDataflow(dataflow.getId());
        nDataflowManager.appendSegmentForStreaming(dataflow2, createSegmentRange()).setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate nDataflowUpdate2 = new NDataflowUpdate(dataflow2.getUuid());
        nDataflowUpdate2.setToRemoveSegsWithArray((NDataSegment[]) dataflow2.getSegments().toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate2);
        NCubeJoinedFlatTableDesc nCubeJoinedFlatTableDesc = new NCubeJoinedFlatTableDesc(dataflow2.getIndexPlan());
        Assert.assertNotNull(StreamingUtils.getToBuildLayouts(dataflow2));
        String[] strArr = {PROJECT, DATAFLOW_ID, "1", "", "xx"};
        StreamingEntry streamingEntry = new StreamingEntry();
        streamingEntry.parseParams(strArr);
        NSpanningTree createSpanningTree = streamingEntry.createSpanningTree(dataflow2);
        Assert.assertNotNull(createSpanningTree);
        SparkSession createSparkSession = createSparkSession();
        CreateStreamingFlatTable apply = CreateStreamingFlatTable.apply(new CreateFlatTableEntry(nCubeJoinedFlatTableDesc, (NDataSegment) null, createSpanningTree, createSparkSession, (NBuildSourceInfo) null, (String) null, (String) null, "org.apache.kylin.parser.TimedJsonStreamParser"));
        Assert.assertEquals(1L, apply.generateStreamingDataset(instanceFromEnv).count());
        NDataModel dataModel = nCubeJoinedFlatTableDesc.getDataModel();
        Assert.assertEquals("earliest", dataModel.getRootFactTable().getTableDesc().getKafkaConfig().getKafkaParam().get("startingOffsets"));
        HashMap hashMap = new HashMap();
        hashMap.put("kylin.streaming.kafka-conf.startingOffsets", "latest");
        KylinConfig streamingKylinConfig = StreamingJobUtils.getStreamingKylinConfig(instanceFromEnv, hashMap, dataModel.getId(), PROJECT);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        apply.generateStreamingDataset(streamingKylinConfig);
        Assert.assertEquals("latest", nCubeJoinedFlatTableDesc.getDataModel().getRootFactTable().getTableDesc().getKafkaConfig().getKafkaParam().get("startingOffsets"));
        createSparkSession.stop();
        Assert.assertEquals("LO_PARTITIONCOLUMN", apply.partitionColumn());
    }

    @Test
    public void testMain() {
        KylinConfig testConfig = getTestConfig();
        clearCheckpoint(DATAFLOW_ID);
        createSparkKafkaSource(testConfig).enableMemoryStream(true);
        String[] strArr = {PROJECT, DATAFLOW_ID, "1", "", ""};
        this.thrown.expect(Exception.class);
        StreamingEntry.main(strArr);
    }

    @Test
    public void testInitBuildEntry_EmptyCheckPoint() {
        getTestConfig().setProperty("kylin.engine.streaming-checkpoint-location", "");
        this.thrown.expectMessage("base checkpoint location must be configured,");
        new StreamingEntry();
    }

    @Test
    public void testInitBuildEntry_TriggerOnce() {
        getTestConfig().setProperty("kylin.engine.streaming-trigger-once", "true");
        Trigger trigger = new StreamingEntry().trigger();
        Assert.assertNotNull(trigger);
        Assert.assertTrue(trigger instanceof OneTimeTrigger$);
        getTestConfig().setProperty("kylin.engine.streaming-trigger-once", "false");
        Trigger trigger2 = new StreamingEntry().trigger();
        Assert.assertNotNull(trigger2);
        Assert.assertTrue(trigger2 instanceof ProcessingTimeTrigger);
    }

    @Test
    public void testInitBuildEntry_DataFlow() {
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        Assert.assertNotNull(streamingEntry.dataflow());
    }

    @Test
    public void testExecute() {
        KylinConfig testConfig = getTestConfig();
        clearCheckpoint(DATAFLOW_ID);
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(testConfig);
        createSparkKafkaSource.enableMemoryStream(true);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        streamingEntry.setSparkSession(createSparkSession());
        StreamingEntry.entry_$eq(streamingEntry);
        Assert.assertNotNull(StreamingEntry.entry());
        Mockito.when(streamingEntry.createRestSupport(testConfig)).thenReturn(new RestSupport(testConfig) { // from class: org.apache.kylin.streaming.app.StreamingEntryTest.1
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                StreamingJobManager.getInstance(NLocalFileMetadataTestCase.getTestConfig(), StreamingEntryTest.PROJECT).updateStreamingJob(StreamingEntryTest.DATAFLOW_ID + "_build", streamingJobMeta -> {
                    streamingJobMeta.setJobExecutionId(1);
                });
                return RestResponse.ok("1");
            }
        });
        AwaitUtils.await(() -> {
        }, 5000, () -> {
            StreamingJobManager.getInstance(getTestConfig(), PROJECT).updateStreamingJob(DATAFLOW_ID + "_build", streamingJobMeta -> {
                streamingJobMeta.setAction("GRACEFUL_SHUTDOWN");
            });
        });
        try {
            streamingEntry.doExecute();
            Assert.assertTrue(streamingEntry.getSparkSession().sparkContext().isStopped());
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        clearCheckpoint(DATAFLOW_ID);
    }

    @Test
    public void testExecute_IllegalProject() {
        String[] strArr = {"xxxx", DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        this.thrown.expectMessage("metastore can not find this project xxxx");
        streamingEntry.doExecute();
    }

    @Test
    public void testExecute_EmptyFlatTableDataSet() {
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        ((StreamingEntry) Mockito.doNothing().when(streamingEntry)).registerStreamListener();
        ((StreamingEntry) Mockito.doReturn(new Tuple3((Object) null, (Object) null, (Object) null)).when(streamingEntry)).generateStreamQueryForOneModel();
        this.thrown.expectMessage(String.format(Locale.ROOT, "generate query for one model failed for project:  %s dataflowId: %s", PROJECT, DATAFLOW_ID));
        streamingEntry.doExecute();
    }

    @Test
    public void testExecute_EmptyTimeColumn() {
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        ((StreamingEntry) Mockito.doNothing().when(streamingEntry)).registerStreamListener();
        ((StreamingEntry) Mockito.doReturn(new Tuple3(createSparkSession().range(1L), (Object) null, (Object) null)).when(streamingEntry)).generateStreamQueryForOneModel();
        this.thrown.expectMessage(String.format(Locale.ROOT, "streaming query must have time partition column for project:  %s dataflowId: %s", PROJECT, DATAFLOW_ID));
        streamingEntry.doExecute();
    }

    @Test
    public void testDimensionTableRefresh() {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        streamingEntry.setSparkSession(createSparkSession());
        Tuple3 generateStreamQueryForOneModel = streamingEntry.generateStreamQueryForOneModel();
        ReflectionUtils.setField((Object) streamingEntry, "rateTriggerDuration", (Object) 1000L);
        Assert.assertEquals(1000L, ReflectionUtils.getField(streamingEntry, "rateTriggerDuration"));
        CreateStreamingFlatTable createStreamingFlatTable = (CreateStreamingFlatTable) generateStreamQueryForOneModel._3();
        createStreamingFlatTable.tableRefreshInterval_$eq(5L);
        Assert.assertEquals("511a9163-7888-4a60-aa24-ae735937cc87", createStreamingFlatTable.model().getId());
        try {
            streamingEntry.startTableRefreshThread(createStreamingFlatTable);
            AwaitUtils.await(() -> {
            }, 10000, () -> {
                streamingEntry.refreshTable(createStreamingFlatTable);
                Assert.assertEquals(0L, streamingEntry.tableRefreshAcc().get());
                StreamingEntry.stop();
                streamingEntry.ss.stop();
            });
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testDimensionTableRefresh_Skip() {
        CreateStreamingFlatTable createStreamingFlatTable = (CreateStreamingFlatTable) Mockito.spy(new CreateStreamingFlatTable(new CreateFlatTableEntry((IJoinedFlatTableDesc) null, (NDataSegment) null, (NSpanningTree) null, (SparkSession) null, (NBuildSourceInfo) null, (String) null, (String) null, (String) null)));
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.refreshTable(createStreamingFlatTable);
        ((CreateStreamingFlatTable) Mockito.doReturn(false).when(createStreamingFlatTable)).shouldRefreshTable();
        Assert.assertFalse(createStreamingFlatTable.shouldRefreshTable());
        streamingEntry.refreshTable(createStreamingFlatTable);
        CreateStreamingFlatTable createStreamingFlatTable2 = (CreateStreamingFlatTable) Mockito.spy(new CreateStreamingFlatTable(new CreateFlatTableEntry((IJoinedFlatTableDesc) null, (NDataSegment) null, (NSpanningTree) null, (SparkSession) null, (NBuildSourceInfo) null, (String) null, (String) null, (String) null)));
        StreamingEntry streamingEntry2 = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry2.refreshTable(createStreamingFlatTable2);
        ((CreateStreamingFlatTable) Mockito.doReturn(true).when(createStreamingFlatTable2)).shouldRefreshTable();
        ((CreateStreamingFlatTable) Mockito.doReturn(100L).when(createStreamingFlatTable2)).tableRefreshInterval();
        Assert.assertTrue(createStreamingFlatTable2.shouldRefreshTable());
        Assert.assertTrue(streamingEntry2.tableRefreshAcc().get() < createStreamingFlatTable2.tableRefreshInterval());
        streamingEntry2.refreshTable(createStreamingFlatTable2);
    }

    @Test
    public void testDimensionTableRefresh_SkipRefreshThread() {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        streamingEntry.setSparkSession(createSparkSession());
        Tuple3 generateStreamQueryForOneModel = streamingEntry.generateStreamQueryForOneModel();
        ReflectionUtils.setField((Object) streamingEntry, "rateTriggerDuration", (Object) 1000L);
        Assert.assertEquals(1000L, ReflectionUtils.getField(streamingEntry, "rateTriggerDuration"));
        CreateStreamingFlatTable createStreamingFlatTable = (CreateStreamingFlatTable) generateStreamQueryForOneModel._3();
        createStreamingFlatTable.tableRefreshInterval_$eq(-1L);
        streamingEntry.startTableRefreshThread(createStreamingFlatTable);
        Assert.assertFalse(createStreamingFlatTable.shouldRefreshTable());
    }

    @Test
    public void testDimensionTableRefresh_Running() {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        streamingEntry.setSparkSession(createSparkSession());
        Tuple3 generateStreamQueryForOneModel = streamingEntry.generateStreamQueryForOneModel();
        ReflectionUtils.setField((Object) streamingEntry, "rateTriggerDuration", (Object) 100L);
        Assert.assertEquals(100L, ReflectionUtils.getField(streamingEntry, "rateTriggerDuration"));
        CreateStreamingFlatTable createStreamingFlatTable = (CreateStreamingFlatTable) generateStreamQueryForOneModel._3();
        createStreamingFlatTable.tableRefreshInterval_$eq(1L);
        Assert.assertTrue(createStreamingFlatTable.shouldRefreshTable());
        streamingEntry.startTableRefreshThread(createStreamingFlatTable);
        Awaitility.waitAtMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(streamingEntry.tableRefreshAcc().get() > 0);
        });
    }

    @Test
    public void testDimensionTableRefresh_NotRunning() {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        streamingEntry.setSparkSession(createSparkSession());
        Tuple3 generateStreamQueryForOneModel = streamingEntry.generateStreamQueryForOneModel();
        ReflectionUtils.setField((Object) streamingEntry, "rateTriggerDuration", (Object) 100L);
        Assert.assertEquals(100L, ReflectionUtils.getField(streamingEntry, "rateTriggerDuration"));
        CreateStreamingFlatTable createStreamingFlatTable = (CreateStreamingFlatTable) generateStreamQueryForOneModel._3();
        createStreamingFlatTable.tableRefreshInterval_$eq(1L);
        Assert.assertTrue(createStreamingFlatTable.shouldRefreshTable());
        streamingEntry.setStopFlag(true);
        streamingEntry.startTableRefreshThread(createStreamingFlatTable);
        Long l = 1000L;
        AwaitUtils.sleep(l.intValue());
        Assert.assertEquals(0L, streamingEntry.tableRefreshAcc().get());
    }

    @Test
    public void testPrepareKylinConfig() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(testConfig);
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        ReflectionUtils.invokeGetterMethod(streamingEntry, "prepareKylinConfig");
        Assert.assertEquals("xx", testConfig.getMetadataUrl().toString());
    }

    @Test
    public void testInitMetaPathSet() {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        Set set = (Set) ReflectionUtils.invokeGetterMethod(streamingEntry, "initMetaPathSet");
        Assert.assertEquals(11L, set.size());
        Assert.assertTrue(set.contains("/streaming_test/streaming/511a9163-7888-4a60-aa24-ae735937cc87_build"));
        Assert.assertTrue(set.contains("/streaming_test/dataflow/511a9163-7888-4a60-aa24-ae735937cc87.json"));
        Assert.assertTrue(set.contains("/streaming_test/index_plan/511a9163-7888-4a60-aa24-ae735937cc87.json"));
        Assert.assertTrue(set.contains("/_global/project/streaming_test.json"));
        Assert.assertTrue(set.contains("/streaming_test/model_desc/511a9163-7888-4a60-aa24-ae735937cc87.json"));
        Assert.assertTrue(set.contains("/streaming_test/table/SSB.P_LINEORDER.json"));
        Assert.assertTrue(set.contains("/streaming_test/kafka/SSB.P_LINEORDER.json"));
        Assert.assertTrue(set.contains("/streaming_test/table/SSB.DATES.json"));
        Assert.assertTrue(set.contains("/streaming_test/table/SSB.CUSTOMER.json"));
        Assert.assertTrue(set.contains("/streaming_test/table/SSB.SUPPLIER.json"));
        Assert.assertTrue(set.contains("/streaming_test/table/SSB.PART.json"));
    }

    @Test
    public void testPrepareBeforeExecute_Local() throws ExecuteException {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        ((StreamingEntry) Mockito.doNothing().when(streamingEntry)).getOrCreateSparkSession(KylinBuildEnv.getOrCreate(getTestConfig()).sparkConf());
        ((StreamingEntry) Mockito.doReturn(123).when(streamingEntry)).reportApplicationInfo();
        streamingEntry.prepareBeforeExecute();
    }

    @Test
    public void testPrepareBeforeExecute_JobOnCluster() throws ExecuteException {
        NSparkKafkaSource createSparkKafkaSource = createSparkKafkaSource(getTestConfig());
        createSparkKafkaSource.enableMemoryStream(false);
        createSparkKafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(new String[]{PROJECT, "511a9163-7888-4a60-aa24-ae735937cc87", "5", "", "xx"});
        ((StreamingEntry) Mockito.doNothing().when(streamingEntry)).getOrCreateSparkSession(KylinBuildEnv.getOrCreate(getTestConfig()).sparkConf());
        ((StreamingEntry) Mockito.doReturn(true).when(streamingEntry)).isJobOnCluster();
        ((StreamingEntry) Mockito.doReturn(123).when(streamingEntry)).reportApplicationInfo();
        streamingEntry.prepareBeforeExecute();
    }

    private void clearCheckpoint(String str) {
        File file = new File(getTestConfig().getStreamingBaseCheckpointLocation() + "/" + str);
        boolean z = false;
        for (int i = 0; !z && i < 5 && file.exists(); i++) {
            z = file.delete();
            StreamingUtils.sleep(5000L);
        }
    }

    @Test
    public void testIsRunning() {
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.setSparkSession(createSparkSession());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        ReflectionUtils.setField(streamingEntry, "gracefulStop", atomicBoolean);
        Assert.assertFalse(streamingEntry.isRunning());
        atomicBoolean.set(false);
        Assert.assertTrue(streamingEntry.isRunning());
        streamingEntry.getSparkSession().close();
        Assert.assertFalse(streamingEntry.isRunning());
    }

    @Test
    public void testStartJobExecutionIdCheckThread() {
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        streamingEntry.setSparkSession(createSparkSession());
        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.streaming.job-execution-id-check-interval", "0m");
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Mockito.when(streamingEntry.createRestSupport(testConfig)).thenReturn(new RestSupport(testConfig) { // from class: org.apache.kylin.streaming.app.StreamingEntryTest.2
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                return atomicInteger.getAndIncrement() < 3 ? RestResponse.ok(0) : RestResponse.ok(1);
            }
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ReflectionUtils.setField(streamingEntry, "gracefulStop", atomicBoolean);
        ReflectionUtils.setField((Object) streamingEntry, "jobExecId", (Object) 0);
        streamingEntry.getClass();
        AwaitUtils.await(streamingEntry::startJobExecutionIdCheckThread, 5000, () -> {
            atomicBoolean.set(true);
        });
        Assert.assertTrue(streamingEntry.getSparkSession().sparkContext().isStopped());
    }

    @Test
    public void testStartJobExecutionIdCheckThread_DiffJobExecId() {
        String[] strArr = {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry streamingEntry = (StreamingEntry) Mockito.spy(new StreamingEntry());
        streamingEntry.parseParams(strArr);
        streamingEntry.setSparkSession(createSparkSession());
        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.streaming.job-execution-id-check-interval", "0m");
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Mockito.when(streamingEntry.createRestSupport(testConfig)).thenReturn(new RestSupport(testConfig) { // from class: org.apache.kylin.streaming.app.StreamingEntryTest.3
            public RestResponse execute(HttpRequestBase httpRequestBase, Object obj) {
                return atomicInteger.getAndIncrement() < 3 ? RestResponse.ok(0) : RestResponse.ok(1);
            }
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ReflectionUtils.setField(streamingEntry, "gracefulStop", atomicBoolean);
        ReflectionUtils.setField((Object) streamingEntry, "jobExecId", (Object) 3);
        streamingEntry.getClass();
        AwaitUtils.await(streamingEntry::startJobExecutionIdCheckThread, 1000, () -> {
            atomicBoolean.set(true);
        });
        Awaitility.waitAtMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(streamingEntry.getSparkSession().sparkContext().isStopped());
        });
    }

    @Test
    public void testGracefulStopInterface() {
        StreamingEntry streamingEntry = new StreamingEntry();
        streamingEntry.setStopFlag(true);
        Assert.assertTrue(streamingEntry.getStopFlag());
        streamingEntry.setStopFlag(false);
        Assert.assertFalse(streamingEntry.getStopFlag());
    }
}
