/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.scheduler;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Locale;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.streaming.jobs.scheduler.StreamingJobStatusWatcher;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests for {@link StreamingJobStatusWatcher}.
 *
 * <p>Each test puts one streaming job of the test project into a chosen status in the metadata store,
 * hands the watcher a "running jobs" list that contains only the <em>other</em> job id, and then checks
 * either the watcher's private {@code jobMap} field (read through reflection) or the job status that is
 * stored in metadata afterwards.
 */
public class StreamingJobStatusWatcherTest
extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    /** Id of the build job used by the tests. */
    private static final String BUILD_JOB_ID = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
    /** Id of the merge job used by the tests. */
    private static final String MERGE_JOB_ID = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Creates the test metadata before every test. */
    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    /** Cleans up the test metadata after every test. */
    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /**
     * A merge job that is RUNNING in metadata but absent from the running-jobs list must show up in the
     * watcher's {@code jobMap} after a single {@code execute}, while its stored status stays RUNNING.
     */
    @Test
    public void testExecute() {
        ArrayList<String> runningJobs = new ArrayList<String>();
        KylinConfig config = getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, PROJECT);
        mgr.updateStreamingJob(MERGE_JOB_ID, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        runningJobs.add(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        Map<?, ?> jobMap = (Map<?, ?>) ReflectionUtils.getField(watcher, "jobMap");
        Assert.assertTrue(jobMap.isEmpty());

        watcher.execute(runningJobs);
        jobMap = (Map<?, ?>) ReflectionUtils.getField(watcher, "jobMap");
        Assert.assertTrue(jobMap.containsKey(MERGE_JOB_ID));

        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.RUNNING, mergeJobMeta.getCurrentStatus());
    }

    /**
     * A merge job in ERROR status whose last update time is two minutes old must be recorded in the
     * watcher's {@code jobMap} after {@code execute}. The job is then re-stamped as fifty minutes old and
     * the (cleared) map is checked again.
     */
    @Test
    public void testJobMap() {
        ArrayList<String> runningJobs = new ArrayList<String>();
        KylinConfig config = getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, PROJECT);
        mgr.updateStreamingJob(MERGE_JOB_ID, copyForWrite -> {
            copyForWrite.setLastUpdateTime(formatTimestampBefore(2 * 60 * 1000L));
            copyForWrite.setCurrentStatus(JobStatusEnum.ERROR);
        });
        runningJobs.add(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        Map<?, ?> jobMap = (Map<?, ?>) ReflectionUtils.getField(watcher, "jobMap");
        Assert.assertTrue(jobMap.isEmpty());

        watcher.execute(runningJobs);
        jobMap = (Map<?, ?>) ReflectionUtils.getField(watcher, "jobMap");
        Assert.assertTrue(jobMap.containsKey(MERGE_JOB_ID));

        jobMap.clear();
        mgr.updateStreamingJob(MERGE_JOB_ID, copyForWrite -> {
            copyForWrite.setLastUpdateTime(formatTimestampBefore(50 * 60 * 1000L));
            copyForWrite.setCurrentStatus(JobStatusEnum.ERROR);
        });
        // NOTE(review): execute() is not called again after the clear() above, so the assertion below
        // cannot fail whatever timestamp was just written. Presumably a second
        // watcher.execute(runningJobs) was intended here - confirm how the watcher treats stale ERROR
        // jobs before adding it.
        jobMap = (Map<?, ?>) ReflectionUtils.getField(watcher, "jobMap");
        Assert.assertFalse(jobMap.containsKey(MERGE_JOB_ID));
    }

    /**
     * A build job that is RUNNING in metadata but absent from the running-jobs list must end up in ERROR
     * status after five consecutive {@code execute} calls.
     */
    @Test
    public void testKillBuildJob() {
        ArrayList<String> runningJobs = new ArrayList<String>();
        KylinConfig config = getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, PROJECT);
        mgr.updateStreamingJob(BUILD_JOB_ID, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        runningJobs.add(MERGE_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        for (int i = 0; i < 5; ++i) {
            watcher.execute(runningJobs);
        }

        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid(BUILD_JOB_ID);
        Assert.assertEquals(JobStatusEnum.ERROR, buildJobMeta.getCurrentStatus());
    }

    /**
     * A merge job that is STOPPING in metadata but absent from the running-jobs list must end up in ERROR
     * status after eight consecutive {@code execute} calls.
     */
    @Test
    public void testKillMergeJob() {
        ArrayList<String> runningJobs = new ArrayList<String>();
        KylinConfig config = getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, PROJECT);
        mgr.updateStreamingJob(MERGE_JOB_ID, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.STOPPING));
        runningJobs.add(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        for (int i = 0; i < 8; ++i) {
            watcher.execute(runningJobs);
        }

        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.ERROR, mergeJobMeta.getCurrentStatus());
    }

    /**
     * A merge job that is STARTING in metadata but absent from the running-jobs list must end up in ERROR
     * status after eight consecutive {@code execute} calls.
     */
    @Test
    public void testKillStartingJob() {
        ArrayList<String> runningJobs = new ArrayList<String>();
        KylinConfig config = getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, PROJECT);
        mgr.updateStreamingJob(MERGE_JOB_ID, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.STARTING));
        runningJobs.add(BUILD_JOB_ID);

        StreamingJobStatusWatcher watcher = new StreamingJobStatusWatcher();
        for (int i = 0; i < 8; ++i) {
            watcher.execute(runningJobs);
        }

        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.ERROR, mergeJobMeta.getCurrentStatus());
    }

    /**
     * Formats the instant {@code millisAgo} milliseconds before now as {@code yyyy-MM-dd HH:mm:ss},
     * using the default FORMAT locale.
     *
     * @param millisAgo how far in the past the timestamp should lie, in milliseconds
     * @return the formatted timestamp
     */
    private static String formatTimestampBefore(long millisAgo) {
        // A new formatter per call: SimpleDateFormat is not thread-safe, so it is not shared.
        SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(System.currentTimeMillis() - millisAgo);
        return simpleFormat.format(cal.getTime());
    }
}

