package org.apache.kylin.streaming.jobs.scheduler;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Locale;
import java.util.Map;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/scheduler/StreamingJobStatusWatcherTest.class */
/**
 * Tests for {@link StreamingJobStatusWatcher}.
 *
 * <p>Each test adjusts the metadata of one streaming job through {@link StreamingJobManager},
 * calls {@code StreamingJobStatusWatcher.execute} with a list holding the ID of the <em>other</em>
 * job of the same model, and then checks either the watcher's private {@code jobMap} field
 * (read via reflection) or the job status stored in the manager.
 */
public class StreamingJobStatusWatcherTest extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";

    /** Model whose build/merge streaming jobs are used by every test in this class. */
    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
    private static final String BUILD_JOB_ID = MODEL_ID + "_build";
    private static final String MERGE_JOB_ID = MODEL_ID + "_merge";

    /** Name of the private map field of {@link StreamingJobStatusWatcher} inspected via reflection. */
    private static final String JOB_MAP_FIELD = "jobMap";

    /** Pattern used when writing the job's last-update time. */
    private static final String LAST_UPDATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    private static final long TWO_MINUTES_MILLIS = 2L * 60 * 1000;
    private static final long FIFTY_MINUTES_MILLIS = 50L * 60 * 1000;

    // Not referenced by any test in this class; kept so the class's public members are unchanged.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * With the merge job set to RUNNING and only the build job ID passed to the watcher, one
     * {@code execute} call must register the merge job in {@code jobMap} and leave its status
     * RUNNING.
     */
    @Test
    public void testExecute() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        writeJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.RUNNING);
        ArrayList<String> jobIds = singleJobIdList(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        Assert.assertTrue(readWatcherJobMap(watcher).isEmpty());

        watcher.execute(jobIds);

        Assert.assertTrue(readWatcherJobMap(watcher).containsKey(MERGE_JOB_ID));
        Assert.assertEquals(JobStatusEnum.RUNNING,
                jobManager.getStreamingJobByUuid(MERGE_JOB_ID).getCurrentStatus());
    }

    /**
     * An ERROR merge job whose last update was two minutes ago must be registered in
     * {@code jobMap} after one {@code execute} call; after the map is cleared and the job's last
     * update is moved fifty minutes into the past, the map must not contain the job.
     */
    @Test
    public void testJobMap() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        writeErrorStatusUpdatedAgo(jobManager, MERGE_JOB_ID, TWO_MINUTES_MILLIS);
        ArrayList<String> jobIds = singleJobIdList(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        Assert.assertTrue(readWatcherJobMap(watcher).isEmpty());

        watcher.execute(jobIds);
        Map<?, ?> trackedJobs = readWatcherJobMap(watcher);
        Assert.assertTrue(trackedJobs.containsKey(MERGE_JOB_ID));

        trackedJobs.clear();
        writeErrorStatusUpdatedAgo(jobManager, MERGE_JOB_ID, FIFTY_MINUTES_MILLIS);
        // NOTE(review): execute() is not invoked again after clear(), so this assertion appears to
        // observe only the map that was just emptied. Confirm whether a second execute() call was
        // intended here to exercise the watcher's handling of a stale ERROR job.
        Assert.assertFalse(readWatcherJobMap(watcher).containsKey(MERGE_JOB_ID));
    }

    /**
     * With the build job set to RUNNING and only the merge job ID passed to the watcher, five
     * {@code execute} calls must leave the build job in ERROR.
     */
    @Test
    public void testKillBuildJob() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        writeJobStatus(jobManager, BUILD_JOB_ID, JobStatusEnum.RUNNING);
        ArrayList<String> jobIds = singleJobIdList(MERGE_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        executeWatcherTimes(watcher, jobIds, 5);

        Assert.assertEquals(JobStatusEnum.ERROR,
                jobManager.getStreamingJobByUuid(BUILD_JOB_ID).getCurrentStatus());
    }

    /**
     * With the merge job set to STOPPING and only the build job ID passed to the watcher, eight
     * {@code execute} calls must leave the merge job in ERROR.
     */
    @Test
    public void testKillMergeJob() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        writeJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.STOPPING);
        ArrayList<String> jobIds = singleJobIdList(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        executeWatcherTimes(watcher, jobIds, 8);

        Assert.assertEquals(JobStatusEnum.ERROR,
                jobManager.getStreamingJobByUuid(MERGE_JOB_ID).getCurrentStatus());
    }

    /**
     * With the merge job set to STARTING and only the build job ID passed to the watcher, eight
     * {@code execute} calls must leave the merge job in ERROR.
     */
    @Test
    public void testKillStartingJob() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        writeJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.STARTING);
        ArrayList<String> jobIds = singleJobIdList(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        executeWatcherTimes(watcher, jobIds, 8);

        Assert.assertEquals(JobStatusEnum.ERROR,
                jobManager.getStreamingJobByUuid(MERGE_JOB_ID).getCurrentStatus());
    }

    /** Builds the mutable single-element job ID list handed to the watcher. */
    private static ArrayList<String> singleJobIdList(String jobId) {
        ArrayList<String> jobIds = new ArrayList<>();
        jobIds.add(jobId);
        return jobIds;
    }

    /** Reads the watcher's private {@code jobMap} field via reflection. */
    private static Map<?, ?> readWatcherJobMap(StreamingJobStatusWatcher watcher) {
        return (Map<?, ?>) ReflectionUtils.getField(watcher, JOB_MAP_FIELD);
    }

    /** Sets the current status of the given streaming job through the manager. */
    private static void writeJobStatus(StreamingJobManager jobManager, String jobId, JobStatusEnum status) {
        jobManager.updateStreamingJob(jobId, jobMeta -> {
            jobMeta.setCurrentStatus(status);
        });
    }

    /**
     * Marks the given streaming job as ERROR and sets its last-update time to {@code ageMillis}
     * milliseconds before now, formatted with {@link #LAST_UPDATE_TIME_PATTERN}.
     */
    private static void writeErrorStatusUpdatedAgo(StreamingJobManager jobManager, String jobId, long ageMillis) {
        jobManager.updateStreamingJob(jobId, jobMeta -> {
            SimpleDateFormat timeFormat = new SimpleDateFormat(LAST_UPDATE_TIME_PATTERN,
                    Locale.getDefault(Locale.Category.FORMAT));
            Calendar lastUpdate = Calendar.getInstance();
            lastUpdate.setTimeInMillis(System.currentTimeMillis() - ageMillis);
            jobMeta.setLastUpdateTime(timeFormat.format(lastUpdate.getTime()));
            jobMeta.setCurrentStatus(JobStatusEnum.ERROR);
        });
    }

    /** Calls {@code watcher.execute(jobIds)} the given number of times in a row. */
    private static void executeWatcherTimes(StreamingJobStatusWatcher watcher, ArrayList<String> jobIds, int times) {
        for (int round = 0; round < times; round++) {
            watcher.execute(jobIds);
        }
    }
}
