package io.kyligence.kap.secondstorage;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.kyligence.kap.clickhouse.job.ClickHouse;
import io.kyligence.kap.clickhouse.job.ClickHouseSegmentCleanJob;
import io.kyligence.kap.newten.clickhouse.AzuriteContainer;
import io.kyligence.kap.newten.clickhouse.ClickHouseUtils;
import io.kyligence.kap.secondstorage.enums.LockTypeEnum;
import io.kyligence.kap.secondstorage.management.SecondStorageEndpoint;
import io.kyligence.kap.secondstorage.management.SecondStorageService;
import io.kyligence.kap.secondstorage.management.request.StorageRequest;
import io.kyligence.kap.secondstorage.metadata.Manager;
import io.kyligence.kap.secondstorage.metadata.TableData;
import io.kyligence.kap.secondstorage.metadata.TableFlow;
import io.kyligence.kap.secondstorage.metadata.TablePartition;
import io.kyligence.kap.secondstorage.test.ClickHouseClassRule;
import io.kyligence.kap.secondstorage.test.EnableClickHouseJob;
import io.kyligence.kap.secondstorage.test.EnableTestUser;
import io.kyligence.kap.secondstorage.test.SharedSparkSession;
import io.kyligence.kap.secondstorage.test.utils.JobWaiter;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.job.manager.JobManager;
import org.apache.kylin.job.model.JobParam;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclUtil;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:io/kyligence/kap/secondstorage/IncrementalWithIntPartitionTest.class */
public class IncrementalWithIntPartitionTest implements JobWaiter {
    private static final String modelId = "acfde546-2cc9-4eec-bc92-e3bd46d4e2ee";
    private static final String project = "table_index_incremental_with_int_date";
    private IndexDataConstructor indexDataConstructor;

    @ClassRule
    public static SharedSparkSession sharedSpark = new SharedSparkSession(ImmutableMap.of("spark.sql.extensions", "org.apache.kylin.query.SQLPushDownExtensions"));

    @ClassRule
    public static ClickHouseClassRule clickHouseClassRule = new ClickHouseClassRule(2);
    private final SparkSession sparkSession = sharedSpark.getSpark();
    public EnableTestUser enableTestUser = new EnableTestUser();
    public EnableClickHouseJob test = new EnableClickHouseJob(clickHouseClassRule.getClickhouse(), 1, project, Collections.singletonList(modelId), "src/test/resources/ut_meta");

    @Rule
    public TestRule rule = RuleChain.outerRule(this.enableTestUser).around(this.test);
    private SecondStorageService secondStorageService = new SecondStorageService();
    private SecondStorageEndpoint secondStorageEndpoint = new SecondStorageEndpoint();

    @Mock
    private final AclUtil aclUtil = (AclUtil) Mockito.spy(AclUtil.class);

    @Mock
    private final AclEvaluate aclEvaluate = (AclEvaluate) Mockito.spy(AclEvaluate.class);

    public static String getProject() {
        return project;
    }

    @Before
    public void setUp() {
        System.setProperty("kylin.second-storage.wait-index-build-second", "1");
        System.setProperty("kylin.job.scheduler.poll-interval-second", "1");
        this.secondStorageEndpoint.setSecondStorageService(this.secondStorageService);
        this.indexDataConstructor = new IndexDataConstructor(project);
        ReflectionTestUtils.setField(this.aclEvaluate, "aclUtil", this.aclUtil);
        this.secondStorageService.setAclEvaluate(this.aclEvaluate);
    }

