/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.framework.toolkit.elasticjob.base;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.lite.lifecycle.domain.JobSettings;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import io.polaris.framework.toolkit.elasticjob.base.BasePooledJob;
import io.polaris.framework.toolkit.elasticjob.base.PooledJobExecutor;
import io.polaris.framework.toolkit.elasticjob.context.JobCtx;
import io.polaris.framework.toolkit.elasticjob.context.JobCtxHolder;
import io.polaris.framework.toolkit.elasticjob.err.JobSkipException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for dataflow jobs whose items are handed to a {@link PooledJobExecutor}.
 *
 * <p>One executor is kept per (job name, sharding total count, sharding item) key. It is
 * created on first use by {@link #fetchData} / {@link #processData} and released through
 * {@link #doAfter} when no more data is available, when the job is no longer eligible to
 * keep fetching, or when a step fails.
 *
 * @param <T> type parameter forwarded to {@link BasePooledJob} and used as the element type
 *            of the list returned by {@link #fetchData}
 */
public abstract class BaseDataFlowPooledJob<T> extends BasePooledJob<T> implements DataflowJob {
    private static final Logger log = LoggerFactory.getLogger(BaseDataFlowPooledJob.class);

    /** Live executors, keyed by {@link #toKey(ShardingContext)}. */
    private final Map<String, PooledJobExecutor> jobExecutors = new ConcurrentHashMap<>();

    /**
     * Builds the registry key for a shard: {@code jobName/shardingTotalCount/shardingItem}.
     */
    private String toKey(ShardingContext shardingContext) {
        return shardingContext.getJobName()
                + "/" + shardingContext.getShardingTotalCount()
                + "/" + shardingContext.getShardingItem();
    }

    /**
     * Returns the executor registered for the given shard, creating and initialising one
     * when none is registered yet.
     *
     * <p>A new executor is registered only after {@code doBefore}, the statistics callback
     * wiring and {@code doInit()} have all succeeded. On failure the error is logged,
     * {@link #doAfter} is invoked for the half-initialised executor and the original
     * exception is rethrown.
     *
     * <p>NOTE(review): the lookup and the registration are two separate map operations, so
     * two concurrent callers using the same key could each build an executor. Confirm that
     * the scheduler never runs the same shard concurrently.
     *
     * @throws RuntimeException whatever the initialisation steps threw
     */
    private PooledJobExecutor getOrInit(ShardingContext shardingContext) {
        String key = this.toKey(shardingContext);
        PooledJobExecutor existing = this.jobExecutors.get(key);
        if (existing != null) {
            return existing;
        }
        PooledJobExecutor created = new PooledJobExecutor(shardingContext);
        try {
            this.doBefore(created, shardingContext);
            created.setStatisticsPersistedDao(this::saveJobStatistics);
            created.doInit();
            this.jobExecutors.put(key, created);
        } catch (RuntimeException e) {
            // Message text: "[job:<name>, shard:<item>] exception while initialising job resources: <msg>"
            String message = "[\u4f5c\u4e1a:" + shardingContext.getJobName() + ", \u5206\u7247:" + shardingContext.getShardingItem() + "] \u521d\u59cb\u5316\u4f5c\u4e1a\u8d44\u6e90\u65f6\u53d1\u751f\u5f02\u5e38: " + e.getMessage();
            log.error(message, (Throwable) e);
            this.cleanUpQuietly(created, shardingContext, e);
            throw e;
        }
        return created;
    }

    /**
     * Closes the executor and drops it from the registry.
     *
     * <p>The registry entry is removed even when {@code close()} throws, so an executor
     * whose shutdown failed is never handed out again by {@link #getOrInit}. The key is
     * derived from the executor's own sharding context, not from the
     * {@code shardingContext} argument.
     */
    @Override
    protected void doAfter(PooledJobExecutor jobExecutor, ShardingContext shardingContext) {
        try {
            jobExecutor.close();
        } finally {
            this.jobExecutors.remove(this.toKey(jobExecutor.getShardingContext()));
        }
    }

    /**
     * Runs {@link #doAfter} on a failure path without letting a cleanup error replace the
     * failure that is about to be rethrown; the cleanup error is attached as suppressed.
     */
    private void cleanUpQuietly(PooledJobExecutor jobExecutor, ShardingContext shardingContext, RuntimeException failure) {
        try {
            this.doAfter(jobExecutor, shardingContext);
        } catch (RuntimeException cleanupFailure) {
            // Throwable.addSuppressed rejects self-suppression, hence the identity check.
            if (cleanupFailure != failure) {
                failure.addSuppressed(cleanupFailure);
            }
        }
    }

    /**
     * Logs that all pending data has been handled, signals a zero-sized fetch to the
     * executor and releases it through {@link #doAfter}.
     */
    private void completeAndRelease(PooledJobExecutor jobExecutor, ShardingContext shardingContext) {
        // Message text: "[job:{}, shard:{}] all pending data has been processed."
        log.info("[\u4f5c\u4e1a:{}, \u5206\u7247:{}] \u5f85\u5904\u7406\u6570\u636e\u5168\u90e8\u5904\u7406\u5b8c\u6210.", (Object) shardingContext.getJobName(), (Object) shardingContext.getShardingItem());
        jobExecutor.notifyFetched(0);
        this.doAfter(jobExecutor, shardingContext);
    }

    /**
     * Fetches the next batch for the shard by delegating to {@code doFetchData}.
     *
     * <p>A {@link JobSkipException} is logged and treated like an empty batch. Any other
     * {@link RuntimeException} is logged together with its stack trace, the executor is
     * released and the exception is rethrown. When the batch is {@code null} or empty the
     * executor is notified with a fetch size of zero and released.
     *
     * @param shardingContext the shard being executed
     * @return the fetched batch; may be {@code null} or empty when nothing is left
     * @throws RuntimeException if fetching failed for a reason other than a skip
     */
    public final List<T> fetchData(ShardingContext shardingContext) {
        PooledJobExecutor jobExecutor = this.getOrInit(shardingContext);
        // Raw on purpose: the declared return type of doFetchData is not visible from this class.
        List list = null;
        try {
            list = this.doFetchData(jobExecutor, shardingContext);
        } catch (JobSkipException e) {
            log.warn("[\u4f5c\u4e1a:{}, \u5206\u7247:{}] {}", new Object[]{shardingContext.getJobName(), shardingContext.getShardingItem(), e.getMessage()});
        } catch (RuntimeException e) {
            // Message text: "[job:{}, shard:{}] exception while fetching pending data."
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            log.warn("[\u4f5c\u4e1a:{}, \u5206\u7247:{}] \u83b7\u53d6\u5f85\u5904\u7406\u6570\u636e\u65f6\u53d1\u751f\u5f02\u5e38.", shardingContext.getJobName(), shardingContext.getShardingItem(), e);
            this.cleanUpQuietly(jobExecutor, shardingContext, e);
            throw e;
        }
        if (list == null || list.isEmpty()) {
            this.completeAndRelease(jobExecutor, shardingContext);
        }
        return list;
    }

    /**
     * Submits every element of the batch to the shard's executor.
     *
     * <p>For a non-empty batch the executor is notified of the batch size and one task
     * calling {@code doProcessData} is submitted per element. If
     * {@link #isEligibleForJobRunning} then reports {@code false}, the executor is
     * notified with a fetch size of zero and released. A {@code null} or empty batch
     * releases the executor straight away.
     *
     * <p>A {@link JobSkipException} is logged and ends the run for this shard without
     * propagating. Any other {@link RuntimeException} is logged, the executor is released
     * and the exception is rethrown.
     *
     * @param shardingContext the shard being executed
     * @param list            the batch previously returned by {@link #fetchData}
     * @throws RuntimeException if submitting or finishing failed for a reason other than a skip
     */
    public final void processData(ShardingContext shardingContext, List list) {
        PooledJobExecutor jobExecutor = this.getOrInit(shardingContext);
        try {
            if (list == null || list.isEmpty()) {
                this.completeAndRelease(jobExecutor, shardingContext);
                return;
            }
            jobExecutor.notifyFetched(list.size());
            for (Object data : list) {
                jobExecutor.execute(() -> this.doProcessData(jobExecutor, shardingContext, data));
            }
            if (!this.isEligibleForJobRunning(shardingContext.getJobName())) {
                // Message text: "[job:{}, shard:{}] fetch condition not met, no further fetching."
                log.info("[\u4f5c\u4e1a:{}, \u5206\u7247:{}] \u4e0d\u6ee1\u8db3\u53d6\u6570\u6761\u4ef6, \u4e0d\u518d\u7ee7\u7eed\u53d6\u6570.", (Object) shardingContext.getJobName(), (Object) shardingContext.getShardingItem());
                jobExecutor.notifyFetched(0);
                this.doAfter(jobExecutor, shardingContext);
            }
        } catch (JobSkipException e) {
            log.warn("[\u4f5c\u4e1a:{}, \u5206\u7247:{}] {}", new Object[]{shardingContext.getJobName(), shardingContext.getShardingItem(), e.getMessage()});
            jobExecutor.notifyFetched(0);
            this.doAfter(jobExecutor, shardingContext);
        } catch (RuntimeException e) {
            // Message text: "[job:<name>, shard:<item>] exception while processing data: <msg>"
            String message = "[\u4f5c\u4e1a:" + shardingContext.getJobName() + ", \u5206\u7247:" + shardingContext.getShardingItem() + "] \u5904\u7406\u6570\u636e\u65f6\u53d1\u751f\u5f02\u5e38: " + e.getMessage();
            log.error(message, (Throwable) e);
            this.cleanUpQuietly(jobExecutor, shardingContext, e);
            throw e;
        }
    }

    /**
     * Decides whether the job should keep fetching after the current batch.
     *
     * <p>Returns {@code true} when no {@link JobCtx} is available from
     * {@link JobCtxHolder}. Otherwise returns {@code false} when the job's settings do not
     * enable streaming process, or when the job's {@link ShardingService} reports that
     * sharding is needed; in every remaining case it returns {@code true}.
     *
     * @param jobName name of the job whose settings and sharding state are inspected
     * @return whether another fetch round should follow
     */
    protected boolean isEligibleForJobRunning(String jobName) {
        JobCtx jobCtx = JobCtxHolder.get();
        if (jobCtx == null) {
            return true;
        }
        JobSettings jobSettings = jobCtx.getJobApi().getJobSettingsApi().getJobSettings(jobName);
        if (!jobSettings.isStreamingProcess()) {
            return false;
        }
        CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
        ShardingService shardingService = new ShardingService(regCenter, jobName);
        return !shardingService.isNeedSharding();
    }
}

