package io.kyligence.kap.secondstorage;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.kyligence.kap.clickhouse.database.ClickHouseOperator;
import io.kyligence.kap.clickhouse.ddl.ClickHouseCreateTable;
import io.kyligence.kap.clickhouse.ddl.ClickHouseRender;
import io.kyligence.kap.clickhouse.job.ClickHouse;
import io.kyligence.kap.clickhouse.job.ClickHouseLoad;
import io.kyligence.kap.clickhouse.job.ClickHouseSegmentCleanJob;
import io.kyligence.kap.clickhouse.job.LoadContext;
import io.kyligence.kap.clickhouse.management.ClickHouseConfigLoader;
import io.kyligence.kap.newten.clickhouse.AzuriteContainer;
import io.kyligence.kap.newten.clickhouse.ClickHouseSimpleITWithS3Test;
import io.kyligence.kap.newten.clickhouse.ClickHouseUtils;
import io.kyligence.kap.secondstorage.config.ClusterInfo;
import io.kyligence.kap.secondstorage.config.Node;
import io.kyligence.kap.secondstorage.ddl.exp.ColumnWithType;
import io.kyligence.kap.secondstorage.management.OpenSecondStorageEndpoint;
import io.kyligence.kap.secondstorage.management.SecondStorageEndpoint;
import io.kyligence.kap.secondstorage.management.SecondStorageScheduleService;
import io.kyligence.kap.secondstorage.management.SecondStorageService;
import io.kyligence.kap.secondstorage.management.request.RecoverRequest;
import io.kyligence.kap.secondstorage.management.request.StorageRequest;
import io.kyligence.kap.secondstorage.metadata.Manager;
import io.kyligence.kap.secondstorage.metadata.TableData;
import io.kyligence.kap.secondstorage.metadata.TableFlow;
import io.kyligence.kap.secondstorage.test.ClickHouseClassRule;
import io.kyligence.kap.secondstorage.test.EnableClickHouseJob;
import io.kyligence.kap.secondstorage.test.EnableTestUser;
import io.kyligence.kap.secondstorage.test.SharedSparkSession;
import io.kyligence.kap.secondstorage.test.utils.JobWaiter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.JobErrorCode;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.engine.spark.job.NResourceDetectStep;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.JobSchedulerModeEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.rest.response.NDataSegmentResponse;
import org.apache.kylin.rest.service.ModelService;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.sql.SparkSession;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import org.testcontainers.containers.JdbcDatabaseContainer;

/* loaded from: input_file:io/kyligence/kap/secondstorage/SecondStorageJavaTest.class */
public class SecondStorageJavaTest implements JobWaiter {
    // Name of the model under test; used by requests that address the model by name.
    private static final String modelName = "test_table_index";
    // Id of the model under test; used by requests and metadata lookups that address the model by id.
    // NOTE(review): presumably the id of the model named above in the ut_meta fixture — confirm.
    private static final String modelId = "acfde546-2cc9-4eec-bc92-e3bd46d4e2ee";
    // Project every test in this class operates on.
    private static final String project = "table_index";
    // Rule providing the test user; the tests read the user through getUser().
    public EnableTestUser enableTestUser = new EnableTestUser();
    // Rule built from the class-level ClickHouse container, the project, the model id and the ut_meta fixture path.
    public EnableClickHouseJob test = new EnableClickHouseJob(clickHouseClassRule.getClickhouse(), 1, project, Collections.singletonList(modelId), "src/test/resources/ut_meta");

    // Applies enableTestUser as the outer rule and the ClickHouse job rule inside it.
    @Rule
    public TestRule rule = RuleChain.outerRule(this.enableTestUser).around(this.test);
    // Services and endpoints are constructed by hand and wired together in setUp().
    private ModelService modelService = new ModelService();
    private SecondStorageScheduleService secondStorageScheduleService = new SecondStorageScheduleService();
    private SecondStorageService secondStorageService = new SecondStorageService();
    private SecondStorageEndpoint secondStorageEndpoint = new SecondStorageEndpoint();
    private OpenSecondStorageEndpoint openSecondStorageEndpoint = new OpenSecondStorageEndpoint();
    // Mocked ACL evaluator injected into the services in setUp().
    private AclEvaluate aclEvaluate = (AclEvaluate) Mockito.mock(AclEvaluate.class);
    // Spark session taken from the class-level shared rule below.
    private final SparkSession sparkSession = sharedSpark.getSpark();

    // Class-level Spark session configured with the SQLPushDownExtensions SQL extension.
    @ClassRule
    public static SharedSparkSession sharedSpark = new SharedSparkSession(ImmutableMap.of("spark.sql.extensions", "org.apache.kylin.query.SQLPushDownExtensions"));

