package org.apache.kylin.rest.config.initialize;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kylin.common.scheduler.EpochStartedNotifier;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.scheduler.JobFinishedNotifier;
import org.apache.kylin.common.scheduler.JobReadyNotifier;
import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.DefaultExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.FiveSecondSucceedTestExecutable;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.rest.service.UserAclService;
import org.apache.kylin.rest.service.UserService;
import org.apache.kylin.rest.service.task.RecommendationTopNUpdateScheduler;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.security.authentication.TestingAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/rest/config/initialize/SchedulerEventBusTest.class */
/**
 * Exercises the listeners that are attached to the scheduler event bus:
 * {@link JobSchedulerListener} and {@link JobSyncListener} (job ready / job finished
 * notifications) and {@link EpochChangedListener} (project controlled / escaped notifications).
 *
 * <p>Each test runs against freshly created local test metadata with an ADMIN authentication
 * installed in the security context.
 */
public class SchedulerEventBusTest extends NLocalFileMetadataTestCase {
    private static final Logger logger = LoggerFactory.getLogger(SchedulerEventBusTest.class);
    private static final String PROJECT = "default";
    private static final String PROJECT_NEWTEN = "newten";
    /** Project created by the epoch listener tests. */
    private static final String EPOCH_PROJECT = "test_epoch";
    /** Project name passed to the epoch listener in the global-project test. */
    private static final String GLOBAL_PROJECT = "_global";

    /** Number of times the registered scheduler listener received a job-ready notification. */
    private final AtomicInteger readyCalledCount = new AtomicInteger(0);
    /** Number of times the registered sync listener received a job-finished notification. */
    private final AtomicInteger jobFinishedCalledCount = new AtomicInteger(0);

    /**
     * Creates the test metadata, initializes the schedulers of both test projects and registers
     * counting spies of the job listeners on the event bus.
     */
    @Before
    public void setup() {
        logger.info("SchedulerEventBusTest setup");
        createTestMetadata();
        SecurityContextHolder.getContext()
                .setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN"));
        overwriteSystemProp("kylin.job.max-local-consumption-ratio", "10");
        NDefaultScheduler.getInstance(PROJECT_NEWTEN).init(new JobEngineConfig(getTestConfig()));
        NDefaultScheduler.getInstance(PROJECT).init(new JobEngineConfig(getTestConfig()));

        JobSchedulerListener schedulerDelegate = new JobSchedulerListener();
        JobSchedulerListener schedulerSpy = Mockito.spy(schedulerDelegate);
        JobSyncListener syncDelegate = new JobSyncListener();
        JobSyncListener syncSpy = Mockito.spy(syncDelegate);

        // Each spy forwards the event to the real listener and then bumps the matching counter.
        Mockito.doAnswer(invocation -> {
            schedulerDelegate.onJobIsReady((JobReadyNotifier) invocation.getArgument(0));
            this.readyCalledCount.incrementAndGet();
            return null;
        }).when(schedulerSpy).onJobIsReady(Mockito.any(JobReadyNotifier.class));
        Mockito.doAnswer(invocation -> {
            syncDelegate.onJobFinished((JobFinishedNotifier) invocation.getArgument(0));
            this.jobFinishedCalledCount.incrementAndGet();
            return null;
        }).when(syncSpy).onJobFinished(Mockito.any(JobFinishedNotifier.class));

        // Register the SPIES, not the delegates: only the spies carry the counting stubs, so
        // registering the raw listeners would leave both counters permanently at zero.
        EventBusFactory.getInstance().register(schedulerSpy, false);
        EventBusFactory.getInstance().register(syncSpy, false);
    }

    /** Restarts the event bus, resets the invocation counters and drops the test metadata. */
    @After
    public void cleanup() {
        logger.info("SchedulerEventBusTest cleanup");
        EventBusFactory.getInstance().restart();
        this.readyCalledCount.set(0);
        this.jobFinishedCalledCount.set(0);
        cleanupTestMetadata();
    }

