package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;

import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.class */
/**
 * Rule altered job worker.
 *
 * <p>A single instance of this class is registered on {@link ShardingSphereEventBus} by
 * {@link #initWorkerIfNecessary()}. It reacts to {@link StartScalingEvent} and
 * {@link ScalingReleaseDatabaseLevelLockEvent}, and additionally offers static helpers
 * for building rule altered contexts and task configurations.</p>
 */
public final class RuleAlteredJobWorker {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobWorker.class);

    private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();

    private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();

    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);

    /**
     * Tell whether an on-rule-altered action is configured for the given rule configuration.
     *
     * @param ruleConfiguration rule configuration, may be {@code null}
     * @return {@code true} when a detector exists for the rule configuration and that detector
     *     yields an on-rule-altered action configuration; {@code false} otherwise, including for {@code null}
     */
    public static boolean isOnRuleAlteredActionEnabled(RuleConfiguration ruleConfiguration) {
        if (null == ruleConfiguration) {
            return false;
        }
        Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfiguration);
        return detector.isPresent() && detector.get().getOnRuleAlteredActionConfig(ruleConfiguration).isPresent();
    }

    /**
     * Initialize the worker once.
     *
     * <p>Registers the singleton on the event bus and starts a {@link FinishedCheckJobExecutor}
     * and a {@link PipelineJobExecutor}. The initialized flag is checked before and again inside
     * the synchronized section, and is only set after all steps completed.</p>
     */
    public static void initWorkerIfNecessary() {
        if (WORKER_INITIALIZED.get()) {
            return;
        }
        synchronized (WORKER_INITIALIZED) {
            // Re-check under the monitor: another thread may have finished initialization meanwhile.
            if (WORKER_INITIALIZED.get()) {
                return;
            }
            log.info("start worker initialization");
            ShardingSphereEventBus.getInstance().register(INSTANCE);
            new FinishedCheckJobExecutor().start();
            new PipelineJobExecutor().start();
            WORKER_INITIALIZED.set(true);
            log.info("worker initialization done");
        }
    }

    /**
     * Create the rule altered context for a job configuration.
     *
     * @param ruleAlteredJobConfiguration job configuration
     * @return rule altered context built from the job id and the on-rule-altered action configuration
     * @throws PipelineJobCreationException if no altered rule can be found or no on-rule-altered action is configured
     * @throws IllegalStateException if no detector exists for the altered rule configuration
     */
    public static RuleAlteredContext createRuleAlteredContext(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        YamlRuleConfiguration alteredYamlRuleConfig = findAlteredYamlRuleConfig(ruleAlteredJobConfiguration);
        RuleConfiguration ruleConfig = SWAPPER_ENGINE.swapToRuleConfiguration(alteredYamlRuleConfig);
        Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
        Preconditions.checkState(detector.isPresent(), "no rule altered detector found, ruleConfig=%s", ruleConfig);
        Optional<OnRuleAlteredActionConfiguration> onRuleAlteredActionConfig = detector.get().getOnRuleAlteredActionConfig(ruleConfig);
        if (!onRuleAlteredActionConfig.isPresent()) {
            log.error("rule altered action enabled but actor is not configured, ignored, ruleConfig={}", ruleConfig);
            throw new PipelineJobCreationException("rule altered actor not configured");
        }
        return new RuleAlteredContext(ruleAlteredJobConfiguration.getJobId(), onRuleAlteredActionConfig.get());
    }

    /**
     * Find the first YAML rule configuration whose class name is a key of the job's altered rule map.
     *
     * @param jobConfig job configuration
     * @return matching YAML rule configuration
     * @throws PipelineJobCreationException if none of the rules matches
     */
    private static YamlRuleConfiguration findAlteredYamlRuleConfig(RuleAlteredJobConfiguration jobConfig) {
        for (YamlRuleConfiguration each : getYamlRootConfig(jobConfig).getRules()) {
            if (jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
                return each;
            }
        }
        throw new PipelineJobCreationException("could not find altered rule");
    }

    /**
     * Get the YAML root configuration of the job, preferring the target and falling back to the source.
     *
     * @param ruleAlteredJobConfiguration job configuration
     * @return root configuration of the first ShardingSphere data source configuration found
     * @throws PipelineJobCreationException if neither target nor source is a ShardingSphere data source configuration
     */
    private static YamlRootConfiguration getYamlRootConfig(RuleAlteredJobConfiguration ruleAlteredJobConfiguration) {
        // The factory result is held as Object so that the instanceof checks below are real type checks
        // rather than being implied by the declared type of the local variable.
        Object targetConfig = PipelineDataSourceConfigurationFactory.newInstance(
                ruleAlteredJobConfiguration.getTarget().getType(), ruleAlteredJobConfiguration.getTarget().getParameter());
        if (targetConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
            return ((ShardingSpherePipelineDataSourceConfiguration) targetConfig).getRootConfig();
        }
        Object sourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                ruleAlteredJobConfiguration.getSource().getType(), ruleAlteredJobConfiguration.getSource().getParameter());
        if (sourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
            return ((ShardingSpherePipelineDataSourceConfiguration) sourceConfig).getRootConfig();
        }
        throw new PipelineJobCreationException("neither target nor source is a ShardingSphere data source configuration");
    }

    /**
     * Start a scaling job for the given event.
     *
     * <p>The event is ignored when an uncompleted job with the same database name exists.</p>
     *
     * @param startScalingEvent start scaling event
     */
    @Subscribe
    public void start(StartScalingEvent startScalingEvent) {
        log.info("Start scaling job by {}", startScalingEvent);
        if (hasUncompletedJobOfSameDatabaseName(startScalingEvent.getDatabaseName())) {
            log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
            return;
        }
        Optional<RuleAlteredJobConfiguration> jobConfig = createJobConfig(startScalingEvent);
        if (!jobConfig.isPresent()) {
            // NOTE(review): createJobConfig currently either returns a present Optional or throws,
            // so this branch is only reached if that method starts returning Optional.empty().
            log.info("Switch rule configuration immediately.");
            ShardingSphereEventBus.getInstance().post(
                    new ScalingTaskFinishedEvent(startScalingEvent.getDatabaseName(), startScalingEvent.getActiveVersion(), startScalingEvent.getNewVersion()));
            ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(startScalingEvent.getDatabaseName()));
            return;
        }
        RuleAlteredJobAPIFactory.getInstance().start(jobConfig.get());
    }

    /**
     * Build the job configuration for a start scaling event.
     *
     * @param startScalingEvent start scaling event
     * @return job configuration, always present when this method returns normally
     * @throws PipelineJobCreationException if no rule or more than one rule is detected as altered
     */
    private Optional<RuleAlteredJobConfiguration> createJobConfig(StartScalingEvent startScalingEvent) {
        YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(
                startScalingEvent.getDatabaseName(), startScalingEvent.getSourceDataSource(), startScalingEvent.getSourceRule());
        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(
                startScalingEvent.getDatabaseName(), startScalingEvent.getTargetDataSource(), startScalingEvent.getTargetRule());
        Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
        for (Pair<YamlRuleConfiguration, YamlRuleConfiguration> each : groupSourceTargetRuleConfigsByType(sourceRootConfig.getRules(), targetRootConfig.getRules())) {
            // The left side is absent for rule types that only exist on the target.
            YamlRuleConfiguration yamlRuleConfig = null == each.getLeft() ? each.getRight() : each.getLeft();
            Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(yamlRuleConfig);
            if (!detector.isPresent()) {
                continue;
            }
            List<String> ruleAlteredLogicTables = detector.get().findRuleAlteredLogicTables(
                    each.getLeft(), each.getRight(), sourceRootConfig.getDataSources(), targetRootConfig.getDataSources());
            String ruleYamlClassName = yamlRuleConfig.getClass().getName();
            log.info("type={}, ruleAlteredLogicTables={}", ruleYamlClassName, ruleAlteredLogicTables);
            if (!ruleAlteredLogicTables.isEmpty()) {
                alteredRuleYamlClassNameTablesMap.put(ruleYamlClassName, ruleAlteredLogicTables);
            }
        }
        if (alteredRuleYamlClassNameTablesMap.isEmpty()) {
            log.error("no altered rule");
            throw new PipelineJobCreationException("no altered rule");
        }
        if (alteredRuleYamlClassNameTablesMap.size() > 1) {
            log.error("more than 1 rule altered");
            throw new PipelineJobCreationException("more than 1 rule altered");
        }
        YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
        yamlJobConfig.setDatabaseName(startScalingEvent.getDatabaseName());
        yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
        yamlJobConfig.setActiveVersion(Integer.valueOf(startScalingEvent.getActiveVersion()));
        yamlJobConfig.setNewVersion(Integer.valueOf(startScalingEvent.getNewVersion()));
        yamlJobConfig.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
        yamlJobConfig.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
        yamlJobConfig.extendConfiguration();
        return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig));
    }

    /**
     * Pair up source and target YAML rule configurations that share the same class.
     *
     * @param collection source rule configurations
     * @param collection2 target rule configurations
     * @return one pair per rule class; the right side is {@code null} for classes only present in the source,
     *     the left side is {@code null} for classes only present in the target
     * @throws IllegalStateException if either collection contains two configurations of the same class
     */
    private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(Collection<YamlRuleConfiguration> collection, Collection<YamlRuleConfiguration> collection2) {
        Map<Class<?>, YamlRuleConfiguration> sourceByType = indexByType(collection);
        Map<Class<?>, YamlRuleConfiguration> targetByType = indexByType(collection2);
        List<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> result = new LinkedList<>();
        for (Map.Entry<Class<?>, YamlRuleConfiguration> entry : sourceByType.entrySet()) {
            result.add(Pair.<YamlRuleConfiguration, YamlRuleConfiguration>of(entry.getValue(), targetByType.get(entry.getKey())));
        }
        for (Map.Entry<Class<?>, YamlRuleConfiguration> entry : targetByType.entrySet()) {
            if (!sourceByType.containsKey(entry.getKey())) {
                result.add(Pair.<YamlRuleConfiguration, YamlRuleConfiguration>of(null, entry.getValue()));
            }
        }
        return result;
    }

    /**
     * Index YAML rule configurations by their runtime class.
     *
     * @param ruleConfigs rule configurations
     * @return map from runtime class to rule configuration
     * @throws IllegalStateException if two configurations share the same class
     */
    private static Map<Class<?>, YamlRuleConfiguration> indexByType(Collection<YamlRuleConfiguration> ruleConfigs) {
        Map<Class<?>, YamlRuleConfiguration> result = new HashMap<>();
        for (YamlRuleConfiguration each : ruleConfigs) {
            if (null != result.put(each.getClass(), each)) {
                throw new IllegalStateException("Duplicate rule configuration type " + each.getClass().getName());
            }
        }
        return result;
    }

    /**
     * Wrap a YAML root configuration into a YAML pipeline data source configuration.
     *
     * @param yamlRootConfiguration YAML root configuration
     * @return YAML pipeline data source configuration carrying the type and parameter of the
     *     ShardingSphere pipeline data source configuration created from the root configuration
     */
    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(YamlRootConfiguration yamlRootConfiguration) {
        ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(yamlRootConfiguration);
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(dataSourceConfig.getType());
        result.setParameter(dataSourceConfig.getParameter());
        return result;
    }

    /**
     * Build a YAML root configuration from a database name and YAML texts.
     *
     * @param databaseName database name
     * @param dataSources YAML text unmarshalled as the data sources map
     * @param rules YAML text unmarshalled as the rules collection
     * @return YAML root configuration
     */
    // The unmarshalled values are raw Map/Collection; their element types are not checked here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private YamlRootConfiguration getYamlRootConfiguration(String databaseName, String dataSources, String rules) {
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources((Map) YamlEngine.unmarshal(dataSources, Map.class));
        result.setRules((Collection) YamlEngine.unmarshal(rules, Collection.class, true));
        return result;
    }

    /**
     * Build the task configuration by delegating to the job configuration preparer.
     *
     * @param ruleAlteredJobConfiguration job configuration
     * @param i integer argument forwarded unchanged to the preparer
     * @param onRuleAlteredActionConfiguration on-rule-altered action configuration
     * @return task configuration created by the preparer
     */
    public static TaskConfiguration buildTaskConfig(RuleAlteredJobConfiguration ruleAlteredJobConfiguration, int i, OnRuleAlteredActionConfiguration onRuleAlteredActionConfiguration) {
        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(ruleAlteredJobConfiguration, i, onRuleAlteredActionConfiguration);
    }

    /**
     * Tell whether any listed job for the given database still has unfinished progress.
     *
     * <p>A job counts as unfinished when at least one of its progress entries is {@code null}
     * or has a status other than {@link JobStatus#FINISHED}.</p>
     *
     * @param databaseName database name to look for
     * @return {@code true} if an unfinished job with the same database name exists
     */
    private boolean hasUncompletedJobOfSameDatabaseName(String databaseName) {
        for (JobInfo each : RuleAlteredJobAPIFactory.getInstance().list()) {
            // Constant-first equals: a progress entry without a status is treated as not finished.
            boolean allFinished = RuleAlteredJobAPIFactory.getInstance().getProgress(each.getJobId()).values().stream()
                    .allMatch(jobProgress -> null != jobProgress && JobStatus.FINISHED.equals(jobProgress.getStatus()));
            if (!allFinished && databaseName.equals(RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter()).getDatabaseName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Restore source writing and release the pipeline lock of the event's database.
     *
     * <p>A {@link RuntimeException} raised while restoring source writing is logged and not rethrown.
     * The pipeline lock release is attempted in every case.</p>
     *
     * @param scalingReleaseDatabaseLevelLockEvent release database level lock event
     */
    @Subscribe
    public void scalingReleaseDatabaseLevelLock(ScalingReleaseDatabaseLevelLockEvent scalingReleaseDatabaseLevelLockEvent) {
        String databaseName = scalingReleaseDatabaseLevelLockEvent.getDatabaseName();
        try {
            restoreSourceWriting(databaseName);
        } catch (RuntimeException e) {
            log.error("restore source writing failed, databaseName={}", databaseName, e);
        } finally {
            // Release in finally so the pipeline lock is not left held if restoring fails unexpectedly.
            PipelineSimpleLock.getInstance().releaseLock(databaseName);
        }
    }

    /**
     * Release the lock held in the instance lock context for the database, if it is locked.
     *
     * @param databaseName database name
     */
    private void restoreSourceWriting(String databaseName) {
        log.info("restoreSourceWriting, databaseName={}", databaseName);
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (lockContext.isLocked(databaseName)) {
            log.info("Source writing is still stopped on database '{}', restore it now", databaseName);
            lockContext.releaseLock(databaseName);
        }
    }
}
