package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

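/**
 * Implementation of {@link RuleAlteredJobAPI} built on top of {@link AbstractPipelineJobAPIImpl}.
 *
 * <p>Decompiled from: org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.class</p>
 */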
public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {

    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobAPIImpl.class);

    /**
     * Data consistency check algorithm singletons obtained from the SPI registry.
     * Copied into a {@link TreeMap} so that iteration follows the natural ordering of the keys.
     */
    private static final Map<String, DataConsistencyCheckAlgorithm> DATA_CONSISTENCY_CHECK_ALGORITHM_MAP =
            new TreeMap<>(SingletonSPIRegistry.getTypedSingletonInstancesMap(DataConsistencyCheckAlgorithm.class));

    /**
     * List the job info of every job returned by {@code getJobBriefInfos()}.
     *
     * @return job infos, one per visible job
     */
    public List<JobInfo> list() {
        checkModeConfig();
        return getJobBriefInfos()
                .map(briefInfo -> getJobInfo(briefInfo.getJobName()))
                .collect(Collectors.toList());
    }

    /**
     * Ensure a mode configuration is present and that its type is {@code Cluster} (case-insensitive).
     * Violations are reported through Guava {@link Preconditions}.
     */
    private void checkModeConfig() {
        ModeConfiguration currentMode = PipelineContext.getModeConfig();
        Preconditions.checkNotNull(currentMode, "Mode configuration is required.");
        boolean clusterMode = "Cluster".equalsIgnoreCase(currentMode.getType());
        Preconditions.checkArgument(clusterMode, "Mode must be `Cluster`.");
    }

    /**
     * Stream the brief infos of all jobs, skipping those whose name starts with an underscore.
     *
     * @return stream of brief infos for jobs whose name does not start with {@code "_"}
     */
    private Stream<JobBriefInfo> getJobBriefInfos() {
        Collection<JobBriefInfo> allJobs = PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo();
        return allJobs.stream().filter(each -> !each.getJobName().startsWith("_"));
    }

    /**
     * Build the {@link JobInfo} for one job from its ElasticJob configuration POJO.
     *
     * @param str value passed to the {@link JobInfo} constructor (the job name taken from the brief info)
     * @return populated job info
     */
    private JobInfo getJobInfo(String str) {
        JobInfo result = new JobInfo(str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        // A job counts as active as long as it is not disabled.
        boolean active = !jobConfigPOJO.isDisabled();
        result.setActive(active);
        result.setShardingTotalCount(pipelineJobConfig.getHandleConfig().getJobShardingCount());
        result.setTables(pipelineJobConfig.getHandleConfig().getLogicTables());
        Properties jobProps = jobConfigPOJO.getProps();
        result.setCreateTime(jobProps.getProperty("create_time"));
        result.setStopTime(jobProps.getProperty("stop_time"));
        result.setJobParameter(jobConfigPOJO.getJobParameter());
        return result;
    }

    /**
     * Register a job in the governance repository unless its config key already exists.
     *
     * @param jobConfiguration job configuration; its handle config is built as the first step
     * @return the job id, both for a newly persisted job and for one that already existed
     * @throws PipelineJobCreationException if the job sharding count of the handle config is 0
     */
    public Optional<String> start(JobConfiguration jobConfiguration) {
        jobConfiguration.buildHandleConfig();
        if (0 == jobConfiguration.getHandleConfig().getJobShardingCount()) {
            log.warn("Invalid scaling job config!");
            throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
        }
        log.info("Start scaling job by {}", jobConfiguration.getHandleConfig());
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        String jobId = jobConfiguration.getHandleConfig().getJobId();
        String jobConfigKey = String.format("%s/%s/config", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
        if (!repositoryAPI.isExisted(jobConfigKey)) {
            // The job root node stores the job class name; the config node stores the marshalled job config.
            String jobRootKey = String.format("%s/%s", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
            repositoryAPI.persist(jobRootKey, RuleAlteredJob.class.getName());
            repositoryAPI.persist(jobConfigKey, createJobConfig(jobConfiguration));
        } else {
            log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
        }
        return Optional.of(jobId);
    }

    /**
     * Create the YAML text of the ElasticJob configuration POJO for a pipeline job.
     *
     * @param jobConfiguration pipeline job configuration, marshalled into the job parameter
     * @return marshalled {@link JobConfigurationPOJO}
     */
    private String createJobConfig(JobConfiguration jobConfiguration) {
        JobConfigurationPOJO pojo = new JobConfigurationPOJO();
        pojo.setJobName(jobConfiguration.getHandleConfig().getJobId());
        pojo.setShardingTotalCount(jobConfiguration.getHandleConfig().getJobShardingCount());
        String jobParameter = YamlEngine.marshal(jobConfiguration);
        pojo.setJobParameter(jobParameter);
        // Record the creation time as a job property.
        String createTime = LocalDateTime.now().format(DATE_TIME_FORMATTER);
        pojo.getProps().setProperty("create_time", createTime);
        return YamlEngine.marshal(pojo);
    }

    /**
     * Get the progress of each sharding item of a job.
     *
     * @param str job id
     * @return job progress keyed by sharding item
     */
    public Map<Integer, JobProgress> getProgress(String str) {
        checkModeConfig();
        JobConfiguration jobConfig = getJobConfig(str);
        return getProgress(jobConfig);
    }

    /**
     * Get the progress of each sharding item of a job.
     *
     * @param jobConfiguration job configuration supplying the job id and sharding count
     * @return insertion-ordered map from sharding item (0 up to sharding count - 1) to its progress;
     *         the value is {@code null} when the repository returns no progress for that item
     */
    public Map<Integer, JobProgress> getProgress(JobConfiguration jobConfiguration) {
        String jobId = jobConfiguration.getHandleConfig().getJobId();
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
        int shardingCount = jobConfiguration.getHandleConfig().getJobShardingCount();
        Map<Integer, JobProgress> result = new LinkedHashMap<>();
        for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
            JobProgress progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, shardingItem);
            if (progress != null) {
                // Progress is reported as active while the job is not disabled.
                progress.setActive(!jobConfigPOJO.isDisabled());
            }
            result.put(shardingItem, progress);
        }
        return result;
    }

    /**
     * Verify that no completion detect algorithm is configured for the job.
     *
     * @param jobConfiguration job configuration
     * @throws PipelineVerifyFailedException if a completion detect algorithm is configured
     */
    private void verifyManualMode(JobConfiguration jobConfiguration) {
        RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfiguration);
        boolean autoMode = context.getCompletionDetectAlgorithm() != null;
        if (autoMode) {
            throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
        }
    }

    /**
     * Verify that the job is not reported as completed by {@link RuleAlteredJobProgressDetector}.
     *
     * @param jobConfiguration job configuration
     * @throws PipelineVerifyFailedException if the job is completed
     */
    private void verifyJobNotCompleted(JobConfiguration jobConfiguration) {
        int shardingCount = jobConfiguration.getHandleConfig().getJobShardingCount();
        Collection<JobProgress> progresses = getProgress(jobConfiguration).values();
        boolean completed = RuleAlteredJobProgressDetector.isJobCompleted(shardingCount, progresses);
        if (completed) {
            throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
        }
    }

    /**
     * Verify that the schema lock of the job's schema exists and is currently locked.
     *
     * @param jobConfiguration job configuration supplying the schema name
     * @throws PipelineVerifyFailedException if there is no schema lock or it is not locked
     */
    private void verifySourceWritingStopped(JobConfiguration jobConfiguration) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        String schemaName = jobConfiguration.getWorkflowConfig().getSchemaName();
        Optional<ShardingSphereLock> schemaLock = lockContext.getSchemaLock(schemaName);
        boolean writingStopped = schemaLock.isPresent() && schemaLock.get().isLocked(schemaName);
        if (!writingStopped) {
            throw new PipelineVerifyFailedException("Source writing is not stopped. You could run `STOP SCALING SOURCE WRITING {jobId}` to stop it.");
        }
    }

    /**
     * Stop source writing for a job after verifying manual mode, that the job is not stopped,
     * and that the job is not completed (checked in that order).
     *
     * @param str job id
     */
    public void stopClusterWriteDB(String str) {
        checkModeConfig();
        log.info("stopClusterWriteDB for job {}", str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        // Verification order is significant: the first failing check decides which error is raised.
        verifyManualMode(pipelineJobConfig);
        verifyJobNotStopped(jobConfigPOJO);
        verifyJobNotCompleted(pipelineJobConfig);
        String schemaName = pipelineJobConfig.getWorkflowConfig().getSchemaName();
        stopClusterWriteDB(schemaName, str);
    }

    /**
     * Acquire the schema lock for a schema unless it is already locked.
     *
     * @param str schema name, used to look up and acquire the schema lock
     * @param str2 job id (not used by this method)
     * @throws RuntimeException if the lock could not be acquired
     */
    public void stopClusterWriteDB(String str, String str2) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        ShardingSphereLock schemaLock = lockContext.getOrCreateSchemaLock(str);
        if (!schemaLock.isLocked(str)) {
            boolean lockAcquired = schemaLock.tryLock(str);
            log.info("stopClusterWriteDB, tryLockSuccess={}", lockAcquired);
            if (!lockAcquired) {
                throw new RuntimeException("Stop source writing failed");
            }
            return;
        }
        log.info("stopClusterWriteDB, already stopped");
    }

    /**
     * Restore source writing for a job after verifying manual mode.
     *
     * @param str job id
     */
    public void restoreClusterWriteDB(String str) {
        checkModeConfig();
        log.info("restoreClusterWriteDB for job {}", str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        verifyManualMode(pipelineJobConfig);
        String schemaName = pipelineJobConfig.getWorkflowConfig().getSchemaName();
        restoreClusterWriteDB(schemaName, str);
    }

    /**
     * Release the schema lock of a schema if the lock exists and is currently locked; otherwise only log.
     *
     * @param str schema name, used to look up and release the schema lock
     * @param str2 job id, used for logging only
     */
    public void restoreClusterWriteDB(String str, String str2) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        Optional<ShardingSphereLock> schemaLock = lockContext.getSchemaLock(str);
        if (!schemaLock.isPresent()) {
            log.info("restoreClusterWriteDB, lock is null");
            return;
        }
        ShardingSphereLock lock = schemaLock.get();
        if (!lock.isLocked(str)) {
            log.info("restoreClusterWriteDB, isLocked false, schemaName={}", str);
            return;
        }
        log.info("restoreClusterWriteDB, before releaseLock, schemaName={}, jobId={}", str, str2);
        lock.releaseLock(str);
    }

    /**
     * List descriptive info for every registered data consistency check algorithm.
     *
     * @return algorithm infos in the iteration order of the algorithm map
     */
    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
        checkModeConfig();
        return DATA_CONSISTENCY_CHECK_ALGORITHM_MAP.values().stream()
                .map(each -> toAlgorithmInfo(each))
                .collect(Collectors.toList());
    }

    /** Copy type, description, supported database types and provider of an algorithm into an info object. */
    private static DataConsistencyCheckAlgorithmInfo toAlgorithmInfo(DataConsistencyCheckAlgorithm algorithm) {
        DataConsistencyCheckAlgorithmInfo info = new DataConsistencyCheckAlgorithmInfo();
        info.setType(algorithm.getType());
        info.setDescription(algorithm.getDescription());
        info.setSupportedDatabaseTypes(algorithm.getSupportedDatabaseTypes());
        info.setProvider(algorithm.getProvider());
        return info;
    }

    /**
     * Tell whether a data consistency check is needed for a job.
     *
     * @param str job id
     * @return {@code true} if the job's rule altered context has a data consistency check algorithm
     */
    public boolean isDataConsistencyCheckNeeded(String str) {
        log.info("isDataConsistencyCheckNeeded for job {}", str);
        JobConfiguration jobConfig = getJobConfig(str);
        return isDataConsistencyCheckNeeded(jobConfig);
    }

    /**
     * Tell whether a data consistency check is needed for a job configuration.
     *
     * @param jobConfiguration job configuration
     * @return {@code true} if the derived rule altered context has a data consistency check algorithm
     */
    public boolean isDataConsistencyCheckNeeded(JobConfiguration jobConfiguration) {
        RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfiguration);
        return isDataConsistencyCheckNeeded(context);
    }

    /**
     * Tell whether the given context has a data consistency check algorithm.
     *
     * @param ruleAlteredContext rule altered context
     * @return {@code true} if the context's data consistency check algorithm is not {@code null}
     */
    private boolean isDataConsistencyCheckNeeded(RuleAlteredContext ruleAlteredContext) {
        DataConsistencyCheckAlgorithm configuredAlgorithm = ruleAlteredContext.getDataConsistencyCheckAlgorithm();
        return configuredAlgorithm != null;
    }

    /**
     * Run the preconditions of a data consistency check: manual mode first, then stopped source writing.
     *
     * @param jobConfigurationPOJO ElasticJob configuration POJO (not read by this method)
     * @param jobConfiguration pipeline job configuration to verify
     */
    private void verifyDataConsistencyCheck(JobConfigurationPOJO jobConfigurationPOJO, JobConfiguration jobConfiguration) {
        // Order matters: a manual-mode failure must be reported before the source-writing check.
        this.verifyManualMode(jobConfiguration);
        this.verifySourceWritingStopped(jobConfiguration);
    }

    /**
     * Verify the preconditions, then run the data consistency check with the job's configured algorithm.
     *
     * @param str job id
     * @return check results as returned by {@code dataConsistencyCheck(JobConfiguration)}
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String str) {
        checkModeConfig();
        log.info("Data consistency check for job {}", str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        verifyDataConsistencyCheck(jobConfigPOJO, pipelineJobConfig);
        Map<String, DataConsistencyCheckResult> result = dataConsistencyCheck(pipelineJobConfig);
        return result;
    }

    /**
     * Run the data consistency check with the algorithm configured for the job.
     *
     * @param jobConfiguration job configuration
     * @return check results, or an empty map when no check algorithm is configured
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(JobConfiguration jobConfiguration) {
        RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfiguration);
        if (!isDataConsistencyCheckNeeded(context)) {
            log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check is ignored.");
            return Collections.emptyMap();
        }
        return dataConsistencyCheck0(jobConfiguration, context.getDataConsistencyCheckAlgorithm());
    }

    /**
     * Verify the preconditions, then run the data consistency check with an explicitly chosen algorithm type.
     *
     * @param str job id
     * @param str2 algorithm type; the algorithm is created with empty properties
     * @return check results
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String str, String str2) {
        checkModeConfig();
        log.info("Data consistency check for job {}, algorithmType: {}", str, str2);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        verifyDataConsistencyCheck(jobConfigPOJO, pipelineJobConfig);
        // The algorithm is only instantiated once verification has passed.
        ShardingSphereAlgorithmConfiguration algorithmConfig = new ShardingSphereAlgorithmConfiguration(str2, new Properties());
        DataConsistencyCheckAlgorithm checkAlgorithm =
                (DataConsistencyCheckAlgorithm) ShardingSphereAlgorithmFactory.createAlgorithm(algorithmConfig, DataConsistencyCheckAlgorithm.class);
        return dataConsistencyCheck0(pipelineJobConfig, checkAlgorithm);
    }

    /**
     * Check record counts and, only if every count matched, record contents; then persist the aggregated result.
     *
     * @param jobConfiguration job configuration
     * @param dataConsistencyCheckAlgorithm algorithm used for the content check
     * @return results returned by the count check, updated with the content check outcome when it was run
     */
    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(JobConfiguration jobConfiguration, DataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm) {
        String jobId = jobConfiguration.getHandleConfig().getJobId();
        DataConsistencyChecker checker = EnvironmentCheckerFactory.newInstance(jobConfiguration);
        Map<String, DataConsistencyCheckResult> results = checker.checkRecordsCount();
        boolean allCountsMatched = results.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched);
        if (allCountsMatched) {
            Map<String, Boolean> contentMatched = checker.checkRecordsContent(dataConsistencyCheckAlgorithm);
            for (Map.Entry<String, DataConsistencyCheckResult> entry : results.entrySet()) {
                // A key missing from the content check result counts as "not matched".
                entry.getValue().setRecordsContentMatched(contentMatched.getOrDefault(entry.getKey(), false));
            }
        }
        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, dataConsistencyCheckAlgorithm.getClass().getName(), results);
        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, results));
        return results;
    }

    /**
     * Aggregate per-key check results into a single pass/fail flag.
     *
     * @param str job id, used for logging only
     * @param map check results per key (logged as "table")
     * @return {@code false} if the map is empty or any entry has a count or content mismatch,
     *         otherwise {@code true}; evaluation stops at the first failing entry, which is logged
     */
    public boolean aggregateDataConsistencyCheckResults(String str, Map<String, DataConsistencyCheckResult> map) {
        if (map.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, DataConsistencyCheckResult> entry : map.entrySet()) {
            DataConsistencyCheckResult checkResult = entry.getValue();
            boolean countMatched = checkResult.isRecordsCountMatched();
            boolean contentMatched = checkResult.isRecordsContentMatched();
            if (contentMatched && countMatched) {
                continue;
            }
            log.error("Scaling job: {}, table: {} data consistency check failed, recordsContentMatched: {}, recordsCountMatched: {}", str, entry.getKey(), contentMatched, countMatched);
            return false;
        }
        return true;
    }

    /**
     * Switch cluster configuration for a job after verifying manual mode, that the job is not stopped,
     * and that the job is not completed (checked in that order).
     *
     * @param str job id
     */
    public void switchClusterConfiguration(String str) {
        checkModeConfig();
        log.info("Switch cluster configuration for job {}", str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        JobConfiguration pipelineJobConfig = getJobConfig(jobConfigPOJO);
        // Verification order is significant: the first failing check decides which error is raised.
        verifyManualMode(pipelineJobConfig);
        verifyJobNotStopped(jobConfigPOJO);
        verifyJobNotCompleted(pipelineJobConfig);
        switchClusterConfiguration(pipelineJobConfig);
    }

    /**
     * Switch cluster configuration for a job: publish the scaling-finished event, mark every sharding item
     * as finished, stop the job, and clean up through {@link RuleAlteredJobPreparer}.
     *
     * <p>When a data consistency check algorithm is configured, a persisted successful check result is required.</p>
     *
     * @param jobConfiguration job configuration
     * @throws PipelineVerifyFailedException if a consistency check is needed but its result is absent or failed
     */
    public void switchClusterConfiguration(JobConfiguration jobConfiguration) {
        String jobId = jobConfiguration.getHandleConfig().getJobId();
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfiguration);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        if (isDataConsistencyCheckNeeded(ruleAlteredContext)) {
            Optional<Boolean> jobCheckResult = repositoryAPI.getJobCheckResult(jobId);
            if (!jobCheckResult.isPresent() || !jobCheckResult.get().booleanValue()) {
                throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
            }
        }
        WorkflowConfiguration workflowConfig = jobConfiguration.getWorkflowConfig();
        ShardingSphereEventBus.getInstance().post(new ScalingTaskFinishedEvent(workflowConfig.getSchemaName(), workflowConfig.getActiveVersion().intValue(), workflowConfig.getNewVersion().intValue()));
        for (Integer each : repositoryAPI.getShardingItems(jobId)) {
            repositoryAPI.updateShardingJobStatus(jobId, each.intValue(), JobStatus.FINISHED);
        }
        stop(jobId);
        try {
            // Short pause between stopping the job and cleaning up.
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting before cleanup of job {}", jobId, ex);
        }
        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
        try {
            new RuleAlteredJobPreparer().cleanup(jobContext);
        } finally {
            // Always release the job context, even when cleanup fails.
            jobContext.close();
        }
    }

    /**
     * Clean up the target tables of a stopped job.
     *
     * @param str job id
     * @throws PipelineJobExecutionException if cleaning up the target tables fails with an {@link SQLException};
     *         the SQL exception is attached as the cause
     */
    public void reset(String str) {
        checkModeConfig();
        log.info("Scaling job {} reset target table", str);
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        verifyJobStopped(jobConfigPOJO);
        try {
            new ScalingEnvironmentManager().cleanupTargetTables(getJobConfig(jobConfigPOJO));
        } catch (SQLException ex) {
            PipelineJobExecutionException failure = new PipelineJobExecutionException("Reset target table failed for job " + str);
            // Attach the SQL error as the cause so the root failure is not lost.
            failure.initCause(ex);
            throw failure;
        }
    }

    /**
     * Load the pipeline job configuration of a job.
     *
     * @param str job id
     * @return job configuration unmarshalled from the job parameter of the ElasticJob configuration
     */
    public JobConfiguration getJobConfig(String str) {
        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        return getJobConfig(jobConfigPOJO);
    }

    /**
     * Unmarshal the pipeline job configuration stored in the job parameter of an ElasticJob configuration POJO.
     *
     * @param jobConfigurationPOJO ElasticJob configuration POJO
     * @return unmarshalled job configuration
     */
    private JobConfiguration getJobConfig(JobConfigurationPOJO jobConfigurationPOJO) {
        String jobParameterYaml = jobConfigurationPOJO.getJobParameter();
        // The third argument is passed as true, exactly as in the original call.
        return (JobConfiguration) YamlEngine.unmarshal(jobParameterYaml, JobConfiguration.class, true);
    }
}