    private void buildIncrementalLoadQuery(String str, String str2) throws Exception {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
        NDataflow dataflow = nDataflowManager.getDataflow(modelId);
        this.indexDataConstructor.buildIndex(modelId, new SegmentRange.TimePartitionedSegmentRange(str, str2), new HashSet(dataflow.getIndexPlan().getAllLayouts()), true);
        waitJobFinish(project, triggerClickHouseLoadJob(project, modelId, EnableTestUser.ADMIN, (List) nDataflowManager.getDataflow(modelId).getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList())));
    }

    private void mergeSegments(List<String> list) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(instanceFromEnv, getProject());
        NDataflow dataflow = nDataflowManager.getDataflow(modelId);
        JobManager jobManager = JobManager.getInstance(instanceFromEnv, getProject());
        long j = Long.MAX_VALUE;
        long j2 = -1;
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            NDataSegment segment = dataflow.getSegment(it.next());
            long start = segment.getTSRange().getStart();
            long end = segment.getTSRange().getEnd();
            if (start < j) {
                j = start;
            }
            if (end > j2) {
                j2 = end;
            }
        }
        waitJobFinish(getProject(), jobManager.mergeSegmentJob(new JobParam(nDataflowManager.mergeSegments(dataflow, new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(j), Long.valueOf(j2)), true), modelId, this.enableTestUser.getUser())));
    }

    @Test
    public void testMergeSegmentWhenSegmentNotInSecondStorage() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        List<String> list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModel(modelId);
        this.secondStorageEndpoint.cleanStorage(storageRequest, list.subList(0, 1));
        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject()).listExecByModelAndStatus(modelId, (v0) -> {
            return v0.isRunning();
        }, (JobTypeEnum[]) null).forEach(abstractExecutable -> {
            waitJobFinish(getProject(), abstractExecutable.getId());
        });
        mergeSegments(list);
        Assert.assertEquals(1L, r0.getDataflow(modelId).getQueryableSegments().size());
        checkSizeInNode();
        this.secondStorageService.sizeInNode(project);
        checkSizeInNode();
    }

    @Test
    public void testRemoveSegmentWhenHasLoadTask() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        List list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        SegmentRange.TimePartitionedSegmentRange createInfinite = SegmentRange.TimePartitionedSegmentRange.createInfinite();
        SecondStorageLockUtils.acquireLock(modelId, createInfinite).lock();
        try {
            SecondStorageUtil.checkSegmentRemove(project, modelId, (String[]) list.toArray(new String[0]));
            SecondStorageLockUtils.unlock(modelId, createInfinite);
            Assert.fail();
        } catch (KylinException e) {
            SecondStorageLockUtils.unlock(modelId, createInfinite);
            Assert.assertEquals(ServerErrorCode.SEGMENT_DROP_FAILED.toErrorCode(), e.getErrorCode());
        }
    }

    private void checkSizeInNode() {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
        Optional tableFlowManager = SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Set set = (Set) nDataflowManager.getDataflow(modelId).getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
        TableData tableData = (TableData) ((TableFlow) ((Manager) tableFlowManager.orElseThrow(null)).get(modelId).orElseThrow(null)).getTableDataList().get(0);
        Assert.assertTrue(tableData.containSegments(set));
        Assert.assertTrue(((Long) ((TablePartition) tableData.getPartitions().get(0)).getSizeInNode().values().stream().reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).orElse(0L)).longValue() > 0);
    }

    @Test
    public void testRemoveSegmentFromSecondStorage() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject());
        nExecutableManager.listExecByModelAndStatus(modelId, (v0) -> {
            return v0.isRunning();
        }, (JobTypeEnum[]) null).forEach(abstractExecutable -> {
            waitJobFinish(getProject(), abstractExecutable.getId());
        });
        Optional tableFlowManager = SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Assert.assertEquals(1L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.orElseThrow(null)).get(modelId).orElseThrow(null)).getTableDataList().get(0)).getPartitions().size());
        Assert.assertTrue(getModelRowCount(project, modelId) > 0);
        List list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModel(modelId);
        this.secondStorageEndpoint.cleanStorage(storageRequest, list.subList(0, 1));
        nExecutableManager.listExecByModelAndStatus(modelId, (v0) -> {
            return v0.isRunning();
        }, (JobTypeEnum[]) null).forEach(abstractExecutable2 -> {
            waitJobFinish(getProject(), abstractExecutable2.getId());
        });
        Assert.assertEquals(0L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.orElseThrow(null)).get(modelId).orElseThrow(null)).getTableDataList().get(0)).getPartitions().size());
        Assert.assertTrue(getModelRowCount(project, modelId) == 0);
    }

    public static int getModelRowCount(String str, String str2) throws SQLException {
        String database = NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), str);
        String table = NameUtil.getTable(str2, 20000000001L);
        return ((Integer) SecondStorageNodeHelper.getAllNames().stream().map(SecondStorageNodeHelper::resolve).map(str3 -> {
            try {
                ClickHouse clickHouse = new ClickHouse(str3);
                Throwable th = null;
                try {
                    try {
                        List query = clickHouse.query("select count(*) from `" + database + "`.`" + table + "`", resultSet -> {
                            try {
                                return Integer.valueOf(resultSet.getInt(1));
                            } catch (SQLException e) {
                                return (Integer) ExceptionUtils.rethrow(e);
                            }
                        });
                        Assert.assertFalse(query.isEmpty());
                        Integer num = (Integer) query.get(0);
                        if (clickHouse != null) {
                            if (0 != 0) {
                                try {
                                    clickHouse.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                clickHouse.close();
                            }
                        }
                        return num;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                return (Integer) ExceptionUtils.rethrow(e);
            }
        }).reduce((v0, v1) -> {
            return Integer.sum(v0, v1);
        }).get()).intValue();
    }

    @Test
    public void testRefreshSegmentWhenLocked() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        NDataSegment firstSegment = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getSegments().getFirstSegment();
        this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "LOCK");
        try {
            refreshSegment(firstSegment.getId(), true);
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals(ServerErrorCode.SECOND_STORAGE_PROJECT_LOCKING.toErrorCode(), e.getCause().getErrorCode());
            this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "UNLOCK");
        }
    }

    private String refreshSegment(String str, boolean z) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(instanceFromEnv, getProject());
        NDataflow dataflow = nDataflowManager.getDataflow(modelId);
        String refreshSegmentJob = JobManager.getInstance(instanceFromEnv, getProject()).refreshSegmentJob(new JobParam(nDataflowManager.refreshSegment(dataflow, dataflow.getSegment(str).getSegRange()), dataflow.getModel().getId(), this.enableTestUser.getUser()));
        if (z) {
            waitJobFinish(project, refreshSegmentJob);
        }
        return refreshSegmentJob;
    }

    @Test
    public void testMergeSegmentsWhenLocked() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        List<String> list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "LOCK");
        try {
            mergeSegments(list);
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals(ServerErrorCode.SECOND_STORAGE_PROJECT_LOCKING.toErrorCode(), e.getCause().getErrorCode());
            this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "UNLOCK");
        }
    }

    @Test
    public void testCheckLock() {
        this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "LOCK");
        try {
            LockTypeEnum.checkLock(LockTypeEnum.LOAD.name(), SecondStorageUtil.getProjectLocks(project));
            Assert.fail();
        } catch (KylinException e) {
            Assert.assertEquals(ServerErrorCode.SECOND_STORAGE_PROJECT_LOCKING.toErrorCode(), e.getErrorCode());
            this.secondStorageService.lockOperate(project, Collections.singletonList("LOAD"), "UNLOCK");
        }
    }

    @Test
    public void testCleanSegmentWhenDatabaseNotExists() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        new ClickHouse(SecondStorageNodeHelper.resolve((String) SecondStorageNodeHelper.getAllNames().get(0))).apply("DROP DATABASE " + NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), project));
        waitJobFinish(project, this.secondStorageService.triggerSegmentsClean(project, modelId, Sets.newHashSet((List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()))));
    }

    @Test
    public void testCleanSegmentWhenModelNotExists() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        new ClickHouse(SecondStorageNodeHelper.resolve((String) SecondStorageNodeHelper.getAllNames().get(0))).apply("DROP TABLE " + NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), project) + "." + NameUtil.getTable(modelId, 20000000001L));
        waitJobFinish(project, this.secondStorageService.triggerSegmentsClean(project, modelId, Sets.newHashSet((List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()))));
    }

    @Test
    public void testCleanProjectSegments() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        waitJobFinish(project, (String) ((Map) this.secondStorageService.projectClean(Arrays.asList(project, ClickHouseUtils.PrepareTestData.db)).get(project)).get(modelId));
        Assert.assertEquals(0L, ((TableData) ((TableFlow) ((Manager) SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project).orElseThrow(null)).get(modelId).orElseThrow(null)).getTableDataList().get(0)).getPartitions().size());
        Assert.assertTrue(SecondStorageUtil.isModelEnable(project, modelId));
    }

    private void cleanSegments(List<String> list) {
        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModel(modelId);
        storageRequest.setSegmentIds(list);
        this.secondStorageEndpoint.cleanStorage(storageRequest, list);
        Stream stream = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getAllExecutables().stream();
        Class<ClickHouseSegmentCleanJob> cls = ClickHouseSegmentCleanJob.class;
        ClickHouseSegmentCleanJob.class.getClass();
        Optional findFirst = stream.filter((v1) -> {
            return r1.isInstance(v1);
        }).findFirst();
        Assert.assertTrue(findFirst.isPresent());
        waitJobFinish(project, ((AbstractExecutable) findFirst.get()).getId());
    }

    @Ignore("TODO: mark it.")
    public void testJobPaused() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        List list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        cleanSegments(list.subList(1, 2));
        SecondStorageConcurrentTestUtil.registerWaitPoint("WAIT_PAUSED", AzuriteContainer.DEFAULT_BLOB_PORT);
        String refreshSegment = refreshSegment((String) list.get(1), false);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((ExecutablePO) NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getAllJobs().stream().filter(executablePO -> {
                return executablePO.getId().equals(refreshSegment);
            }).findFirst().orElseThrow(() -> {
                return new IllegalStateException("Job not found");
            })).getTasks().stream().filter(executablePO2 -> {
                return "SUCCEED".equals(executablePO2.getOutput().getStatus());
            }).count() >= 3);
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(refreshSegment);
            return null;
        }, project, 1, -1L);
        waitJobEnd(project, refreshSegment);
        try {
            SecondStorageUtil.checkJobResume(project, refreshSegment);
        } catch (KylinException e) {
            Assert.assertEquals(ServerErrorCode.JOB_RESUME_FAILED.toErrorCode(), e.getErrorCode());
        }
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(project);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(nDefaultScheduler.getContext().getRunningJobs().values().size() == 0);
        });
        Assert.assertEquals(1L, ((TableData) ((TableFlow) ((Manager) SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project).get()).get(modelId).orElseThrow(() -> {
            return new IllegalStateException("tableflow not found");
        })).getTableDataList().get(0)).getPartitions().size());
        Assert.assertFalse(SecondStorageLockUtils.containsKey(modelId, SegmentRange.TimePartitionedSegmentRange.createInfinite()));
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.PAUSED, NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(refreshSegment).getStatus());
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).resumeJob(refreshSegment);
            return null;
        }, project, 1, -1L);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(refreshSegment).getStatus() == ExecutableState.RUNNING);
        });
        waitJobFinish(project, refreshSegment);
        Assert.assertEquals(24L, getModelRowCount(project, modelId));
        SecondStorageUtil.checkSecondStorageData(project);
    }

    @Test
    public void testJobPausedAfterCommit() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        buildIncrementalLoadQuery("2012-01-03", "2012-01-04");
        buildIncrementalLoadQuery("2012-01-04", "2012-01-05");
        List<String> list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        cleanSegments(list);
        SecondStorageConcurrentTestUtil.registerWaitPoint("WAIT_AFTER_COMMIT", AzuriteContainer.DEFAULT_BLOB_PORT);
        String triggerClickHouseLoadJob = triggerClickHouseLoadJob(project, modelId, this.enableTestUser.getUser(), list);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(SecondStorageConcurrentTestUtil.isWaiting("WAIT_AFTER_COMMIT"));
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(triggerClickHouseLoadJob);
            return null;
        }, project, 1, -1L);
        waitJobEnd(project, triggerClickHouseLoadJob);
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(project);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(nDefaultScheduler.getContext().getRunningJobs().values().size() == 0);
        });
        Optional tableFlowManager = SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Assert.assertEquals(0L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.get()).get(modelId).orElseThrow(() -> {
            return new IllegalStateException("tableflow not found");
        })).getTableDataList().get(0)).getPartitions().size());
        Assert.assertFalse(SecondStorageLockUtils.containsKey(modelId, SegmentRange.TimePartitionedSegmentRange.createInfinite()));
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).resumeJob(triggerClickHouseLoadJob);
            return null;
        }, project, 1, -1L);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(triggerClickHouseLoadJob).getStatus() == ExecutableState.RUNNING);
        });
        waitJobFinish(project, triggerClickHouseLoadJob);
        Assert.assertEquals(4L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.get()).get(modelId).orElseThrow(() -> {
            return new IllegalStateException("tableflow not found");
        })).getTableDataList().get(0)).getPartitions().size());
        Assert.assertEquals(48L, getModelRowCount(project, modelId));
        SecondStorageUtil.checkSecondStorageData(project);
    }

    @Test
    public void testJobPausedBeforeCommit() throws Exception {
        buildIncrementalLoadQuery("2012-01-01", "2012-01-02");
        buildIncrementalLoadQuery("2012-01-02", "2012-01-03");
        buildIncrementalLoadQuery("2012-01-03", "2012-01-04");
        buildIncrementalLoadQuery("2012-01-04", "2012-01-05");
        List<String> list = (List) NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId).getQueryableSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        cleanSegments(list);
        SecondStorageConcurrentTestUtil.registerWaitPoint("WAIT_BEFORE_COMMIT", AzuriteContainer.DEFAULT_BLOB_PORT);
        String triggerClickHouseLoadJob = triggerClickHouseLoadJob(project, modelId, this.enableTestUser.getUser(), list);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(SecondStorageConcurrentTestUtil.isWaiting("WAIT_BEFORE_COMMIT"));
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(triggerClickHouseLoadJob);
            return null;
        }, project, 1, -1L);
        waitJobEnd(project, triggerClickHouseLoadJob);
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(project);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(nDefaultScheduler.getContext().getRunningJobs().values().size() == 0);
        });
        Optional tableFlowManager = SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Assert.assertEquals(0L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.get()).get(modelId).orElseThrow(() -> {
            return new IllegalStateException("tableflow not found");
        })).getTableDataList().get(0)).getPartitions().size());
        Assert.assertFalse(SecondStorageLockUtils.containsKey(modelId, SegmentRange.TimePartitionedSegmentRange.createInfinite()));
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).resumeJob(triggerClickHouseLoadJob);
            return null;
        }, project, 1, -1L);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(triggerClickHouseLoadJob).getStatus() == ExecutableState.RUNNING);
        });
        waitJobFinish(project, triggerClickHouseLoadJob);
        Assert.assertEquals(4L, ((TableData) ((TableFlow) ((Manager) tableFlowManager.get()).get(modelId).orElseThrow(() -> {
            return new IllegalStateException("tableflow not found");
        })).getTableDataList().get(0)).getPartitions().size());
        Assert.assertEquals(48L, getModelRowCount(project, modelId));
        SecondStorageUtil.checkSecondStorageData(project);
    }
}
