/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.task.service;

import com.google.common.base.Preconditions;
import java.util.concurrent.ExecutorService;
import org.apache.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import org.apache.geaflow.cluster.task.runner.ITaskRunner;
import org.apache.geaflow.cluster.task.service.ITaskService;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.thread.Executors;
import org.apache.geaflow.common.utils.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTaskService<TASK, R extends ITaskRunner<TASK>>
implements ITaskService<TASK> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTaskService.class);
    protected ExecutorService executorService;
    private R[] tasks;
    private String threadFormat;
    protected final Configuration configuration;

    public AbstractTaskService(Configuration configuration, String threadFormat) {
        this.threadFormat = threadFormat;
        this.configuration = configuration;
    }

    @Override
    public void start() {
        this.tasks = this.buildTaskRunner();
        Preconditions.checkArgument((this.tasks != null && this.tasks.length != 0 ? 1 : 0) != 0, (Object)"must specify at least one task");
        this.executorService = Executors.getExecutorService((int)this.getMaxMultiple(), (int)this.tasks.length, (String)this.threadFormat, (Thread.UncaughtExceptionHandler)ComponentUncaughtExceptionHandler.INSTANCE);
        for (int i = 0; i < this.tasks.length; ++i) {
            this.executorService.execute((Runnable)this.tasks[i]);
        }
    }

    protected int getMaxMultiple() {
        return this.configuration.getInteger(ExecutionConfigKeys.EXECUTOR_MAX_MULTIPLE);
    }

    @Override
    public void process(int workerId, TASK task) {
        this.tasks[workerId].add(task);
    }

    @Override
    public void interrupt(int workerId) {
        this.tasks[workerId].interrupt();
    }

    @Override
    public void shutdown() {
        LOGGER.info("shutdown executor service {}", (Object)this.threadFormat);
        for (int i = 0; i < this.tasks.length; ++i) {
            this.tasks[i].shutdown();
        }
        ExecutorUtil.shutdown((ExecutorService)this.executorService);
    }

    public R getRunner(int workerId) {
        return this.tasks[workerId];
    }

    protected abstract R[] buildTaskRunner();
}

