package org.apache.shardingsphere.scaling.core.job.task.incremental;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.Generated;
import org.apache.shardingsphere.scaling.core.common.channel.distribution.DistributionChannel;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.executor.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.executor.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.executor.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.executor.importer.ImporterFactory;
import org.apache.shardingsphere.scaling.core.executor.importer.ImporterListener;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.class */
/**
 * Scaling task that replays incremental changes: one log dumper feeds a
 * {@link DistributionChannel} which is consumed by {@code concurrency} importers.
 *
 * <p>The task id is the dumper's data source name, and the tracked progress starts
 * at the position carried by the dumper configuration.
 */
public final class IncrementalTask extends AbstractScalingExecutor implements ScalingTask {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(IncrementalTask.class);
    private final String taskId;
    private final int concurrency;
    private final DumperConfiguration dumperConfig;
    private final ImporterConfiguration importerConfig;
    // Created in start() and cleared in stop(); may be null outside a run.
    private Dumper dumper;
    private final DataSourceManager dataSourceManager = new DataSourceManager();
    private final IncrementalTaskProgress progress = new IncrementalTaskProgress();

    /**
     * Creates the task.
     *
     * @param i number of importers to run
     * @param dumperConfiguration dumper settings; supplies the task id (data source name) and the initial position
     * @param importerConfiguration settings used to build every importer
     */
    public IncrementalTask(int i, DumperConfiguration dumperConfiguration, ImporterConfiguration importerConfiguration) {
        this.concurrency = i;
        this.dumperConfig = dumperConfiguration;
        this.importerConfig = importerConfiguration;
        this.taskId = dumperConfiguration.getDataSourceName();
        this.progress.setPosition(dumperConfiguration.getPosition());
    }

    /**
     * Builds the dumper and importers, wires them to a shared channel, submits the
     * importers to the incremental execute engine, starts the dumper and then waits
     * for the submitted importers' result.
     *
     * <p>The data source manager is closed when this method exits, whether it
     * returns normally or throws.
     *
     * @throws ScalingTaskExecuteException if the submitted importers fail
     */
    @Override
    public void start() {
        progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        dumper = DumperFactory.newInstanceLogDumper(dumperConfig, progress.getPosition());
        try {
            List<Importer> importers = instanceImporters();
            instanceChannel(importers);
            Future<?> importResult = ScalingContext.getInstance().getIncrementalDumperExecuteEngine()
                    .submitAll(importers, getExecuteCallback());
            dumper.start();
            waitForResult(importResult);
        } finally {
            // Previously skipped whenever anything above threw, leaking the managed data sources.
            closeDataSourceManager();
        }
    }

    /**
     * Closes the data source manager with the interrupt flag temporarily cleared.
     *
     * <p>NOTE(review): DataSourceManager.close() is not visible from here. The original
     * code always reached it with the interrupt flag cleared, so that state is kept for
     * the cleanup and the flag is restored afterwards.
     */
    private void closeDataSourceManager() {
        boolean interrupted = Thread.interrupted();
        try {
            dataSourceManager.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates {@code concurrency} importers that all share this task's data source manager.
     *
     * @return the newly created importers
     */
    private List<Importer> instanceImporters() {
        List<Importer> importers = new ArrayList<>(concurrency);
        for (int i = 0; i < concurrency; i++) {
            importers.add(ImporterFactory.newInstance(importerConfig, dataSourceManager));
        }
        return importers;
    }

    /**
     * Creates the distribution channel (one slot per importer) and attaches it to the
     * dumper and to every importer, together with a listener that refreshes the
     * latest-active timestamp.
     *
     * @param collection importers to wire to the channel
     */
    private void instanceChannel(Collection<Importer> collection) {
        DistributionChannel channel = new DistributionChannel(collection.size(), list -> {
            // An empty batch has no last record to take a position from.
            if (list.isEmpty()) {
                return;
            }
            Record lastRecord = (Record) list.get(list.size() - 1);
            // Placeholder positions are skipped so they never overwrite the tracked position.
            if (lastRecord.getPosition() instanceof PlaceholderPosition) {
                return;
            }
            progress.setPosition(lastRecord.getPosition());
            progress.getIncrementalTaskDelay().setLastEventTimestamps(lastRecord.getCommitTime());
        });
        dumper.setChannel(channel);
        ImporterListener activityListener = records ->
                progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        for (Importer importer : collection) {
            importer.setChannel(channel);
            importer.setImporterListener(activityListener);
        }
    }

    /**
     * Builds the callback handed to the execute engine: success is a no-op, failure
     * is logged and stops the dumper.
     *
     * @return the execute callback
     */
    private ExecuteCallback getExecuteCallback() {
        return new ExecuteCallback() {
            @Override
            public void onSuccess() {
            }

            @Override
            public void onFailure(Throwable th) {
                log.error("get an error when migrating the increment data", th);
                // stop() may already have cleared the field; read it once and null-check.
                Dumper current = IncrementalTask.this.dumper;
                if (null != current) {
                    current.stop();
                }
            }
        };
    }

    /**
     * Blocks until the given future completes.
     *
     * @param future result handle returned by the execute engine
     * @throws ScalingTaskExecuteException wrapping the failure cause if the execution failed
     */
    private void waitForResult(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so callers can observe it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", this.taskId), e2.getCause());
        }
    }

    /**
     * Stops the dumper if one is present and clears the reference, so repeated calls
     * are no-ops.
     */
    @Override
    public void stop() {
        Dumper current = this.dumper;
        if (null != current) {
            current.stop();
            this.dumper = null;
        }
    }

    /**
     * Returns the task id, which is the dumper's data source name.
     *
     * @return the task id
     */
    @Override
    @Generated
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * Returns the live progress object updated by this task.
     *
     * @return the task progress
     */
    @Override
    @Generated
    public IncrementalTaskProgress getProgress() {
        return this.progress;
    }
}