    // Class-level ClickHouse rule (constructed with argument 1 — presumably the container count; confirm).
    @ClassRule
    public static ClickHouseClassRule clickHouseClassRule = new ClickHouseClassRule(1);

    /**
     * Sets the two polling-related system properties to {@code "1"} and wires the
     * hand-constructed services and endpoints together before each test.
     */
    @Before
    public void setUp() {
        System.setProperty("kylin.second-storage.wait-index-build-second", "1");
        System.setProperty("kylin.job.scheduler.poll-interval-second", "1");

        // Both services share the mocked ACL evaluator.
        ReflectionTestUtils.setField(modelService, "aclEvaluate", aclEvaluate);
        secondStorageService.setAclEvaluate(aclEvaluate);

        // The internal endpoint only needs the second storage service.
        secondStorageEndpoint.setSecondStorageService(secondStorageService);

        // The open endpoint needs the service, the internal endpoint and the model service.
        openSecondStorageEndpoint.setSecondStorageService(secondStorageService);
        openSecondStorageEndpoint.setSecondStorageEndpoint(secondStorageEndpoint);
        openSecondStorageEndpoint.setModelService(modelService);
    }

    /**
     * Builds and loads the model, cleans every queryable segment through
     * {@code SecondStorageEndpoint.cleanStorage}, waits for the resulting
     * {@link ClickHouseSegmentCleanJob}, and checks that the first table data
     * goes from one partition to none.
     *
     * @throws Exception if building the model or waiting for a job fails
     */
    @Test
    public void testCleanSegment() throws Exception {
        // Named local instead of the decompiler's undefined synthetic variable:
        // both partition checks below read through this manager.
        Optional<Manager<TableFlow>> tableFlowManager =
                SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Assert.assertTrue(tableFlowManager.isPresent());

        buildModel();
        int partitionsAfterLoad = tableFlowManager.get().get(modelId)
                .orElseThrow(() -> new IllegalStateException("tableflow not found"))
                .getTableDataList().get(0).getPartitions().size();
        Assert.assertEquals(1L, partitionsAfterLoad);

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());
        Assert.assertTrue(new NDataSegmentResponse(dataflow, dataflow.getFirstSegment()).isHasBaseTableIndexData());

        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModel(modelId);
        storageRequest.setSegmentIds(segmentIds);
        this.secondStorageEndpoint.cleanStorage(storageRequest, segmentIds);

        // The clean request must have produced a segment clean job; wait for it
        // before inspecting the partitions again.
        Optional<AbstractExecutable> cleanJob = NExecutableManager
                .getInstance(KylinConfig.getInstanceFromEnv(), project).getAllExecutables().stream()
                .filter(ClickHouseSegmentCleanJob.class::isInstance)
                .findFirst();
        Assert.assertTrue(cleanJob.isPresent());
        waitJobFinish(project, cleanJob.get().getId());

