/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.event.StreamingJobDropEvent;
import org.apache.kylin.streaming.event.StreamingJobKillEvent;
import org.apache.kylin.streaming.event.StreamingJobMetaCleanEvent;
import org.apache.kylin.streaming.jobs.StreamingJobListener;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.launcher.SparkAppHandle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

public class StreamingJobListenerTest
extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public TestName testName = new TestName();
    private final StreamingJobListener eventListener = new StreamingJobListener();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
        EventBusFactory.getInstance().register((Object)this.eventListener, true);
    }

    @After
    public void tearDown() {
        EventBusFactory.getInstance().unregister((Object)this.eventListener);
        EventBusFactory.getInstance().restart();
        this.cleanupTestMetadata();
    }

    @Test
    public void testStateChangedToRunning() {
        String jobId = StreamingUtils.getJobId((String)MODEL_ID, (String)JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobListener listener = new StreamingJobListener(PROJECT, jobId);
        KylinConfig testConfig = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setSkipListener(true));
        listener.stateChanged(this.mockRunningState());
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        Assert.assertFalse((boolean)jobMeta.isSkipListener());
    }

    @Test
    public void testStateChangedToFailure() {
        String jobId = StreamingUtils.getJobId((String)MODEL_ID, (String)JobTypeEnum.STREAMING_BUILD.toString());
        KylinConfig testConfig = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        StreamingJobListener listener = new StreamingJobListener(PROJECT, jobId);
        listener.stateChanged(this.mockFailedState());
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)jobMeta.getCurrentStatus());
        mgr.updateStreamingJob(jobId, copyForWrite -> {
            copyForWrite.setCurrentStatus(JobStatusEnum.STOPPING);
            copyForWrite.setSkipListener(true);
        });
        listener.stateChanged(this.mockFailedState());
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPING, (Object)jobMeta.getCurrentStatus());
        mgr.updateStreamingJob(jobId, copyForWrite -> {
            SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(System.currentTimeMillis() - 120000L);
            copyForWrite.setLastUpdateTime(simpleFormat.format(cal.getTime()));
        });
        listener.stateChanged(this.mockKilledState());
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPING, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testStateChangedToKilled() {
        String jobId = StreamingUtils.getJobId((String)MODEL_ID, (String)JobTypeEnum.STREAMING_BUILD.toString());
        KylinConfig testConfig = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        StreamingJobListener listener = new StreamingJobListener(PROJECT, jobId);
        listener.stateChanged(this.mockKilledState());
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)jobMeta.getCurrentStatus());
        mgr.updateStreamingJob(jobId, copyForWrite -> {
            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
            copyForWrite.setSkipListener(true);
        });
        listener.stateChanged(this.mockKilledState());
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        listener.stateChanged(this.mockKilledState());
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testStateChangedToFinish() {
        String jobId = StreamingUtils.getJobId((String)MODEL_ID, (String)JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobListener listener = new StreamingJobListener(PROJECT, jobId);
        listener.stateChanged(this.mockRunningState());
        KylinConfig testConfig = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        listener.stateChanged(this.mockFinishedState());
        mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testOnStreamingJobKill() {
        String modelId = MODEL_ID;
        String project = PROJECT;
        KylinConfig config = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)project);
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)mergeJobMeta.getCurrentStatus());
        EventBusFactory.getInstance().postSync((Object)new StreamingJobKillEvent(project, modelId));
        buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testOnStreamingJobDrop() {
        String modelId = MODEL_ID;
        String project = PROJECT;
        KylinConfig config = StreamingJobListenerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)project);
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertNotNull((Object)buildJobMeta);
        Assert.assertNotNull((Object)mergeJobMeta);
        EventBusFactory.getInstance().postSync((Object)new StreamingJobDropEvent(project, modelId));
        buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertNull((Object)buildJobMeta);
        Assert.assertNull((Object)mergeJobMeta);
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent() throws IOException {
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir((File)mainDir);
        Assert.assertTrue((boolean)mainDir.exists());
        EventBusFactory.getInstance().postSync((Object)new StreamingJobMetaCleanEvent(Collections.singletonList(new Path(mainDir.getAbsolutePath()))));
        Assert.assertFalse((boolean)mainDir.exists());
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent_EmptyPath() throws IOException {
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir((File)mainDir);
        Assert.assertTrue((boolean)mainDir.exists());
        EventBusFactory.getInstance().postSync((Object)new StreamingJobMetaCleanEvent(Collections.emptyList()));
        Assert.assertTrue((boolean)mainDir.exists());
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent_InvalidPath() throws IOException {
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir((File)mainDir);
        Assert.assertTrue((boolean)mainDir.exists());
        EventBusFactory.getInstance().postSync((Object)new StreamingJobMetaCleanEvent(Collections.singletonList(new Path(mainDir.getAbsolutePath() + "xxx"))));
        Assert.assertTrue((boolean)mainDir.exists());
    }

    private SparkAppHandle mockRunningState() {
        return new AbstractSparkAppHandle(){

            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.RUNNING;
            }
        };
    }

    private SparkAppHandle mockFailedState() {
        return new AbstractSparkAppHandle(){

            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.FAILED;
            }
        };
    }

    private SparkAppHandle mockKilledState() {
        return new AbstractSparkAppHandle(){

            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.KILLED;
            }
        };
    }

    private SparkAppHandle mockFinishedState() {
        return new AbstractSparkAppHandle(){

            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.FINISHED;
            }
        };
    }

    public static abstract class AbstractSparkAppHandle
    implements SparkAppHandle {
        public void addListener(SparkAppHandle.Listener listener) {
        }

        public String getAppId() {
            return "local-" + RandomUtil.randomUUID();
        }

        public void stop() {
        }

        public void kill() {
        }

        public void disconnect() {
        }

        public Optional<Throwable> getError() {
            return Optional.empty();
        }
    }
}

