package org.apache.kylin.streaming.jobs;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.event.StreamingJobDropEvent;
import org.apache.kylin.streaming.event.StreamingJobKillEvent;
import org.apache.kylin.streaming.event.StreamingJobMetaCleanEvent;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.launcher.SparkAppHandle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingJobListenerTest.class */
public class StreamingJobListenerTest extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public TestName testName = new TestName();
    private final StreamingJobListener eventListener = new StreamingJobListener();

    /* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingJobListenerTest$AbstractSparkAppHandle.class */
    public static abstract class AbstractSparkAppHandle implements SparkAppHandle {
        public void addListener(SparkAppHandle.Listener listener) {
        }

        public String getAppId() {
            return "local-" + RandomUtil.randomUUID();
        }

        public void stop() {
        }

        public void kill() {
        }

        public void disconnect() {
        }

        public Optional<Throwable> getError() {
            return Optional.empty();
        }
    }

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        EventBusFactory.getInstance().register(this.eventListener, true);
    }

    @After
    public void tearDown() {
        EventBusFactory.getInstance().unregister(this.eventListener);
        EventBusFactory.getInstance().restart();
        cleanupTestMetadata();
    }

    @Test
    public void testStateChangedToRunning() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobListener streamingJobListener = new StreamingJobListener(PROJECT, jobId);
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta -> {
            streamingJobMeta.setSkipListener(true);
        });
        streamingJobListener.stateChanged(mockRunningState());
        StreamingJobMeta streamingJobByUuid = streamingJobManager.getStreamingJobByUuid(jobId);
        Assert.assertEquals(JobStatusEnum.RUNNING, streamingJobByUuid.getCurrentStatus());
        Assert.assertFalse(streamingJobByUuid.isSkipListener());
    }

    @Test
    public void testStateChangedToFailure() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta -> {
            streamingJobMeta.setCurrentStatus(JobStatusEnum.RUNNING);
        });
        StreamingJobListener streamingJobListener = new StreamingJobListener(PROJECT, jobId);
        streamingJobListener.stateChanged(mockFailedState());
        Assert.assertEquals(JobStatusEnum.ERROR, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta2 -> {
            streamingJobMeta2.setCurrentStatus(JobStatusEnum.STOPPING);
            streamingJobMeta2.setSkipListener(true);
        });
        streamingJobListener.stateChanged(mockFailedState());
        Assert.assertEquals(JobStatusEnum.STOPPING, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta3 -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(System.currentTimeMillis() - 120000);
            streamingJobMeta3.setLastUpdateTime(simpleDateFormat.format(calendar.getTime()));
        });
        streamingJobListener.stateChanged(mockKilledState());
        Assert.assertEquals(JobStatusEnum.STOPPING, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    @Test
    public void testStateChangedToKilled() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta -> {
            streamingJobMeta.setCurrentStatus(JobStatusEnum.RUNNING);
        });
        StreamingJobListener streamingJobListener = new StreamingJobListener(PROJECT, jobId);
        streamingJobListener.stateChanged(mockKilledState());
        Assert.assertEquals(JobStatusEnum.ERROR, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
        streamingJobManager.updateStreamingJob(jobId, streamingJobMeta2 -> {
            streamingJobMeta2.setCurrentStatus(JobStatusEnum.RUNNING);
            streamingJobMeta2.setSkipListener(true);
        });
        streamingJobListener.stateChanged(mockKilledState());
        Assert.assertEquals(JobStatusEnum.RUNNING, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
        streamingJobListener.stateChanged(mockKilledState());
        Assert.assertEquals(JobStatusEnum.RUNNING, streamingJobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    @Test
    public void testStateChangedToFinish() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobListener streamingJobListener = new StreamingJobListener(PROJECT, jobId);
        streamingJobListener.stateChanged(mockRunningState());
        KylinConfig testConfig = getTestConfig();
        Assert.assertEquals(JobStatusEnum.RUNNING, StreamingJobManager.getInstance(testConfig, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());
        streamingJobListener.stateChanged(mockFinishedState());
        Assert.assertEquals(JobStatusEnum.STOPPED, StreamingJobManager.getInstance(testConfig, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    @Test
    public void testOnStreamingJobKill() {
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        streamingJobManager.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", streamingJobMeta -> {
            streamingJobMeta.setCurrentStatus(JobStatusEnum.RUNNING);
        });
        streamingJobManager.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", streamingJobMeta2 -> {
            streamingJobMeta2.setCurrentStatus(JobStatusEnum.RUNNING);
        });
        StreamingJobMeta streamingJobByUuid = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta streamingJobByUuid2 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals(JobStatusEnum.RUNNING, streamingJobByUuid.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.RUNNING, streamingJobByUuid2.getCurrentStatus());
        EventBusFactory.getInstance().postSync(new StreamingJobKillEvent(PROJECT, MODEL_ID));
        StreamingJobMeta streamingJobByUuid3 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta streamingJobByUuid4 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals(JobStatusEnum.STOPPED, streamingJobByUuid3.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.STOPPED, streamingJobByUuid4.getCurrentStatus());
    }

    @Test
    public void testOnStreamingJobDrop() {
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        StreamingJobMeta streamingJobByUuid = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta streamingJobByUuid2 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertNotNull(streamingJobByUuid);
        Assert.assertNotNull(streamingJobByUuid2);
        EventBusFactory.getInstance().postSync(new StreamingJobDropEvent(PROJECT, MODEL_ID));
        StreamingJobMeta streamingJobByUuid3 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta streamingJobByUuid4 = streamingJobManager.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertNull(streamingJobByUuid3);
        Assert.assertNull(streamingJobByUuid4);
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent() throws IOException {
        File file = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir(file);
        Assert.assertTrue(file.exists());
        EventBusFactory.getInstance().postSync(new StreamingJobMetaCleanEvent(Collections.singletonList(new Path(file.getAbsolutePath()))));
        Assert.assertFalse(file.exists());
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent_EmptyPath() throws IOException {
        File file = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir(file);
        Assert.assertTrue(file.exists());
        EventBusFactory.getInstance().postSync(new StreamingJobMetaCleanEvent(Collections.emptyList()));
        Assert.assertTrue(file.exists());
    }

    @Test
    public void testOnStreamingJobMetaCleanEvent_InvalidPath() throws IOException {
        File file = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir(file);
        Assert.assertTrue(file.exists());
        EventBusFactory.getInstance().postSync(new StreamingJobMetaCleanEvent(Collections.singletonList(new Path(file.getAbsolutePath() + "xxx"))));
        Assert.assertTrue(file.exists());
    }

    private SparkAppHandle mockRunningState() {
        return new AbstractSparkAppHandle() { // from class: org.apache.kylin.streaming.jobs.StreamingJobListenerTest.1
            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.RUNNING;
            }
        };
    }

    private SparkAppHandle mockFailedState() {
        return new AbstractSparkAppHandle() { // from class: org.apache.kylin.streaming.jobs.StreamingJobListenerTest.2
            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.FAILED;
            }
        };
    }

    private SparkAppHandle mockKilledState() {
        return new AbstractSparkAppHandle() { // from class: org.apache.kylin.streaming.jobs.StreamingJobListenerTest.3
            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.KILLED;
            }
        };
    }

    private SparkAppHandle mockFinishedState() {
        return new AbstractSparkAppHandle() { // from class: org.apache.kylin.streaming.jobs.StreamingJobListenerTest.4
            public SparkAppHandle.State getState() {
                return SparkAppHandle.State.FINISHED;
            }
        };
    }
}
