/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.app;

import java.io.File;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StreamingTestConstant;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.model.NCubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.kafka.NSparkKafkaSource;
import org.apache.kylin.streaming.CreateStreamingFlatTable;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.common.CreateFlatTableEntry;
import org.apache.kylin.streaming.jobs.StreamingJobUtils;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.AwaitUtils;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.streaming.Trigger;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple3;

public class StreamingEntryTest
extends StreamingTestCase {
    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingEntryTest.class);
    /** Project name handed to {@code StreamingEntry} and to the metadata managers in these tests. */
    private static final String PROJECT = "streaming_test";
    /** Dataflow id used by most tests; some tests use a second, locally declared id instead. */
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
    /** JUnit rule used by the tests that expect an exception type or message. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Runs before every test and creates the test metadata, passing an empty {@code String} array.
     *
     * @throws Exception if the metadata setup fails
     */
    @Before
    public void setUp() throws Exception {
        String[] noArgs = new String[0];
        createTestMetadata(noArgs);
    }

    /**
     * Runs after every test and cleans up the test metadata created in {@link #setUp()}.
     */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Builds a streaming flat table for {@code DATAFLOW_ID} and checks the generated dataset as
     * well as the Kafka {@code startingOffsets} parameter: first with the plain config
     * ({@code earliest}), then with a job-level override ({@code latest}).
     *
     * <p>The Spark session is stopped in a {@code finally} block so that a failing assertion does
     * not leave it running for the tests that follow.
     */
    @Test
    public void testBuild() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        NSparkKafkaSource source = createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        // Remove every existing segment, then re-read the dataflow.
        NDataflowManager dfMgr = NDataflowManager.getInstance(config, PROJECT);
        NDataflow df = dfMgr.getDataflow(DATAFLOW_ID);
        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToRemoveSegsWithArray((NDataSegment[]) df.getSegments().toArray(new NDataSegment[0]));
        dfMgr.updateDataflow(update);
        df = dfMgr.getDataflow(df.getId());

        NDataSegment seg1 = dfMgr.appendSegmentForStreaming(df, (SegmentRange) createSegmentRange());
        seg1.setStatus(SegmentStatusEnum.READY);
        // NOTE(review): `df` was fetched before seg1 was appended, so this removal list is taken
        // from that earlier snapshot and seg1's READY status is never passed to an update here.
        // Confirm whether an "update segment" call for seg1 was intended instead.
        update = new NDataflowUpdate(df.getUuid());
        update.setToRemoveSegsWithArray((NDataSegment[]) df.getSegments().toArray(new NDataSegment[0]));
        dfMgr.updateDataflow(update);

        NCubeJoinedFlatTableDesc flatTableDesc = new NCubeJoinedFlatTableDesc(df.getIndexPlan());
        Set<?> layouts = StreamingUtils.getToBuildLayouts(df);
        Assert.assertNotNull(layouts);

        String[] args = new String[] {PROJECT, DATAFLOW_ID, "1", "", "xx"};
        StreamingEntry entry = new StreamingEntry();
        entry.parseParams(args);
        NSpanningTree nSpanningTree = entry.createSpanningTree(df);
        Assert.assertNotNull(nSpanningTree);

        SparkSession ss = createSparkSession();
        try {
            CreateFlatTableEntry flatTableEntry = new CreateFlatTableEntry((IJoinedFlatTableDesc) flatTableDesc,
                    null, nSpanningTree, ss, null, null, null, "org.apache.kylin.parser.TimedJsonStreamParser");
            CreateStreamingFlatTable flatTable = CreateStreamingFlatTable.apply(flatTableEntry);

            // First pass: plain config, the Kafka parameter is expected to be "earliest".
            Dataset<?> ds = flatTable.generateStreamingDataset(config);
            Assert.assertEquals(1L, ds.count());
            NDataModel model = flatTableDesc.getDataModel();
            TableDesc tableDesc = model.getRootFactTable().getTableDesc();
            Map<?, ?> kafkaParam = tableDesc.getKafkaConfig().getKafkaParam();
            Assert.assertEquals("earliest", kafkaParam.get("startingOffsets"));

            // Second pass: a job-level override is expected to switch the parameter to "latest".
            HashMap<String, String> jobParams = new HashMap<String, String>();
            jobParams.put("kylin.streaming.kafka-conf.startingOffsets", "latest");
            KylinConfig newConfig = StreamingJobUtils.getStreamingKylinConfig(config, jobParams, model.getId(),
                    PROJECT);
            source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());
            flatTable.generateStreamingDataset(newConfig);
            model = flatTableDesc.getDataModel();
            tableDesc = model.getRootFactTable().getTableDesc();
            kafkaParam = tableDesc.getKafkaConfig().getKafkaParam();
            Assert.assertEquals("latest", kafkaParam.get("startingOffsets"));

            Assert.assertEquals("LO_PARTITIONCOLUMN", flatTable.partitionColumn());
        } finally {
            // Always release the session, including when an assertion above fails.
            ss.stop();
        }
    }

    /**
     * Calls {@code StreamingEntry.main} with an empty fifth argument, after clearing the
     * checkpoint and enabling the memory stream, and expects an {@link Exception}.
     */
    @Test
    public void testMain() {
        KylinConfig testConfig = getTestConfig();
        clearCheckpoint(DATAFLOW_ID);
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(true);

        String[] mainArgs = {PROJECT, DATAFLOW_ID, "1", "", ""};
        thrown.expect(Exception.class);
        StreamingEntry.main(mainArgs);
    }

    /**
     * Sets an empty {@code kylin.engine.streaming-checkpoint-location} and expects constructing a
     * {@code StreamingEntry} to fail with a message containing
     * "base checkpoint location must be configured,".
     */
    @Test
    public void testInitBuildEntry_EmptyCheckPoint() {
        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.engine.streaming-checkpoint-location", "");

        thrown.expectMessage("base checkpoint location must be configured,");
        new StreamingEntry();
    }

    /**
     * Checks the trigger type chosen by a new {@code StreamingEntry} for both values of
     * {@code kylin.engine.streaming-trigger-once}: {@code OneTimeTrigger$} for "true" and
     * {@code ProcessingTimeTrigger} for "false".
     */
    @Test
    public void testInitBuildEntry_TriggerOnce() {
        // trigger-once enabled
        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.engine.streaming-trigger-once", "true");
        StreamingEntry onceEntry = new StreamingEntry();
        Trigger onceTrigger = onceEntry.trigger();
        Assert.assertNotNull(onceTrigger);
        Assert.assertTrue(onceTrigger instanceof OneTimeTrigger$);

        // trigger-once disabled
        testConfig = getTestConfig();
        testConfig.setProperty("kylin.engine.streaming-trigger-once", "false");
        StreamingEntry periodicEntry = new StreamingEntry();
        Trigger periodicTrigger = periodicEntry.trigger();
        Assert.assertNotNull(periodicTrigger);
        Assert.assertTrue(periodicTrigger instanceof ProcessingTimeTrigger);
    }

    /**
     * Parses the standard arguments on a spied {@code StreamingEntry} and checks that
     * {@code dataflow()} returns a non-null dataflow afterwards.
     */
    @Test
    public void testInitBuildEntry_DataFlow() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"});

        NDataflow resolved = spiedEntry.dataflow();
        Assert.assertNotNull(resolved);
    }

    /**
     * Runs {@code doExecute()} on a spied {@code StreamingEntry} whose REST support is stubbed,
     * and asserts that the Spark context is stopped once {@code doExecute()} returns.
     *
     * <p>The stubbed {@code RestSupport} sets job execution id 1 on the
     * {@code DATAFLOW_ID + "_build"} streaming job and answers {@code RestResponse.ok("1")}. A
     * second callback registered through {@code AwaitUtils.await} sets the job action to
     * {@code GRACEFUL_SHUTDOWN}.
     *
     * <p>An exception from {@code doExecute()} is rethrown as an {@link AssertionError} that
     * keeps the original exception as its cause, and the checkpoint is cleared in a
     * {@code finally} block so that a failing run does not leave it behind.
     */
    @Test
    public void testExecute() {
        KylinConfig config = getTestConfig();
        clearCheckpoint(DATAFLOW_ID);
        NSparkKafkaSource source = createSparkKafkaSource(config);
        source.enableMemoryStream(true);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String[] args = new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"};
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        entry.parseParams(args);
        entry.setSparkSession(createSparkSession());
        StreamingEntry.entry_$eq(entry);
        Assert.assertNotNull(StreamingEntry.entry());

        Mockito.when((Object) entry.createRestSupport(config)).thenReturn((Object) new RestSupport(config) {

            public RestResponse execute(HttpRequestBase httpReqBase, Object param) {
                StreamingJobManager mgr = StreamingJobManager.getInstance(
                        NLocalFileMetadataTestCase.getTestConfig(), PROJECT);
                String jobId = DATAFLOW_ID + "_build";
                mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setJobExecutionId(Integer.valueOf(1)));
                return RestResponse.ok((Object) "1");
            }
        });

        // NOTE(review): presumably the second callback runs after the 5000 ms wait and is what
        // lets doExecute() return; confirm against AwaitUtils.await.
        AwaitUtils.await(() -> {}, 5000, () -> {
            StreamingJobManager mgr = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
            String jobId = DATAFLOW_ID + "_build";
            mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setAction("GRACEFUL_SHUTDOWN"));
        });

        try {
            entry.doExecute();
            Assert.assertTrue(entry.getSparkSession().sparkContext().isStopped());
        } catch (Exception e) {
            // Keep the cause so the failure report shows where doExecute() actually broke.
            throw new AssertionError("StreamingEntry.doExecute() failed: " + e, e);
        } finally {
            clearCheckpoint(DATAFLOW_ID);
        }
    }

    /**
     * Parses arguments naming the project "xxxx" and expects {@code doExecute()} to fail with a
     * message containing "metastore can not find this project xxxx".
     */
    @Test
    public void testExecute_IllegalProject() {
        String illegalProject = "xxxx";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {illegalProject, DATAFLOW_ID, "2", "", "xx"});

        thrown.expectMessage("metastore can not find this project xxxx");
        spiedEntry.doExecute();
    }

    /**
     * Stubs {@code generateStreamQueryForOneModel()} to return a tuple of three nulls and expects
     * {@code doExecute()} to fail with the "generate query for one model failed" message for this
     * project and dataflow.
     */
    @Test
    public void testExecute_EmptyFlatTableDataSet() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"});

        Mockito.doNothing().when(spiedEntry).registerStreamListener();
        Tuple3 emptyResult = new Tuple3(null, null, null);
        Mockito.doReturn(emptyResult).when(spiedEntry).generateStreamQueryForOneModel();

        String expectedMessage = String.format(Locale.ROOT,
                "generate query for one model failed for project:  %s dataflowId: %s", PROJECT, DATAFLOW_ID);
        thrown.expectMessage(expectedMessage);
        spiedEntry.doExecute();
    }

    /**
     * Stubs {@code generateStreamQueryForOneModel()} to return a tuple holding a one-row range
     * dataset and two nulls, and expects {@code doExecute()} to fail with the "streaming query
     * must have time partition column" message for this project and dataflow.
     */
    @Test
    public void testExecute_EmptyTimeColumn() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"});

        Mockito.doNothing().when(spiedEntry).registerStreamListener();
        SparkSession session = createSparkSession();
        Tuple3 datasetOnly = new Tuple3(session.range(1L), null, null);
        Mockito.doReturn(datasetOnly).when(spiedEntry).generateStreamQueryForOneModel();

        String expectedMessage = String.format(Locale.ROOT,
                "streaming query must have time partition column for project:  %s dataflowId: %s", PROJECT,
                DATAFLOW_ID);
        thrown.expectMessage(expectedMessage);
        spiedEntry.doExecute();
    }

    /**
     * Generates the stream query for dataflow {@code 511a9163-7888-4a60-aa24-ae735937cc87},
     * starts the table refresh thread with a refresh interval of 5 and, in an
     * {@code AwaitUtils.await} callback, refreshes the table, asserts the refresh accumulator is 0
     * and stops both the entry and its Spark session.
     *
     * <p>An exception from the refresh section is rethrown as an {@link AssertionError} that
     * keeps the original exception as its cause.
     */
    @Test
    public void testDimensionTableRefresh() {
        KylinConfig config = getTestConfig();
        NSparkKafkaSource source = createSparkKafkaSource(config);
        source.enableMemoryStream(false);
        source.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry entry = Mockito.spy(new StreamingEntry());
        entry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});
        entry.setSparkSession(createSparkSession());
        Tuple3 queryTuple = entry.generateStreamQueryForOneModel();

        ReflectionUtils.setField(entry, "rateTriggerDuration", Long.valueOf(1000L));
        Object triggerDuration = ReflectionUtils.getField(entry, "rateTriggerDuration");
        Assert.assertEquals(Long.valueOf(1000L), triggerDuration);

        CreateStreamingFlatTable flatTable = (CreateStreamingFlatTable) queryTuple._3();
        flatTable.tableRefreshInterval_$eq(5L);
        Assert.assertEquals(dataflowId, flatTable.model().getId());

        try {
            entry.startTableRefreshThread(flatTable);
            // NOTE(review): confirm that AwaitUtils.await propagates an assertion failure raised
            // inside this callback to the test thread.
            AwaitUtils.await(() -> {}, 10000, () -> {
                entry.refreshTable(flatTable);
                long refreshed = entry.tableRefreshAcc().get();
                Assert.assertEquals(0L, refreshed);
                StreamingEntry.stop();
                entry.ss.stop();
            });
        } catch (Exception e) {
            // Keep the cause so the failure report shows the real stack trace.
            throw new AssertionError("dimension table refresh failed: " + e, e);
        }
    }

    /**
     * Calls {@code refreshTable} on spied flat tables in two configurations: one stubbed so that
     * {@code shouldRefreshTable()} is false, and one stubbed so that it is true with a refresh
     * interval of 100 that the refresh accumulator has not reached yet.
     */
    @Test
    public void testDimensionTableRefresh_Skip() {
        // Case 1: refresh is disabled on the table.
        CreateStreamingFlatTable disabledTable = Mockito
                .spy(new CreateStreamingFlatTable(new CreateFlatTableEntry(null, null, null, null, null, null, null, null)));
        StreamingEntry firstEntry = Mockito.spy(new StreamingEntry());
        firstEntry.refreshTable(disabledTable);
        Mockito.doReturn(false).when(disabledTable).shouldRefreshTable();
        Assert.assertFalse(disabledTable.shouldRefreshTable());
        firstEntry.refreshTable(disabledTable);

        // Case 2: refresh is enabled, but the accumulator is still below the interval.
        CreateStreamingFlatTable pendingTable = Mockito
                .spy(new CreateStreamingFlatTable(new CreateFlatTableEntry(null, null, null, null, null, null, null, null)));
        StreamingEntry secondEntry = Mockito.spy(new StreamingEntry());
        secondEntry.refreshTable(pendingTable);
        Mockito.doReturn(true).when(pendingTable).shouldRefreshTable();
        Mockito.doReturn(100L).when(pendingTable).tableRefreshInterval();
        Assert.assertTrue(pendingTable.shouldRefreshTable());
        Assert.assertTrue(secondEntry.tableRefreshAcc().get() < pendingTable.tableRefreshInterval());
        secondEntry.refreshTable(pendingTable);
    }

    /**
     * Sets the flat table's refresh interval to -1, starts the table refresh thread and asserts
     * that {@code shouldRefreshTable()} is false.
     */
    @Test
    public void testDimensionTableRefresh_SkipRefreshThread() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});
        spiedEntry.setSparkSession(createSparkSession());
        Tuple3 queryTuple = spiedEntry.generateStreamQueryForOneModel();

        ReflectionUtils.setField(spiedEntry, "rateTriggerDuration", Long.valueOf(1000L));
        Object triggerDuration = ReflectionUtils.getField(spiedEntry, "rateTriggerDuration");
        Assert.assertEquals(Long.valueOf(1000L), triggerDuration);

        CreateStreamingFlatTable streamingTable = (CreateStreamingFlatTable) queryTuple._3();
        streamingTable.tableRefreshInterval_$eq(-1L);
        spiedEntry.startTableRefreshThread(streamingTable);
        Assert.assertFalse(streamingTable.shouldRefreshTable());
    }

    /**
     * Starts the table refresh thread with a refresh interval of 1 and a rate trigger duration of
     * 100, then waits up to 30 seconds for the refresh accumulator to become positive.
     */
    @Test
    public void testDimensionTableRefresh_Running() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});
        spiedEntry.setSparkSession(createSparkSession());
        Tuple3 queryTuple = spiedEntry.generateStreamQueryForOneModel();

        ReflectionUtils.setField(spiedEntry, "rateTriggerDuration", Long.valueOf(100L));
        Object triggerDuration = ReflectionUtils.getField(spiedEntry, "rateTriggerDuration");
        Assert.assertEquals(Long.valueOf(100L), triggerDuration);

        CreateStreamingFlatTable streamingTable = (CreateStreamingFlatTable) queryTuple._3();
        streamingTable.tableRefreshInterval_$eq(1L);
        Assert.assertTrue(streamingTable.shouldRefreshTable());

        spiedEntry.startTableRefreshThread(streamingTable);
        Awaitility.waitAtMost(30L, TimeUnit.SECONDS).until(() -> spiedEntry.tableRefreshAcc().get() > 0L);
    }

    /**
     * Sets the stop flag before starting the table refresh thread, sleeps for 1000 via
     * {@code AwaitUtils.sleep}, and asserts that the refresh accumulator is still 0.
     */
    @Test
    public void testDimensionTableRefresh_NotRunning() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});
        spiedEntry.setSparkSession(createSparkSession());
        Tuple3 queryTuple = spiedEntry.generateStreamQueryForOneModel();

        ReflectionUtils.setField(spiedEntry, "rateTriggerDuration", Long.valueOf(100L));
        Object triggerDuration = ReflectionUtils.getField(spiedEntry, "rateTriggerDuration");
        Assert.assertEquals(Long.valueOf(100L), triggerDuration);

        CreateStreamingFlatTable streamingTable = (CreateStreamingFlatTable) queryTuple._3();
        streamingTable.tableRefreshInterval_$eq(1L);
        Assert.assertTrue(streamingTable.shouldRefreshTable());

        // With the stop flag already set, no refresh is expected to be counted.
        spiedEntry.setStopFlag(true);
        spiedEntry.startTableRefreshThread(streamingTable);
        AwaitUtils.sleep(1000);
        long refreshed = spiedEntry.tableRefreshAcc().get();
        Assert.assertEquals(0L, refreshed);
    }

    /**
     * Invokes {@code prepareKylinConfig} reflectively on an entry parsed with "xx" as its fifth
     * argument and asserts that the test config's metadata URL renders as "xx".
     */
    @Test
    public void testPrepareKylinConfig() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});

        ReflectionUtils.invokeGetterMethod(spiedEntry, "prepareKylinConfig");
        String metadataUrl = testConfig.getMetadataUrl().toString();
        Assert.assertEquals("xx", metadataUrl);
    }

    /**
     * Invokes {@code initMetaPathSet} reflectively and asserts that the returned set holds exactly
     * 11 entries, including every expected metadata path listed below.
     */
    @Test
    public void testInitMetaPathSet() {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});

        Set<?> metaPathSet = (Set<?>) ReflectionUtils.invokeGetterMethod(spiedEntry, "initMetaPathSet");
        Assert.assertEquals(11L, (long) metaPathSet.size());

        String[] expectedPaths = {
                "/streaming_test/streaming/511a9163-7888-4a60-aa24-ae735937cc87_build",
                "/streaming_test/dataflow/511a9163-7888-4a60-aa24-ae735937cc87.json",
                "/streaming_test/index_plan/511a9163-7888-4a60-aa24-ae735937cc87.json",
                "/_global/project/streaming_test.json",
                "/streaming_test/model_desc/511a9163-7888-4a60-aa24-ae735937cc87.json",
                "/streaming_test/table/SSB.P_LINEORDER.json",
                "/streaming_test/kafka/SSB.P_LINEORDER.json",
                "/streaming_test/table/SSB.DATES.json",
                "/streaming_test/table/SSB.CUSTOMER.json",
                "/streaming_test/table/SSB.SUPPLIER.json",
                "/streaming_test/table/SSB.PART.json"
        };
        for (String expectedPath : expectedPaths) {
            Assert.assertTrue(metaPathSet.contains(expectedPath));
        }
    }

    /**
     * Calls {@code prepareBeforeExecute()} on a spied entry whose
     * {@code getOrCreateSparkSession} is stubbed to do nothing and whose
     * {@code reportApplicationInfo} is stubbed to return 123. The test makes no assertion beyond
     * the call completing.
     *
     * @throws ExecuteException declared by the test signature
     */
    @Test
    public void testPrepareBeforeExecute_Local() throws ExecuteException {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});

        SparkConf buildSparkConf = KylinBuildEnv.getOrCreate(getTestConfig()).sparkConf();
        Mockito.doNothing().when(spiedEntry).getOrCreateSparkSession(buildSparkConf);
        Mockito.doReturn(123).when(spiedEntry).reportApplicationInfo();

        spiedEntry.prepareBeforeExecute();
    }

    /**
     * Same as the local variant, but with {@code isJobOnCluster()} additionally stubbed to return
     * true before {@code prepareBeforeExecute()} is called. The test makes no assertion beyond the
     * call completing.
     *
     * @throws ExecuteException declared by the test signature
     */
    @Test
    public void testPrepareBeforeExecute_JobOnCluster() throws ExecuteException {
        KylinConfig testConfig = getTestConfig();
        NSparkKafkaSource kafkaSource = createSparkKafkaSource(testConfig);
        kafkaSource.enableMemoryStream(false);
        kafkaSource.post(StreamingTestConstant.KAP_SSB_STREAMING_JSON_FILE());

        String dataflowId = "511a9163-7888-4a60-aa24-ae735937cc87";
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, dataflowId, "5", "", "xx"});

        SparkConf buildSparkConf = KylinBuildEnv.getOrCreate(getTestConfig()).sparkConf();
        Mockito.doNothing().when(spiedEntry).getOrCreateSparkSession(buildSparkConf);
        Mockito.doReturn(true).when(spiedEntry).isJobOnCluster();
        Mockito.doReturn(123).when(spiedEntry).reportApplicationInfo();

        spiedEntry.prepareBeforeExecute();
    }

    /**
     * Removes the checkpoint location of the given dataflow, i.e.
     * {@code <streaming base checkpoint location>/<dataflowId>}, retrying up to five times.
     *
     * <p>The location is deleted recursively because {@link File#delete()} cannot remove a
     * non-empty directory. The 5-second pause is taken only after a failed attempt, so a
     * successful or unnecessary cleanup returns immediately.
     *
     * @param dataflowId id of the dataflow whose checkpoint should be removed
     */
    private void clearCheckpoint(String dataflowId) {
        KylinConfig config = getTestConfig();
        File checkpointFile = new File(config.getStreamingBaseCheckpointLocation() + "/" + dataflowId);
        for (int retry = 0; retry < 5 && checkpointFile.exists(); ++retry) {
            if (deleteCheckpointTree(checkpointFile)) {
                return;
            }
            // Deletion failed (e.g. something still holds a file); wait before the next attempt.
            StreamingUtils.sleep(5000L);
        }
    }

    /**
     * Deletes {@code file} and, when it is a directory, everything beneath it.
     *
     * @param file file or directory to delete
     * @return {@code true} if {@code file} itself was deleted
     */
    private static boolean deleteCheckpointTree(File file) {
        File[] children = file.listFiles(); // null when `file` is not a directory
        if (children != null) {
            for (File child : children) {
                deleteCheckpointTree(child);
            }
        }
        return file.delete();
    }

    /**
     * Checks {@code isRunning()} in three states: false while the {@code gracefulStop} flag is
     * set, true once the flag is cleared, and false again after the Spark session is closed.
     */
    @Test
    public void testIsRunning() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.setSparkSession(createSparkSession());

        AtomicBoolean gracefulStop = new AtomicBoolean(true);
        ReflectionUtils.setField(spiedEntry, "gracefulStop", gracefulStop);
        Assert.assertFalse(spiedEntry.isRunning());

        gracefulStop.set(false);
        Assert.assertTrue(spiedEntry.isRunning());

        spiedEntry.getSparkSession().close();
        Assert.assertFalse(spiedEntry.isRunning());
    }

    /**
     * Starts the job-execution-id check thread with the entry's {@code jobExecId} set to 0 and a
     * stubbed {@code RestSupport} that answers 0 for its first three calls and 1 afterwards, then
     * asserts that the Spark context is stopped once {@code AwaitUtils.await} returns.
     */
    @Test
    public void testStartJobExecutionIdCheckThread() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"});
        spiedEntry.setSparkSession(createSparkSession());

        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.streaming.job-execution-id-check-interval", "0m");

        final AtomicInteger callCount = new AtomicInteger(0);
        Mockito.when((Object) spiedEntry.createRestSupport(testConfig)).thenReturn((Object) new RestSupport(testConfig) {

            public RestResponse execute(HttpRequestBase httpReqBase, Object param) {
                int previousCalls = callCount.getAndIncrement();
                return previousCalls < 3 ? RestResponse.ok((Object) 0) : RestResponse.ok((Object) 1);
            }
        });

        AtomicBoolean gracefulStop = new AtomicBoolean(false);
        ReflectionUtils.setField(spiedEntry, "gracefulStop", gracefulStop);
        ReflectionUtils.setField(spiedEntry, "jobExecId", Integer.valueOf(0));

        AwaitUtils.await(() -> spiedEntry.startJobExecutionIdCheckThread(), 5000, () -> gracefulStop.set(true));
        Assert.assertTrue(spiedEntry.getSparkSession().sparkContext().isStopped());
    }

    /**
     * Starts the job-execution-id check thread with the entry's {@code jobExecId} set to 3 while
     * the stubbed {@code RestSupport} only ever answers 0 or 1, then waits up to 30 seconds for
     * the Spark context to be stopped.
     */
    @Test
    public void testStartJobExecutionIdCheckThread_DiffJobExecId() {
        StreamingEntry spiedEntry = Mockito.spy(new StreamingEntry());
        spiedEntry.parseParams(new String[] {PROJECT, DATAFLOW_ID, "2", "", "xx"});
        spiedEntry.setSparkSession(createSparkSession());

        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.streaming.job-execution-id-check-interval", "0m");

        final AtomicInteger callCount = new AtomicInteger(0);
        Mockito.when((Object) spiedEntry.createRestSupport(testConfig)).thenReturn((Object) new RestSupport(testConfig) {

            public RestResponse execute(HttpRequestBase httpReqBase, Object param) {
                int previousCalls = callCount.getAndIncrement();
                return previousCalls < 3 ? RestResponse.ok((Object) 0) : RestResponse.ok((Object) 1);
            }
        });

        AtomicBoolean gracefulStop = new AtomicBoolean(false);
        ReflectionUtils.setField(spiedEntry, "gracefulStop", gracefulStop);
        ReflectionUtils.setField(spiedEntry, "jobExecId", Integer.valueOf(3));

        AwaitUtils.await(() -> spiedEntry.startJobExecutionIdCheckThread(), 1000, () -> gracefulStop.set(true));
        Awaitility.waitAtMost(30L, TimeUnit.SECONDS)
                .until(() -> spiedEntry.getSparkSession().sparkContext().isStopped());
    }

    /**
     * Checks that {@code getStopFlag()} returns whatever was last passed to
     * {@code setStopFlag(boolean)}, for both true and false.
     */
    @Test
    public void testGracefulStopInterface() {
        StreamingEntry stoppable = new StreamingEntry();

        stoppable.setStopFlag(true);
        Assert.assertTrue(stoppable.getStopFlag());

        stoppable.setStopFlag(false);
        Assert.assertFalse(stoppable.getStopFlag());
    }
}

