package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockMode;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.class */
public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {

    /** Class-wide SLF4J logger; the {@code @Generated} marker suggests it was emitted by Lombok rather than hand-written (TODO confirm). */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobAPIImpl.class);

    /**
     * List the rule altered jobs known to the job statistics API.
     *
     * <p>Requires a cluster mode configuration; see {@code checkModeConfig()}.</p>
     *
     * @return one {@link JobInfo} per job brief info returned by {@code getJobBriefInfos()}
     */
    public List<JobInfo> list() {
        checkModeConfig();
        return getJobBriefInfos()
                .map(JobBriefInfo::getJobName)
                .map(this::getJobInfo)
                .collect(Collectors.toList());
    }

    /**
     * Validate that the pipeline runs with a {@code Cluster} mode configuration.
     *
     * @throws NullPointerException if no mode configuration is available
     * @throws IllegalArgumentException if the mode type is not {@code Cluster} (compared case-insensitively)
     */
    private void checkModeConfig() {
        final ModeConfiguration modeConfiguration = PipelineContext.getModeConfig();
        Preconditions.checkNotNull(modeConfiguration, "Mode configuration is required.");
        final boolean clusterMode = "Cluster".equalsIgnoreCase(modeConfiguration.getType());
        Preconditions.checkArgument(clusterMode, "Mode must be `Cluster`.");
    }

    /**
     * Stream the brief infos of all jobs, skipping those whose name starts with an underscore.
     *
     * @return lazily filtered stream of job brief infos
     */
    private Stream<JobBriefInfo> getJobBriefInfos() {
        return PipelineAPIFactory.getJobStatisticsAPI()
                .getAllJobsBriefInfo()
                .stream()
                .filter(each -> !each.getJobName().startsWith("_"));
    }

    /**
     * Build the job info of a single job from its ElasticJob configuration.
     *
     * @param jobName name of the job; also used to create the {@link JobInfo}
     * @return job info populated with active flag, sharding count, tables, create/stop time and job parameter
     */
    private JobInfo getJobInfo(String jobName) {
        final JobInfo result = new JobInfo(jobName);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        // A job counts as active as long as its ElasticJob configuration is not disabled.
        result.setActive(!jobConfigPOJO.isDisabled());
        result.setShardingTotalCount(ruleAlteredJobConfig.getJobShardingCount());
        result.setTables(ruleAlteredJobConfig.getLogicTables());
        final Properties jobProps = jobConfigPOJO.getProps();
        result.setCreateTime(jobProps.getProperty("create_time"));
        result.setStopTime(jobProps.getProperty("stop_time"));
        result.setJobParameter(jobConfigPOJO.getJobParameter());
        return result;
    }

    /**
     * Start a scaling job by persisting its definition to the governance repository.
     *
     * <p>If the job configuration path already exists, nothing is persisted and the job id is returned as is.</p>
     *
     * @param ruleAlteredJobConfiguration job configuration to start
     * @return the job id taken from the given configuration
     * @throws PipelineJobCreationException if the configured job sharding count is 0
     */
    public Optional<String> start(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        final int shardingCount = ruleAlteredJobConfiguration.getJobShardingCount();
        if (shardingCount == 0) {
            log.warn("Invalid scaling job config!");
            throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
        }
        log.info("Start scaling job by {}", ruleAlteredJobConfiguration);
        final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        final String jobId = ruleAlteredJobConfiguration.getJobId();
        final String jobConfigKey = PipelineMetaDataNode.getScalingJobConfigPath(jobId);
        if (!repositoryAPI.isExisted(jobConfigKey)) {
            // First the job node (holding the job class name), then the job configuration itself.
            repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), RuleAlteredJob.class.getName());
            repositoryAPI.persist(jobConfigKey, createJobConfigText(ruleAlteredJobConfiguration));
        } else {
            log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
        }
        return Optional.of(jobId);
    }

    /**
     * Render the ElasticJob configuration of a rule altered job as YAML text.
     *
     * @param jobConfig rule altered job configuration; marshalled to YAML and stored as the job parameter
     * @return YAML text of the ElasticJob configuration, carrying a {@code create_time} property set to the current time
     */
    private String createJobConfigText(RuleAlteredJobConfiguration jobConfig) {
        final JobConfigurationPOJO result = new JobConfigurationPOJO();
        result.setJobName(jobConfig.getJobId());
        result.setShardingTotalCount(jobConfig.getJobShardingCount());
        final String jobParameter = YamlEngine.marshal(new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig));
        result.setJobParameter(jobParameter);
        final String createTime = LocalDateTime.now().format(DATE_TIME_FORMATTER);
        result.getProps().setProperty("create_time", createTime);
        return YamlEngine.marshal(result);
    }

    /**
     * Get the progress of every sharding item of a job.
     *
     * <p>Requires a cluster mode configuration.</p>
     *
     * @param str job id
     * @return progress keyed by sharding item
     */
    public Map<Integer, JobProgress> getProgress(String str) {
        checkModeConfig();
        final RuleAlteredJobConfiguration jobConfig = getJobConfig(str);
        return getProgress(jobConfig);
    }

    /**
     * Get the progress of every sharding item of a job.
     *
     * @param ruleAlteredJobConfiguration job configuration supplying the job id and sharding count
     * @return insertion-ordered map from sharding item (0 up to sharding count, exclusive) to its progress;
     *         the value is {@code null} for items the governance repository has no progress for
     */
    public Map<Integer, JobProgress> getProgress(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        final String jobId = ruleAlteredJobConfiguration.getJobId();
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
        final int shardingCount = ruleAlteredJobConfiguration.getJobShardingCount();
        final Map<Integer, JobProgress> result = new LinkedHashMap<>();
        for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
            final JobProgress progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, shardingItem);
            if (null != progress) {
                // Progress is reported as active whenever the ElasticJob configuration is not disabled.
                progress.setActive(!jobConfigPOJO.isDisabled());
            }
            // Missing progress is still recorded, as a null value, so every sharding item has an entry.
            result.put(shardingItem, progress);
        }
        return result;
    }

    /**
     * Ensure the job runs in manual mode, i.e. its rule altered context has no completion detect algorithm.
     *
     * @param jobConfig job configuration used to create the rule altered context
     * @throws PipelineVerifyFailedException if a completion detect algorithm is configured
     */
    private void verifyManualMode(RuleAlteredJobConfiguration jobConfig) {
        final RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        final boolean autoMode = context.getCompletionDetectAlgorithm() != null;
        if (autoMode) {
            throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
        }
    }

    /**
     * Ensure the job is not yet completed according to {@link RuleAlteredJobProgressDetector}.
     *
     * @param jobConfig job configuration supplying sharding count and progress
     * @throws PipelineVerifyFailedException if the job is already completed
     */
    private void verifyJobNotCompleted(RuleAlteredJobConfiguration jobConfig) {
        final int shardingCount = jobConfig.getJobShardingCount();
        final Collection<JobProgress> progresses = getProgress(jobConfig).values();
        if (!RuleAlteredJobProgressDetector.isJobCompleted(shardingCount, progresses)) {
            return;
        }
        throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
    }

    /**
     * Stop writing to the source database of a job.
     *
     * <p>Requires cluster mode. Before locking, the job is checked via {@code verifyManualMode},
     * {@code verifyJobNotStopped} and {@code verifyJobNotCompleted}, in that order.</p>
     *
     * @param str job id
     */
    public void stopClusterWriteDB(String str) {
        checkModeConfig();
        log.info("stopClusterWriteDB for job {}", str);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        verifyManualMode(ruleAlteredJobConfig);
        verifyJobNotStopped(jobConfigPOJO);
        verifyJobNotCompleted(ruleAlteredJobConfig);
        final String databaseName = ruleAlteredJobConfig.getDatabaseName();
        stopClusterWriteDB(databaseName, str);
    }

    /**
     * Stop writing to a database by acquiring a {@link LockMode#READ} lock named after the database.
     *
     * <p>If the lock context reports the database as already locked, nothing is done.</p>
     *
     * @param str database name; used as the lock name
     * @param str2 job id; only used to give log and error messages context
     * @throws RuntimeException if the lock could not be acquired
     */
    public void stopClusterWriteDB(String str, String str2) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (lockContext.isLocked(str)) {
            log.info("stopClusterWriteDB, already stopped, databaseName={}, jobId={}", str, str2);
            return;
        }
        if (!lockContext.tryLock(str, LockMode.READ)) {
            // Name the database and job so a failed stop can be traced back in multi-job deployments.
            throw new RuntimeException("Stop source writing failed, databaseName=" + str + ", jobId=" + str2);
        }
        log.info("stopClusterWriteDB, tryLockSuccess=true, databaseName={}, jobId={}", str, str2);
    }

    /**
     * Restore writing to the source database of a job.
     *
     * <p>Requires cluster mode and a job in manual mode.</p>
     *
     * @param str job id
     */
    public void restoreClusterWriteDB(String str) {
        checkModeConfig();
        log.info("restoreClusterWriteDB for job {}", str);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        verifyManualMode(ruleAlteredJobConfig);
        final String databaseName = ruleAlteredJobConfig.getDatabaseName();
        restoreClusterWriteDB(databaseName, str);
    }

    /**
     * Restore writing to a database by releasing the lock named after the database.
     *
     * <p>If the lock context does not report the database as locked, only a log entry is written.</p>
     *
     * @param str database name; used as the lock name
     * @param str2 job id; only used for logging
     */
    public void restoreClusterWriteDB(String str, String str2) {
        final LockContext locks = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (locks.isLocked(str)) {
            log.info("restoreClusterWriteDB, before releaseLock, databaseName={}, jobId={}", str, str2);
            locks.releaseLock(str);
            return;
        }
        log.info("restoreClusterWriteDB, isLocked false, databaseName={}", str);
    }

    /**
     * List all data consistency calculate algorithms known to {@link DataConsistencyCalculateAlgorithmFactory}.
     *
     * <p>Requires cluster mode.</p>
     *
     * @return one info object per algorithm instance, carrying its type, description and supported database types
     */
    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
        checkModeConfig();
        return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
            final DataConsistencyCheckAlgorithmInfo info = new DataConsistencyCheckAlgorithmInfo();
            info.setType(each.getType());
            info.setDescription(each.getDescription());
            info.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
            return info;
        }).collect(Collectors.toList());
    }

    /**
     * Tell whether a data consistency check is needed for a job.
     *
     * @param str job id
     * @return {@code true} if the job's rule altered context has a data consistency calculate algorithm
     */
    public boolean isDataConsistencyCheckNeeded(String str) {
        log.info("isDataConsistencyCheckNeeded for job {}", str);
        final RuleAlteredJobConfiguration jobConfig = getJobConfig(str);
        return isDataConsistencyCheckNeeded(jobConfig);
    }

    /**
     * Tell whether a data consistency check is needed for a job configuration.
     *
     * @param ruleAlteredJobConfiguration job configuration used to create the rule altered context
     * @return {@code true} if the created context has a data consistency calculate algorithm
     */
    public boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        final RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(ruleAlteredJobConfiguration);
        return isDataConsistencyCheckNeeded(context);
    }

    /**
     * Tell whether the given context has a data consistency calculate algorithm.
     *
     * @param context rule altered context to inspect
     * @return {@code true} if the algorithm is present
     */
    private boolean isDataConsistencyCheckNeeded(RuleAlteredContext context) {
        return context.getDataConsistencyCalculateAlgorithm() != null;
    }

    /**
     * Run the data consistency check of a job with the algorithm of its rule altered context.
     *
     * <p>Requires cluster mode and a job in manual mode.</p>
     *
     * @param str job id
     * @return check results as produced by the configuration-based overload
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String str) {
        checkModeConfig();
        log.info("Data consistency check for job {}", str);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        verifyDataConsistencyCheck(ruleAlteredJobConfig);
        return dataConsistencyCheck(ruleAlteredJobConfig);
    }

    /**
     * Run the data consistency check of a job configuration with the algorithm of its rule altered context.
     *
     * @param ruleAlteredJobConfiguration job configuration to check
     * @return check results, or an empty map when the context has no data consistency calculate algorithm
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        final RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(ruleAlteredJobConfiguration);
        if (!isDataConsistencyCheckNeeded(context)) {
            log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
            return Collections.emptyMap();
        }
        return dataConsistencyCheck(ruleAlteredJobConfiguration, context.getDataConsistencyCalculateAlgorithm());
    }

    /**
     * Run the data consistency check of a job with an explicitly chosen algorithm.
     *
     * <p>Requires cluster mode and a job in manual mode.</p>
     *
     * @param str job id
     * @param str2 algorithm type passed to {@link DataConsistencyCalculateAlgorithmFactory#newInstance}
     * @param properties algorithm properties passed to the factory
     * @return check results
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String str, String str2, Properties properties) {
        checkModeConfig();
        log.info("Data consistency check for job {}, algorithmType: {}", str, str2);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        verifyDataConsistencyCheck(ruleAlteredJobConfig);
        // The algorithm is only instantiated once verification has passed.
        final DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmFactory.newInstance(str2, properties);
        return dataConsistencyCheck(ruleAlteredJobConfig, algorithm);
    }

    /**
     * Run the check with the given algorithm, log the outcome and persist the aggregated result.
     *
     * @param jobConfig job configuration to check
     * @param algorithm algorithm handed to {@link DataConsistencyChecker#check}
     * @return the check results returned by the checker
     */
    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig, DataConsistencyCalculateAlgorithm algorithm) {
        final String jobId = jobConfig.getJobId();
        final DataConsistencyChecker checker = new DataConsistencyChecker(jobConfig);
        final Map<String, DataConsistencyCheckResult> result = checker.check(algorithm);
        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, algorithm.getType(), result);
        final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        // Only the aggregated pass/fail flag is persisted, not the per-entry results.
        final boolean passed = aggregateDataConsistencyCheckResults(jobId, result);
        repositoryAPI.persistJobCheckResult(jobId, passed);
        return result;
    }

    /**
     * Verify that a data consistency check may be triggered for the job; currently this only requires manual mode.
     *
     * @param jobConfig job configuration to verify
     */
    private void verifyDataConsistencyCheck(RuleAlteredJobConfiguration jobConfig) {
        verifyManualMode(jobConfig);
    }

    /**
     * Reduce the check results to a single pass/fail flag.
     *
     * <p>Stops at the first entry whose count or content check did not match and logs it as an error.</p>
     *
     * @param str job id; only used for logging
     * @param map check results, keyed by the name logged as {@code table}
     * @return {@code true} only if the map is non-empty and every entry matched on both count and content
     */
    public boolean aggregateDataConsistencyCheckResults(String str, Map<String, DataConsistencyCheckResult> map) {
        if (map.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, DataConsistencyCheckResult> each : map.entrySet()) {
            final DataConsistencyCheckResult checkResult = each.getValue();
            final boolean countMatched = checkResult.getCountCheckResult().isMatched();
            final boolean contentMatched = checkResult.getContentCheckResult().isMatched();
            if (countMatched && contentMatched) {
                continue;
            }
            log.error("Scaling job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", str, each.getKey(), countMatched, contentMatched);
            return false;
        }
        return true;
    }

    /**
     * Switch the cluster configuration for a job.
     *
     * <p>Requires cluster mode. Before switching, the job is checked via {@code verifyManualMode},
     * {@code verifyJobNotStopped} and {@code verifyJobNotCompleted}, in that order.</p>
     *
     * @param str job id
     */
    public void switchClusterConfiguration(String str) {
        checkModeConfig();
        log.info("Switch cluster configuration for job {}", str);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
        verifyManualMode(ruleAlteredJobConfig);
        verifyJobNotStopped(jobConfigPOJO);
        verifyJobNotCompleted(ruleAlteredJobConfig);
        switchClusterConfiguration(ruleAlteredJobConfig);
    }

    /**
     * Switch the cluster configuration for a job configuration and mark the job as finished.
     *
     * <p>Posts a {@link ScalingTaskFinishedEvent}, sets the job and each of its sharding items to
     * {@link JobStatus#FINISHED}, then stops the job.</p>
     *
     * @param ruleAlteredJobConfiguration job configuration to switch
     * @throws PipelineVerifyFailedException if a data consistency check is needed but its persisted result is absent or {@code false}
     */
    public void switchClusterConfiguration(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        final String jobId = ruleAlteredJobConfiguration.getJobId();
        final RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(ruleAlteredJobConfiguration);
        final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        if (isDataConsistencyCheckNeeded(context)) {
            final Optional<Boolean> checkResult = repositoryAPI.getJobCheckResult(jobId);
            // A missing result is treated the same as a failed check.
            if (!checkResult.orElse(false)) {
                throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
            }
        }
        ShardingSphereEventBus.getInstance().post(new ScalingTaskFinishedEvent(
                ruleAlteredJobConfiguration.getDatabaseName(),
                ruleAlteredJobConfiguration.getActiveVersion().intValue(),
                ruleAlteredJobConfiguration.getNewVersion().intValue()));
        RuleAlteredJobSchedulerCenter.updateJobStatus(jobId, JobStatus.FINISHED);
        for (Integer each : repositoryAPI.getShardingItems(jobId)) {
            repositoryAPI.updateShardingJobStatus(jobId, each.intValue(), JobStatus.FINISHED);
        }
        RuleAlteredJobCenter.stop(jobId);
        stop(jobId);
    }

    /**
     * Reset a job by cleaning up its target tables.
     *
     * <p>Requires cluster mode; the job is first checked via {@code verifyJobStopped}.</p>
     *
     * @param str job id
     * @throws PipelineJobExecutionException if cleaning up the target tables fails with an {@link SQLException}
     */
    public void reset(String str) {
        checkModeConfig();
        log.info("Scaling job {} reset target table", str);
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        verifyJobStopped(jobConfigPOJO);
        try {
            final RuleAlteredJobConfiguration ruleAlteredJobConfig = getJobConfig(jobConfigPOJO);
            final ScalingEnvironmentManager environmentManager = new ScalingEnvironmentManager();
            environmentManager.cleanupTargetTables(ruleAlteredJobConfig);
        } catch (final SQLException ex) {
            throw new PipelineJobExecutionException("Reset target table failed for job " + str, ex);
        }
    }

    /**
     * Load the rule altered job configuration of a job.
     *
     * @param str job id
     * @return configuration parsed from the job parameter of the job's ElasticJob configuration
     */
    public RuleAlteredJobConfiguration getJobConfig(String str) {
        final JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(str);
        return getJobConfig(jobConfigPOJO);
    }

    /**
     * Parse the rule altered job configuration out of an ElasticJob configuration.
     *
     * @param jobConfigPOJO ElasticJob configuration whose job parameter holds the serialized configuration
     * @return parsed rule altered job configuration
     */
    private RuleAlteredJobConfiguration getJobConfig(JobConfigurationPOJO jobConfigPOJO) {
        final String jobParameter = jobConfigPOJO.getJobParameter();
        return RuleAlteredJobConfigurationSwapper.swapToObject(jobParameter);
    }
}
