package gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import gobblin.configuration.ConfigurationKeys;
import gobblin.metrics.GobblinMetrics;
import gobblin.runtime.fork.Fork;
import gobblin.util.ExecutorsUtils;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/TaskExecutor.class */
public class TaskExecutor extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TaskExecutor.class);
    private final ScheduledExecutorService taskExecutor;
    private final ExecutorService forkExecutor;
    private final long retryIntervalInSeconds;

    private TaskExecutor(int i, int i2, long j) {
        Preconditions.checkArgument(i > 0, "Task executor thread pool size should be positive");
        Preconditions.checkArgument(j > 0, "Task retry interval should be positive");
        this.taskExecutor = Executors.newScheduledThreadPool(i, ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
        this.retryIntervalInSeconds = j;
        this.forkExecutor = new ThreadPoolExecutor(i, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d")));
    }

    public TaskExecutor(Properties properties) {
        this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY, Integer.toString(2))), Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY, Integer.toString(1))), Long.parseLong(properties.getProperty(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY, Long.toString(300L))));
    }

    public TaskExecutor(Configuration configuration) {
        this(configuration.getInt(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY, 2), configuration.getInt(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY, 1), configuration.getLong(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY, 300L));
    }

    @Override // com.google.common.util.concurrent.AbstractIdleService
    protected void startUp() throws Exception {
        LOG.info("Starting the task executor");
        if (this.taskExecutor.isShutdown() || this.taskExecutor.isTerminated()) {
            throw new IllegalStateException("Task thread pool executor is shutdown or terminated");
        }
        if (this.forkExecutor.isShutdown() || this.forkExecutor.isTerminated()) {
            throw new IllegalStateException("Fork thread pool executor is shutdown or terminated");
        }
    }

    @Override // com.google.common.util.concurrent.AbstractIdleService
    protected void shutDown() throws Exception {
        LOG.info("Stopping the task executor");
        try {
            ExecutorsUtils.shutdownExecutorService(this.taskExecutor, Optional.of(LOG));
        } finally {
            ExecutorsUtils.shutdownExecutorService(this.forkExecutor, Optional.of(LOG));
        }
    }

    public void execute(Task task) {
        LOG.info(String.format("Executing task %s", task.getTaskId()));
        this.taskExecutor.execute(task);
    }

    public Future<?> submit(Task task) {
        LOG.info(String.format("Submitting task %s", task.getTaskId()));
        return this.taskExecutor.submit(task);
    }

    public void execute(Fork fork) {
        LOG.info(String.format("Executing fork %d of task %s", Integer.valueOf(fork.getIndex()), fork.getTaskId()));
        this.forkExecutor.execute(fork);
    }

    public Future<?> submit(Fork fork) {
        LOG.info(String.format("Submitting fork %d of task %s", Integer.valueOf(fork.getIndex()), fork.getTaskId()));
        return this.forkExecutor.submit(fork);
    }

    public void retry(Task task) {
        if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit()) && task.getTaskState().contains(ConfigurationKeys.FORK_BRANCHES_KEY)) {
            task.getTaskState().adjustJobMetricsOnRetry(task.getTaskState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY));
        }
        long retryCount = task.getRetryCount() * this.retryIntervalInSeconds;
        this.taskExecutor.schedule(task, retryCount, TimeUnit.SECONDS);
        LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), Long.valueOf(retryCount)));
        task.incrementRetryCount();
    }

    public ExecutorService getForkExecutor() {
        return this.forkExecutor;
    }
}