    /**
     * Adds a job with a single task and checks that exactly one job-ready notification is
     * observed before the job finishes, and exactly one job-finished notification afterwards.
     */
    @Test
    @Ignore
    public void testJobSchedulerListener() throws InterruptedException {
        logger.info("SchedulerEventBusTest testJobSchedulerListener");
        overwriteSystemProp("kylin.scheduler.schedule-limit-per-minute", "6000");
        Assert.assertEquals(0L, this.readyCalledCount.get());
        Assert.assertEquals(0L, this.jobFinishedCalledCount.get());

        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), PROJECT)
                .getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        String modelId = dataflow.getModel().getUuid();

        DefaultExecutable job = new DefaultExecutable();
        job.setProject(PROJECT);
        job.setTargetSubject(modelId);
        job.setJobType(JobTypeEnum.INC_BUILD);
        job.setTargetSegments(segmentIdsOf(dataflow));

        FiveSecondSucceedTestExecutable task = new FiveSecondSucceedTestExecutable();
        task.setTargetSubject(modelId);
        task.setTargetSegments(segmentIdsOf(dataflow));
        job.addTask(task);

        NExecutableManager executableManager = NExecutableManager.getInstance(getTestConfig(), PROJECT);
        executableManager.addJob(job);

        // Poll for the ready notification instead of sleeping a fixed 100 ms, so a slow
        // event-bus dispatch does not fail the test spuriously.
        Awaitility.await().atMost(3000L, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> Assert.assertEquals(1L, this.readyCalledCount.get()));
        Assert.assertEquals(0L, this.jobFinishedCalledCount.get());

        Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(job.getId()).getState());
            Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(task.getId()).getState());
            Assert.assertEquals(1L, this.jobFinishedCalledCount.get());
        });
    }

    /**
     * Checks that taking control of a project adds one scheduler and that releasing the project
     * brings the scheduler count back to its starting value.
     */
    @Test
    public void testEpochChangedListener() throws Exception {
        EpochChangedListener epochChangedListener = new EpochChangedListener();
        RecommendationTopNUpdateScheduler topNScheduler = new RecommendationTopNUpdateScheduler();
        try {
            ReflectionTestUtils.setField(epochChangedListener, "recommendationUpdateScheduler", topNScheduler);
            NProjectManager.getInstance(getTestConfig()).createProject(EPOCH_PROJECT, "ADMIN", "",
                    (LinkedHashMap) null);
            int initialCount = NDefaultScheduler.listAllSchedulers().size();

            epochChangedListener.onProjectControlled(new ProjectControlledNotifier(EPOCH_PROJECT));
            Assert.assertEquals(initialCount + 1, NDefaultScheduler.listAllSchedulers().size());

            epochChangedListener.onProjectEscaped(new ProjectEscapedNotifier(EPOCH_PROJECT));
            Assert.assertEquals(initialCount, NDefaultScheduler.listAllSchedulers().size());
        } finally {
            // Always release the scheduler, even when an assertion above fails.
            topNScheduler.close();
        }
    }

    /**
     * Checks that controlling and releasing the {@code _global} project leaves the number of
     * schedulers unchanged.
     */
    @Test
    public void testEpochChangedListenerOfGlobalPrj() throws Exception {
        EpochChangedListener epochChangedListener = new EpochChangedListener();
        ReflectionTestUtils.setField(epochChangedListener, "userService", Mockito.spy(UserService.class));
        ReflectionTestUtils.setField(epochChangedListener, "env", Mockito.spy(Environment.class));
        UserAclService userAclService = Mockito.spy(UserAclService.class);
        ReflectionTestUtils.setField(epochChangedListener, "userAclService", userAclService);
        // The admin ACL sync is stubbed out so the listener does not run the real implementation.
        Mockito.doNothing().when(userAclService).syncAdminUserAcl();

        NProjectManager.getInstance(getTestConfig()).createProject(EPOCH_PROJECT, "ADMIN", "",
                (LinkedHashMap) null);
        int initialCount = NDefaultScheduler.listAllSchedulers().size();

        epochChangedListener.onEpochStarted(new EpochStartedNotifier());
        epochChangedListener.onProjectControlled(new ProjectControlledNotifier(GLOBAL_PROJECT));
        Assert.assertEquals(initialCount, NDefaultScheduler.listAllSchedulers().size());

        epochChangedListener.onProjectEscaped(new ProjectEscapedNotifier(GLOBAL_PROJECT));
        Assert.assertEquals(initialCount, NDefaultScheduler.listAllSchedulers().size());
    }

    /** Returns a fresh list holding the id of every segment of {@code dataflow}. */
    private static List<String> segmentIdsOf(NDataflow dataflow) {
        return dataflow.getSegments().stream().map(segment -> segment.getId()).collect(Collectors.toList());
    }
}