        int partitionsAfterClean = tableFlowManager.get().get(modelId)
                .orElseThrow(() -> new IllegalStateException("tableflow not found"))
                .getTableDataList().get(0).getPartitions().size();
        Assert.assertEquals(0L, partitionsAfterClean);
    }

    /**
     * Same scenario as the internal-endpoint clean test, but the request addresses
     * the model by name and goes through {@code OpenSecondStorageEndpoint.cleanStorage}.
     *
     * <p>NOTE(review): this test does not wait for the clean job; the partition
     * count is checked right after the request returns.
     *
     * @throws Exception if building the model fails
     */
    @Test
    public void testOpenCleanSegment() throws Exception {
        // Named local instead of the decompiler's undefined synthetic variable.
        Optional<Manager<TableFlow>> tableFlowManager =
                SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project);
        Assert.assertTrue(tableFlowManager.isPresent());

        buildModel();
        int partitionsAfterLoad = tableFlowManager.get().get(modelId)
                .orElseThrow(() -> new IllegalStateException("tableflow not found"))
                .getTableDataList().get(0).getPartitions().size();
        Assert.assertEquals(1L, partitionsAfterLoad);

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());
        Assert.assertTrue(new NDataSegmentResponse(dataflow, dataflow.getFirstSegment()).isHasBaseTableIndexData());

        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModelName(modelName);
        storageRequest.setSegmentIds(segmentIds);
        this.openSecondStorageEndpoint.cleanStorage(storageRequest);

        int partitionsAfterClean = tableFlowManager.get().get(modelId)
                .orElseThrow(() -> new IllegalStateException("tableflow not found"))
                .getTableDataList().get(0).getPartitions().size();
        Assert.assertEquals(0L, partitionsAfterClean);
    }

    /**
     * Triggers a ClickHouse load job twice for the same segments and expects the
     * second trigger to be rejected with {@code FAILED_CREATE_JOB}.
     *
     * @throws Exception if building the model or the first trigger fails
     */
    @Test
    public void testDoubleTriggerSegmentLoad() throws Exception {
        buildModel();
        List<String> segmentIds = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId).getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());

        // Trigger outside the try block: a failure of the first trigger must fail
        // the test instead of being checked against the resume error code.
        String firstJobId = triggerClickHouseLoadJob(project, modelId, this.enableTestUser.getUser(), segmentIds);
        try {
            // The resume check is allowed to pass; if it is rejected, only the
            // error code is verified.
            SecondStorageUtil.checkJobResume(project, firstJobId);
        } catch (KylinException e) {
            Assert.assertEquals(ServerErrorCode.JOB_RESUME_FAILED.toErrorCode(), e.getErrorCode());
        }

        try {
            triggerClickHouseLoadJob(project, modelId, this.enableTestUser.getUser(), segmentIds);
            Assert.fail();
        } catch (KylinException e) {
            Assert.assertEquals(ServerErrorCode.FAILED_CREATE_JOB.toErrorCode(), e.getErrorCode());
        }
    }

    /**
     * Disables the model, waits for the returned job, and expects recovering the
     * model by name to throw {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testRecoverModelNotEnableSecondStorage() {
        waitJobFinish(project, secondStorageService.disableModel(project, modelId));

        RecoverRequest request = new RecoverRequest();
        request.setProject(project);
        request.setModelName(modelName);

        openSecondStorageEndpoint.recoverModel(request);
        // Only reached when no exception was thrown.
        Assert.fail();
    }

    /**
     * Triggers a load job for all queryable segments and expects a recover request
     * for the same model to be rejected with {@code SECOND_STORAGE_JOB_EXISTS}.
     *
     * @throws Exception if building the model fails
     */
    @Test
    public void testRecoverModelWhenHasLoadTask() throws Exception {
        buildModel();

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());
        triggerClickHouseLoadJob(project, modelId, enableTestUser.getUser(), segmentIds);

        RecoverRequest request = new RecoverRequest();
        request.setProject(project);
        request.setModelName(modelName);
        try {
            openSecondStorageEndpoint.recoverModel(request);
            Assert.fail();
        } catch (KylinException rejected) {
            Assert.assertEquals(JobErrorCode.SECOND_STORAGE_JOB_EXISTS.toErrorCode(), rejected.getErrorCode());
        }
    }

    /**
     * Triggers a load job for all queryable segments and expects a clean request
     * for those segments to be rejected with {@code SECOND_STORAGE_JOB_EXISTS}.
     *
     * @throws Exception if building the model fails
     */
    @Test
    public void testCleanSegmentWhenHasLoadTask() throws Exception {
        buildModel();

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());
        triggerClickHouseLoadJob(project, modelId, enableTestUser.getUser(), segmentIds);

        StorageRequest request = new StorageRequest();
        request.setProject(project);
        request.setModel(modelId);
        try {
            secondStorageEndpoint.cleanStorage(request, segmentIds);
            Assert.fail();
        } catch (KylinException rejected) {
            Assert.assertEquals(JobErrorCode.SECOND_STORAGE_JOB_EXISTS.toErrorCode(), rejected.getErrorCode());
        }
    }

    /**
     * Expects recovering a model name that differs from the model under test
     * to throw {@link KylinException}.
     */
    @Test(expected = KylinException.class)
    public void testRecoverModelNotExist() {
        RecoverRequest request = new RecoverRequest();
        request.setProject(project);
        request.setModelName(modelName + "123");

        openSecondStorageEndpoint.recoverModel(request);
        // Only reached when no exception was thrown.
        Assert.fail();
    }

    /**
     * Triggers a model clean job without building anything first and checks that
     * the job's data range start is strictly below its end.
     */
    @Test
    public void testModelCleanJobWithoutSegments() {
        NExecutableManager executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
        String jobId = triggerModelCleanJob(project, modelId, enableTestUser.getUser());

        AbstractExecutable cleanJob = executableManager.getJob(jobId);
        boolean rangeIsOrdered = cleanJob.getDataRangeStart() < cleanJob.getDataRangeEnd();
        Assert.assertTrue(rangeIsOrdered);
    }

    /**
     * Removes the base table layout from the index plan, then runs the
     * update-index / disable / enable sequence and checks that the model ends up
     * enabled for second storage.
     */
    @Test
    public void testEnableModelWithoutBaseLayout() {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            indexPlanManager.updateIndexPlan(modelId, plan -> {
                Long baseLayoutId = plan.getBaseTableLayout().getId();
                plan.removeLayouts(Sets.newHashSet(baseLayoutId), true, true);
            });
            return null;
        }, project);

        // With the base layout gone the first segment must not report base table index data.
        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        NDataSegmentResponse firstSegment = new NDataSegmentResponse(dataflow, dataflow.getFirstSegment());
        Assert.assertFalse(firstSegment.isHasBaseTableIndexData());

        secondStorageService.updateIndex(project, modelId);
        secondStorageService.disableModel(project, modelId);
        secondStorageService.enableModelSecondStorage(project, modelId);
        secondStorageService.updateIndex(project, modelId);
        // Enabling is invoked a second time after the index update.
        secondStorageService.enableModelSecondStorage(project, modelId);

        Assert.assertTrue(SecondStorageUtil.isModelEnable(project, modelId));
    }

    /**
     * Expects enabling second storage for another project with all node names to
     * be rejected with {@code SECOND_STORAGE_NODE_NOT_AVAILABLE}.
     */
    @Test
    public void testEnableProjectNodeNotAvailable() {
        final String otherProject = "table_index_incremental";
        try {
            secondStorageService.changeProjectSecondStorageState(otherProject, SecondStorageNodeHelper.getAllNames(), true);
            Assert.fail();
        } catch (KylinException rejected) {
            Assert.assertEquals(ServerErrorCode.SECOND_STORAGE_NODE_NOT_AVAILABLE.toErrorCode(), rejected.getErrorCode());
        }
    }

    /**
     * Checks that the project is enabled for second storage before
     * {@code resetStorage()} and no longer enabled afterwards.
     */
    @Test
    public void testResetStorage() {
        boolean enabledBeforeReset = SecondStorageUtil.isProjectEnable(project);
        Assert.assertTrue(enabledBeforeReset);

        secondStorageEndpoint.resetStorage();

        boolean enabledAfterReset = SecondStorageUtil.isProjectEnable(project);
        Assert.assertFalse(enabledAfterReset);
    }

    /**
     * Registers a JDBC catalog for the first ClickHouse container in the Spark
     * session, runs an aggregate query against the model, and checks that every
     * native realization reports being served by second storage.
     *
     * @throws Exception if building the model or running the query fails
     */
    @Test
    public void testQueryWithClickHouseSuccess() throws Exception {
        final String catalog = "testQueryWithClickHouseSuccess";
        final String catalogKey = "spark.sql.catalog." + catalog;

        Unsafe.setProperty("kylin.second-storage.jdbc-catalog", catalog);
        secondStorageEndpoint.refreshConf();
        // Refreshing the configuration must have gone through the global-admin check.
        Mockito.verify(aclEvaluate).checkIsGlobalAdmin();
        secondStorageService.sizeInNode(project);

        buildModel();
        int sizeInfoCount = SecondStorageUtil.setSecondStorageSizeInfo(
                NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project).listAllModels()).size();
        Assert.assertEquals(3L, sizeInfoCount);

        test.checkHttpServer();
        test.overwriteSystemProp("kylin.query.use-tableindex-answer-non-raw-query", "true");

        // Point the Spark catalog at the first ClickHouse container.
        JdbcDatabaseContainer<?> container = clickHouseClassRule.getClickhouse(0);
        sparkSession.sessionState().conf().setConfString(catalogKey,
                "org.apache.spark.sql.execution.datasources.jdbc.v2.SecondStorageCatalog");
        sparkSession.sessionState().conf().setConfString(catalogKey + ".url", container.getJdbcUrl());
        sparkSession.sessionState().conf().setConfString(catalogKey + ".driver", container.getDriverClassName());

        ExecAndComp.queryModel(project, "select sum(PRICE) from TEST_KYLIN_FACT group by PRICE");
        boolean allFromSecondStorage = OLAPContext.getNativeRealizations().stream()
                .allMatch(realization -> realization.isSecondStorage());
        Assert.assertTrue(allFromSecondStorage);
    }

    /**
     * Exercises {@link ClickHouseOperator}: lists databases, creates a table
     * through a plain {@link ClickHouse} connection, and checks that the operator
     * sees the table and can drop it.
     *
     * @throws Exception if a ClickHouse statement or closing a connection fails
     */
    @Test
    public void testClickHouseOperator() throws Exception {
        String jdbcUrl = SecondStorageNodeHelper.resolve(SecondStorageNodeHelper.getAllNames().get(0));
        // One name for both the CREATE and the drop, so the two cannot drift apart.
        String tableName = "test";

        // try-with-resources: both connections are released even when an assertion fails.
        try (ClickHouseOperator operator = new ClickHouseOperator(jdbcUrl);
                ClickHouse clickHouse = new ClickHouse(jdbcUrl)) {
            Assert.assertEquals(4L, operator.listDatabases().size());

            clickHouse.apply("CREATE TABLE " + tableName + "(a int) engine=Memory()");
            Assert.assertEquals(1L, operator.listTables(ClickHouseUtils.PrepareTestData.db).size());

            operator.dropTable(ClickHouseUtils.PrepareTestData.db, tableName);
            Assert.assertEquals(0L, operator.listTables(ClickHouseUtils.PrepareTestData.db).size());
        }
    }

    /**
     * Creates two tables named {@code <uuid>@test_temp} and {@code <uuid>@test_src_0}
     * in the project database, runs the scheduled temp-table clean task, and checks
     * that neither table is listed afterwards.
     *
     * @throws Exception if building the model, a ClickHouse statement, or closing a connection fails
     */
    @Test
    public void testSchedulerService() throws Exception {
        buildModel();

        String jdbcUrl = SecondStorageNodeHelper.resolve(SecondStorageNodeHelper.getAllNames().get(0));
        String tablePrefix = RandomUtil.randomUUIDStr().replace("-", "_");
        String tempTable = tablePrefix + "@test_temp";
        String srcTable = tablePrefix + "@test_src_0";
        String database = NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), project);
        ClickHouseRender render = new ClickHouseRender();

        // try-with-resources: the original never closed either connection.
        try (ClickHouse clickHouse = new ClickHouse(jdbcUrl);
                ClickHouseOperator operator = new ClickHouseOperator(jdbcUrl)) {
            // Both tables share the same two-column MergeTree definition.
            for (String table : new String[] {tempTable, srcTable}) {
                clickHouse.apply(ClickHouseCreateTable.createCKTable(database, table)
                        .columns(new ColumnWithType[] {new ColumnWithType("i1", "Int32")})
                        .columns(new ColumnWithType[] {new ColumnWithType("i2", "Nullable(Int64)")})
                        .engine("MergeTree()")
                        .toSql(render));
            }

            this.secondStorageScheduleService.secondStorageTempTableCleanTask();

            List<String> remainingTables = operator.listTables(database);
            Assert.assertFalse(remainingTables.contains(tempTable));
            Assert.assertFalse(remainingTables.contains(srcTable));
        }
    }

    /**
     * Requests a clean of the given segments for the model under test and waits
     * for the first {@link ClickHouseSegmentCleanJob} found among the project's
     * executables.
     *
     * @param list ids of the segments to clean
     */
    private void cleanSegments(List<String> list) {
        StorageRequest storageRequest = new StorageRequest();
        storageRequest.setProject(project);
        storageRequest.setModel(modelId);
        storageRequest.setSegmentIds(list);
        this.secondStorageEndpoint.cleanStorage(storageRequest, list);

        // Method reference replaces the decompiler's lambda over an undefined variable.
        Optional<AbstractExecutable> cleanJob = NExecutableManager
                .getInstance(KylinConfig.getInstanceFromEnv(), project).getAllExecutables().stream()
                .filter(ClickHouseSegmentCleanJob.class::isInstance)
                .findFirst();
        Assert.assertTrue(cleanJob.isPresent());
        waitJobFinish(project, cleanJob.get().getId());
    }

    /**
     * Pauses a running ClickHouse load job, checks that no partition or lock is
     * left behind while it is paused, then resumes it and verifies the loaded data.
     *
     * @throws Exception if building the model, waiting for a job, or an await times out
     */
    @Test
    public void testJobPaused() throws Exception {
        buildModel();
        List<String> segmentIds = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId).getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());
        cleanSegments(segmentIds);

        // NOTE(review): the wait-point argument reuses AzuriteContainer.DEFAULT_BLOB_PORT, which looks
        // like an unrelated constant that happens to carry the intended value — confirm.
        SecondStorageConcurrentTestUtil.registerWaitPoint("WAIT_BEFORE_COMMIT", AzuriteContainer.DEFAULT_BLOB_PORT);
        String jobId = triggerClickHouseLoadJob(project, modelId, this.enableTestUser.getUser(), segmentIds);

        // Wait until at least one task of the load job is running.
        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() ->
                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(jobId).getTasks()
                        .stream().anyMatch(task -> task.getStatus() == ExecutableState.RUNNING));

        // Pause the job after the 5 second poll delay; the condition itself always succeeds.
        Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).until(() -> {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(jobId);
                return null;
            }, project, 1, -1L, jobId);
            return true;
        });
        waitJobEnd(project, jobId);

        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
        Awaitility.await().atMost(15L, TimeUnit.SECONDS)
                .until(() -> scheduler.getContext().getRunningJobs().values().size() == 0);

        // Generic call chain instead of raw-type casts: on a raw Optional, orElseThrow is
        // declared to throw Throwable, which this method does not declare.
        int partitionsWhilePaused = SecondStorageUtil.tableFlowManager(KylinConfig.getInstanceFromEnv(), project)
                .get().get(modelId)
                .orElseThrow(() -> new IllegalStateException("tableflow not found"))
                .getTableDataList().get(0).getPartitions().size();
        Assert.assertEquals(0L, partitionsWhilePaused);
        Assert.assertFalse(SecondStorageLockUtils.containsKey(modelId,
                SegmentRange.TimePartitionedSegmentRange.createInfinite()));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.PAUSED,
                    NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(jobId).getStatus());
        });

        // Resume and let the job run to completion.
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).resumeJob(jobId);
            return null;
        }, project, 1, -1L, jobId);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() ->
                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(jobId).getStatus()
                        == ExecutableState.RUNNING);
        waitJobFinish(project, jobId);

        Assert.assertEquals(10000L, IncrementalWithIntPartitionTest.getModelRowCount(project, modelId));
        SecondStorageUtil.checkSecondStorageData(project);
    }

    /**
     * Triggers a load job, waits until one of its tasks is running, and runs the
     * resume check against it; any exception from the check must be a
     * {@link KylinException}.
     *
     * @throws Exception if building the model, waiting for the job, or the await fails
     */
    @Test
    public void testJobResume() throws Exception {
        buildModel();

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getQueryableSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());

        SecondStorageConcurrentTestUtil.registerWaitPoint("WAIT_BEFORE_COMMIT", AzuriteContainer.DEFAULT_BLOB_PORT);
        String jobId = triggerClickHouseLoadJob(project, modelId, enableTestUser.getUser(), segmentIds);

        Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(() ->
                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getJob(jobId).getTasks()
                        .stream().anyMatch(task -> task.getStatus() == ExecutableState.RUNNING));

        try {
            SecondStorageUtil.checkJobResume(project, jobId);
        } catch (Exception thrown) {
            // A rejection is tolerated, but only as a KylinException.
            Assert.assertTrue(thrown instanceof KylinException);
        }
        waitJobFinish(project, jobId);
    }

    /**
     * Runs the paused-job scenario with the segment online mode set to {@code "ANY"}.
     *
     * @throws Exception whatever the wrapped scenario throws
     */
    @Test
    public void testJobPausedWithAny() throws Exception {
        Callable<Void> pausedScenario = () -> {
            testJobPaused();
            return null;
        };
        wrapWithOnlineMode("ANY", pausedScenario);
    }

    /**
     * Round-trips {@link LoadContext} state through its string serialization and
     * checks that a finished file and a finished segment are still found afterwards.
     *
     * @throws Exception if (de)serialization fails
     */
    @Test
    public void testJobPausedDeserialize() throws Exception {
        final String fileKeyName = ClickHouseSimpleITWithS3Test.ACCESS_KEY;
        final long keyId = 20000010001L;

        // An empty context must survive a round trip before anything is recorded.
        LoadContext emptyContext = new LoadContext((ClickHouseLoad) null);
        LoadContext recordingContext = new LoadContext((ClickHouseLoad) null);
        recordingContext.deserializeToString(emptyContext.serializeToString());

        recordingContext.finishSingleFile(new LoadContext.CompletedFileKeyUtil(fileKeyName, keyId), "file1");
        recordingContext.finishSegment("segment1", new LoadContext.CompletedSegmentKeyUtil(keyId));

        LoadContext restoredContext = new LoadContext((ClickHouseLoad) null);
        restoredContext.deserializeToString(recordingContext.serializeToString());

        // Fresh key instances on purpose: the lookups must not depend on key identity.
        boolean fileRestored = restoredContext
                .getHistory(new LoadContext.CompletedFileKeyUtil(fileKeyName, keyId)).contains("file1");
        Assert.assertTrue(fileRestored);
        boolean segmentRestored = restoredContext
                .getHistorySegments(new LoadContext.CompletedSegmentKeyUtil(keyId)).contains("segment1");
        Assert.assertTrue(segmentRestored);
    }

    /**
     * Runs a load job for all segments to completion and expects the restart
     * check on that job to throw {@link KylinException}.
     *
     * @throws Exception if building the model or waiting for the job fails
     */
    @Test(expected = KylinException.class)
    public void testCheckJobRestart() throws Exception {
        buildModel();

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());

        String jobId = triggerClickHouseLoadJob(project, modelId, EnableTestUser.ADMIN, segmentIds);
        waitJobFinish(project, jobId);
        SecondStorageUtil.checkJobRestart(project, jobId);
    }

    /**
     * Drives {@code SecondStorageUtil.checkJobRestart} with hand-built jobs:
     * an export job holding the refresh step is rejected, while an index-build job
     * is rejected only in DAG mode and only until a resource-detect step is
     * inserted at the front of its task list.
     */
    @Test
    public void testCheckJobRestartMock() {
        DefaultExecutable refreshStep = new DefaultExecutable();
        refreshStep.setName("STEP_REFRESH_SECOND_STORAGE");
        refreshStep.setProject(project);

        NResourceDetectStep detectStep = new NResourceDetectStep();
        detectStep.setName("Detect Resource");
        detectStep.setProject(project);

        DefaultExecutable updateMetadataStep = new DefaultExecutable();
        updateMetadataStep.setName("Update Metadata");
        updateMetadataStep.setProject(project);

        // Export job containing the refresh step: restart must be rejected.
        DefaultExecutable exportJob = new DefaultExecutable();
        exportJob.setProject(project);
        exportJob.setJobType(JobTypeEnum.EXPORT_TO_SECOND_STORAGE);
        exportJob.addTask(refreshStep);
        Assert.assertThrows(KylinException.class, () -> SecondStorageUtil.checkJobRestart(exportJob));

        // Index-build job: accepted with only the metadata step, and still accepted
        // once the refresh step is added.
        DefaultExecutable buildJob = new DefaultExecutable();
        buildJob.setProject(project);
        buildJob.setJobType(JobTypeEnum.INDEX_BUILD);
        buildJob.addTask(updateMetadataStep);
        SecondStorageUtil.checkJobRestart(buildJob);
        buildJob.addTask(refreshStep);
        SecondStorageUtil.checkJobRestart(buildJob);

        // Switching to DAG mode makes the same job fail the check...
        buildJob.setJobSchedulerMode(JobSchedulerModeEnum.DAG);
        Assert.assertThrows(KylinException.class, () -> SecondStorageUtil.checkJobRestart(buildJob));

        // ...until a resource-detect step leads the task list.
        buildJob.getTasks().add(0, detectStep);
        SecondStorageUtil.checkJobRestart(buildJob);
    }

    /**
     * Drives {@code SecondStorageUtil.checkJobResume} with a hand-built export job;
     * the check must pass both before and after the refresh step is added.
     */
    @Test
    public void testCheckJobResumeMock() {
        DefaultExecutable refreshStep = new DefaultExecutable();
        refreshStep.setName("STEP_REFRESH_SECOND_STORAGE");
        refreshStep.setProject(project);

        NResourceDetectStep detectStep = new NResourceDetectStep();
        detectStep.setName("Detect Resource");
        detectStep.setProject(project);

        DefaultExecutable updateMetadataStep = new DefaultExecutable();
        updateMetadataStep.setName("Update Metadata");
        updateMetadataStep.setProject(project);

        DefaultExecutable exportJob = new DefaultExecutable();
        exportJob.setProject(project);
        exportJob.setJobType(JobTypeEnum.EXPORT_TO_SECOND_STORAGE);
        exportJob.addTask(detectStep);
        exportJob.addTask(updateMetadataStep);
        SecondStorageUtil.checkJobResume(exportJob);

        exportJob.addTask(refreshStep);
        SecondStorageUtil.checkJobResume(exportJob);
    }

    /**
     * Checks that a model clean job still finishes after the model table, and
     * later the whole project database, has been dropped directly in ClickHouse,
     * and that the resume and restart checks accept the finished clean job.
     *
     * @throws Exception if building the model, a ClickHouse statement, waiting for a job,
     *                   or closing the connection fails
     */
    @Test
    public void testCleanModelWhenTableNotExists() throws Exception {
        buildModel();
        String database = NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), project);

        // try-with-resources: the original never closed this connection.
        try (ClickHouse clickHouse = new ClickHouse(
                SecondStorageNodeHelper.resolve(SecondStorageNodeHelper.getAllNames().get(0)))) {
            // Drop the model table behind the back of the metadata, then clean.
            clickHouse.apply("DROP TABLE " + database + "." + NameUtil.getTable(modelId, 20000000001L));
            waitJobFinish(project, triggerModelCleanJob(project, modelId, this.enableTestUser.getUser()));

            // Load everything again so there is data to clean a second time.
            List<String> segmentIds = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                    .getDataflow(modelId).getSegments().stream()
                    .map(segment -> segment.getId())
                    .collect(Collectors.toList());
            waitJobFinish(project, triggerClickHouseLoadJob(project, modelId, EnableTestUser.ADMIN, segmentIds));

            // This time drop the whole database before cleaning.
            clickHouse.apply("DROP DATABASE " + NameUtil.getDatabase(KylinConfig.getInstanceFromEnv(), project));
            String cleanJobId = triggerModelCleanJob(project, modelId, this.enableTestUser.getUser());
            waitJobFinish(project, cleanJobId);

            SecondStorageUtil.checkJobResume(project, cleanJobId);
            SecondStorageUtil.checkJobRestart(project, cleanJobId);
        }
    }

    /**
     * Configures two freshly started ClickHouse containers with {@code 22} as the
     * second numeric argument, checks that every configured node reports SSH port
     * 22, and checks that a {@link CliCommandExecutor} built from the first node
     * exposes an SSH client.
     *
     * @throws Exception if starting or configuring the containers fails
     */
    @Test
    public void testSshPort() throws Exception {
        Unsafe.setProperty("kylin.second-storage.jdbc-catalog", "testQueryWithClickHouseHASuccess");
        internalConfigClickHouse(2, 22, ClickHouseUtils.startClickHouse(), ClickHouseUtils.startClickHouse());

        ClusterInfo cluster = ClickHouseConfigLoader.getInstance().getCluster();
        // Typed list: with a raw List the lambda parameter is Object and getSSHPort() does not resolve.
        List<Node> nodes = cluster.getNodes();
        nodes.forEach(node -> Assert.assertEquals(22L, node.getSSHPort()));

        Node firstNode = nodes.get(0);
        CliCommandExecutor executor = new CliCommandExecutor(firstNode.getIp(), cluster.getUserName(),
                cluster.getPassword(), KylinConfig.getInstanceFromEnv().getSecondStorageSshIdentityPath(),
                firstNode.getSSHPort());
        // Explicit assertion instead of a discarded toString() call that could only fail with an NPE.
        Assert.assertNotNull(executor.getSshClient());
    }

    /**
     * Delegates to {@code ClickHouseUtils.internalConfigClickHouse}, passing the
     * containers first and the two numbers after them.
     *
     * @param i first numeric argument forwarded to the utility
     *          (NOTE(review): presumably the replica count — confirm against ClickHouseUtils)
     * @param i2 second numeric argument forwarded to the utility
     *           (NOTE(review): presumably the SSH port; the SSH port test passes 22 here and
     *           then expects every node to report port 22 — confirm)
     * @param jdbcDatabaseContainerArr ClickHouse containers to configure
     * @throws IOException propagated from the utility
     */
    public static void internalConfigClickHouse(int i, int i2, JdbcDatabaseContainer<?>... jdbcDatabaseContainerArr) throws IOException {
        ClickHouseUtils.internalConfigClickHouse(jdbcDatabaseContainerArr, i, i2);
    }

    /**
     * Builds the dataflow of the model under test, waits for all project jobs,
     * then triggers a ClickHouse load job for every segment as the admin user and
     * waits for it to finish.
     *
     * @throws Exception if the build or one of the waits fails
     */
    public void buildModel() throws Exception {
        IndexDataConstructor constructor = new IndexDataConstructor(project);
        constructor.buildDataflow(modelId);
        waitAllJobFinish(project);

        NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                .getDataflow(modelId);
        List<String> segmentIds = dataflow.getSegments().stream()
                .map(segment -> segment.getId())
                .collect(Collectors.toList());

        String loadJobId = triggerClickHouseLoadJob(project, modelId, EnableTestUser.ADMIN, segmentIds);
        waitJobFinish(project, loadJobId);
    }

    /**
     * Runs {@code callable} with the {@code kylin.engine.segment-online-mode}
     * system property set to {@code str}, then sets the property back to the mode
     * read from the Kylin config beforehand, whether or not the callable succeeded.
     *
     * @param str      online mode to apply while the callable runs
     * @param callable work to execute under that mode
     * @param <T>      result type of the callable
     * @return the callable's result
     * @throws Exception whatever the callable throws
     */
    private static <T> T wrapWithOnlineMode(String str, Callable<T> callable) throws Exception {
        final String propertyKey = "kylin.engine.segment-online-mode";
        final String previousMode = KylinConfig.getInstanceFromEnv().getKylinEngineSegmentOnlineMode();

        System.setProperty(propertyKey, str);
        try {
            return callable.call();
        } finally {
            System.setProperty(propertyKey, previousMode);
        }
    }
}
