package org.apache.gobblin.runtime;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/runtime/TaskExecutor.class */
public class TaskExecutor extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
    private final ScheduledExecutorService taskExecutor;
    private final ExecutorService forkExecutor;
    private final long retryIntervalInSeconds;
    private final int queuedTaskTimeMaxSize;
    private final long queuedTaskTimeMaxAge;
    private final Map<String, Long> queuedTasks;
    private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical;
    private long lastCalculationTime;
    private AtomicInteger queuedTaskCount;
    private AtomicInteger currentQueuedTaskCount;
    private AtomicInteger historicalQueuedTaskCount;
    private AtomicLong queuedTaskTotalTime;
    private AtomicLong currentQueuedTaskTotalTime;
    private AtomicLong historicalQueuedTaskTotalTime;
    private final Counter runningTaskCount;
    private final Meter successfulTaskCount;
    private final Meter failedTaskCount;
    private final Timer taskCreateAndRunTimer;
    private final TaskExecutorQueueMetricSet metricSet;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/runtime/TaskExecutor$TaskExecutorQueueMetricSet.class */
    public class TaskExecutorQueueMetricSet implements MetricSet {
        private TaskExecutorQueueMetricSet() {
        }

        public Map<String, Metric> getMetrics() {
            HashMap hashMap = new HashMap();
            hashMap.put(MetricRegistry.name("queued", new String[]{"current", "count"}), new Gauge<Integer>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.1
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Integer m28getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Integer.valueOf(TaskExecutor.this.currentQueuedTaskCount.intValue());
                }
            });
            hashMap.put(MetricRegistry.name("queued", new String[]{"historical", "count"}), new Gauge<Integer>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.2
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Integer m29getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Integer.valueOf(TaskExecutor.this.historicalQueuedTaskCount.intValue());
                }
            });
            hashMap.put(MetricRegistry.name("queued", new String[]{"count"}), new Gauge<Integer>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.3
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Integer m30getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Integer.valueOf(TaskExecutor.this.queuedTaskCount.intValue());
                }
            });
            hashMap.put(MetricRegistry.name("queued", new String[]{"current", "time", "total"}), new Gauge<Long>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.4
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Long m31getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Long.valueOf(TaskExecutor.this.currentQueuedTaskTotalTime.longValue());
                }
            });
            hashMap.put(MetricRegistry.name("queued", new String[]{"historical", "time", "total"}), new Gauge<Long>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.5
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Long m32getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Long.valueOf(TaskExecutor.this.historicalQueuedTaskTotalTime.longValue());
                }
            });
            hashMap.put(MetricRegistry.name("queued", new String[]{"time", "total"}), new Gauge<Long>() { // from class: org.apache.gobblin.runtime.TaskExecutor.TaskExecutorQueueMetricSet.6
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Long m33getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return Long.valueOf(TaskExecutor.this.queuedTaskTotalTime.longValue());
                }
            });
            hashMap.put(MetricRegistry.name("running", new String[]{"count"}), TaskExecutor.this.runningTaskCount);
            hashMap.put(MetricRegistry.name("successful", new String[]{"count"}), TaskExecutor.this.successfulTaskCount);
            hashMap.put(MetricRegistry.name("failed", new String[]{"count"}), TaskExecutor.this.failedTaskCount);
            return Collections.unmodifiableMap(hashMap);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/runtime/TaskExecutor$TrackingTask.class */
    public class TrackingTask implements Runnable {
        private Task underlyingTask;

        public TrackingTask(TaskExecutor taskExecutor, Task task) {
            this(task, 0L, TimeUnit.SECONDS);
        }

        public TrackingTask(Task task, long j, TimeUnit timeUnit) {
            long currentTimeMillis = System.currentTimeMillis();
            long millis = currentTimeMillis + timeUnit.toMillis(j);
            Logger logger = TaskExecutor.LOG;
            Object[] objArr = new Object[2];
            objArr[0] = task.getTaskId();
            objArr[1] = millis <= currentTimeMillis ? "now" : "at " + millis;
            logger.debug(String.format("Task %s queued to run %s.", objArr));
            TaskExecutor.this.queuedTasks.putIfAbsent(task.getTaskId(), Long.valueOf(millis));
            this.underlyingTask = task;
        }

        @Override // java.lang.Runnable
        public void run() {
            onStart(System.currentTimeMillis());
            try {
                try {
                    this.underlyingTask.run();
                    TaskExecutor.this.successfulTaskCount.mark();
                    TaskExecutor.this.runningTaskCount.dec();
                } catch (Exception e) {
                    TaskExecutor.this.failedTaskCount.mark();
                    throw e;
                }
            } catch (Throwable th) {
                TaskExecutor.this.runningTaskCount.dec();
                throw th;
            }
        }

        private void onStart(long j) {
            Long l = (Long) TaskExecutor.this.queuedTasks.remove(this.underlyingTask.getTaskId());
            long propAsLong = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong("workunit.creation.time.in.millis", 0L);
            long longValue = j - l.longValue();
            TaskExecutor.this.taskCreateAndRunTimer.update(j - propAsLong, TimeUnit.MILLISECONDS);
            TaskExecutor.LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", this.underlyingTask.getTaskId(), Long.valueOf(longValue)));
            TaskExecutor.this.queuedTaskTimeHistorical.putIfAbsent(Long.valueOf(System.currentTimeMillis()), Long.valueOf(longValue));
            TaskExecutor.this.runningTaskCount.inc();
        }
    }

    private TaskExecutor(int i, int i2, long j, int i3, long j2, int i4) {
        this.queuedTasks = Maps.newConcurrentMap();
        this.queuedTaskTimeHistorical = new ConcurrentSkipListMap<>();
        this.lastCalculationTime = 0L;
        this.queuedTaskCount = new AtomicInteger();
        this.currentQueuedTaskCount = new AtomicInteger();
        this.historicalQueuedTaskCount = new AtomicInteger();
        this.queuedTaskTotalTime = new AtomicLong();
        this.currentQueuedTaskTotalTime = new AtomicLong();
        this.historicalQueuedTaskTotalTime = new AtomicLong();
        this.runningTaskCount = new Counter();
        this.successfulTaskCount = new Meter();
        this.failedTaskCount = new Meter();
        this.metricSet = new TaskExecutorQueueMetricSet();
        Preconditions.checkArgument(i > 0, "Task executor thread pool size should be positive");
        Preconditions.checkArgument(j > 0, "Task retry interval should be positive");
        Preconditions.checkArgument(i3 > 0, "Queued task time max size should be positive");
        Preconditions.checkArgument(j2 > 0, "Queued task time max age should be positive");
        this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool(i, ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))));
        this.retryIntervalInSeconds = j;
        this.queuedTaskTimeMaxSize = i3;
        this.queuedTaskTimeMaxAge = j2;
        this.taskCreateAndRunTimer = new Timer(new SlidingTimeWindowReservoir(i4, TimeUnit.MINUTES));
        this.forkExecutor = ExecutorsUtils.loggingDecorator(new ThreadPoolExecutor(i, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))));
    }

    public TaskExecutor(Properties properties) {
        this(Integer.parseInt(properties.getProperty("taskexecutor.threadpool.size", Integer.toString(2))), Integer.parseInt(properties.getProperty("taskretry.threadpool.coresize", Integer.toString(1))), Long.parseLong(properties.getProperty("task.retry.intervalinsec", Long.toString(300L))), Integer.parseInt(properties.getProperty("taskexecutor.queued_task_time.history.max_size", Integer.toString(2048))), Long.parseLong(properties.getProperty("taskexecutor.queued_task_time.history.max_age", Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))), Integer.parseInt(properties.getProperty("metrics.timer.window.size.in.minutes", Integer.toString(15))));
    }

    public TaskExecutor(Configuration configuration) {
        this(configuration.getInt("taskexecutor.threadpool.size", 2), configuration.getInt("taskretry.threadpool.coresize", 1), configuration.getLong("task.retry.intervalinsec", 300L), configuration.getInt("taskexecutor.queued_task_time.history.max_size", 2048), configuration.getLong("taskexecutor.queued_task_time.history.max_age", ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE), configuration.getInt("metrics.timer.window.size.in.minutes", 15));
    }

    protected void startUp() throws Exception {
        LOG.info("Starting the task executor");
        if (this.taskExecutor.isShutdown() || this.taskExecutor.isTerminated()) {
            throw new IllegalStateException("Task thread pool executor is shutdown or terminated");
        }
        if (this.forkExecutor.isShutdown() || this.forkExecutor.isTerminated()) {
            throw new IllegalStateException("Fork thread pool executor is shutdown or terminated");
        }
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping the task executor");
        try {
            ExecutorsUtils.shutdownExecutorService(this.taskExecutor, Optional.of(LOG));
        } finally {
            ExecutorsUtils.shutdownExecutorService(this.forkExecutor, Optional.of(LOG));
        }
    }

    public void execute(Task task) {
        LOG.info(String.format("Executing task %s", task.getTaskId()));
        this.taskExecutor.execute(new TrackingTask(this, task));
    }

    public Future<?> submit(Task task) {
        LOG.info(String.format("Submitting task %s", task.getTaskId()));
        return this.taskExecutor.submit(new TrackingTask(this, task));
    }

    public void execute(Fork fork) {
        LOG.info(String.format("Executing fork %d of task %s", Integer.valueOf(fork.getIndex()), fork.getTaskId()));
        this.forkExecutor.execute(fork);
    }

    public Future<?> submit(Fork fork) {
        LOG.info(String.format("Submitting fork %d of task %s", Integer.valueOf(fork.getIndex()), fork.getTaskId()));
        return this.forkExecutor.submit(fork);
    }

    public void retry(Task task) {
        if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit()) && task.getTaskState().contains("fork.branches")) {
            task.getTaskState().adjustJobMetricsOnRetry(task.getTaskState().getPropAsInt("fork.branches"));
        }
        long retryCount = task.getRetryCount() * this.retryIntervalInSeconds;
        this.taskExecutor.schedule(new TrackingTask(task, retryCount, TimeUnit.SECONDS), retryCount, TimeUnit.SECONDS);
        LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), Long.valueOf(retryCount)));
        task.incrementRetryCount();
    }

    public MetricSet getTaskExecutorQueueMetricSet() {
        return this.metricSet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void calculateMetrics() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastCalculationTime >= currentTimeMillis - TimeUnit.SECONDS.toMillis(10L)) {
            LOG.debug("Skipped metric calculation because not enough time has elapsed since the last calculation.");
            return;
        }
        LOG.debug("Starting metric calculation.");
        int i = 0;
        int i2 = 0;
        long j = 0;
        for (Map.Entry<String, Long> entry : this.queuedTasks.entrySet()) {
            if (entry.getValue().longValue() <= currentTimeMillis) {
                i++;
                long longValue = currentTimeMillis - entry.getValue().longValue();
                j += longValue;
                LOG.debug(String.format("Task %s has been waiting in the queue for %d ms.", entry.getKey(), Long.valueOf(longValue)));
            } else {
                i2++;
            }
        }
        if (i2 > 0) {
            LOG.debug(String.format("%d tasks were ignored during metric calculations because they are scheduled to run in the future.", Integer.valueOf(i2)));
        }
        this.currentQueuedTaskCount.set(i);
        this.currentQueuedTaskTotalTime.set(j);
        LOG.debug(String.format("%d current tasks have been waiting for a total of %d ms.", Integer.valueOf(i), Long.valueOf(j)));
        int i3 = 0;
        long j2 = 0;
        long j3 = currentTimeMillis - this.queuedTaskTimeMaxAge;
        Iterator it = this.queuedTaskTimeHistorical.descendingMap().entrySet().iterator();
        while (it.hasNext()) {
            try {
                Map.Entry entry2 = (Map.Entry) it.next();
                if (((Long) entry2.getKey()).longValue() < j3 || i3 >= this.queuedTaskTimeMaxSize) {
                    LOG.debug(String.format("Task started at %d is before the cutoff of %d and is being removed. Queue time %d will be removed from metric calculations.", entry2.getKey(), Long.valueOf(j3), entry2.getValue()));
                    it.remove();
                } else {
                    i3++;
                    j2 += ((Long) entry2.getValue()).longValue();
                    LOG.debug(String.format("Task started at %d is after cutoff. Queue time %d will be used in metric calculations.", entry2.getKey(), entry2.getValue()));
                }
            } catch (NoSuchElementException e) {
                LOG.warn("Ran out of items in historical task queue time set.");
            }
        }
        this.historicalQueuedTaskCount.set(i3);
        this.historicalQueuedTaskTotalTime.set(j2);
        LOG.debug(String.format("%d historical tasks have been waiting for a total of %d ms.", Integer.valueOf(i3), Long.valueOf(j2)));
        int i4 = i + i3;
        long j4 = j + j2;
        this.queuedTaskCount.set(i4);
        this.queuedTaskTotalTime.set(j4);
        LOG.debug(String.format("%d tasks have been waiting for a total of %d ms.", Integer.valueOf(i4), Long.valueOf(j4)));
        this.lastCalculationTime = currentTimeMillis;
        LOG.debug("Finished metric calculation.");
    }

    public ExecutorService getForkExecutor() {
        return this.forkExecutor;
    }

    public long getRetryIntervalInSeconds() {
        return this.retryIntervalInSeconds;
    }

    public int getQueuedTaskTimeMaxSize() {
        return this.queuedTaskTimeMaxSize;
    }

    public long getQueuedTaskTimeMaxAge() {
        return this.queuedTaskTimeMaxAge;
    }

    public AtomicInteger getQueuedTaskCount() {
        return this.queuedTaskCount;
    }

    public AtomicInteger getCurrentQueuedTaskCount() {
        return this.currentQueuedTaskCount;
    }

    public AtomicInteger getHistoricalQueuedTaskCount() {
        return this.historicalQueuedTaskCount;
    }

    public AtomicLong getQueuedTaskTotalTime() {
        return this.queuedTaskTotalTime;
    }

    public AtomicLong getCurrentQueuedTaskTotalTime() {
        return this.currentQueuedTaskTotalTime;
    }

    public AtomicLong getHistoricalQueuedTaskTotalTime() {
        return this.historicalQueuedTaskTotalTime;
    }

    public Counter getRunningTaskCount() {
        return this.runningTaskCount;
    }

    public Meter getSuccessfulTaskCount() {
        return this.successfulTaskCount;
    }

    public Meter getFailedTaskCount() {
        return this.failedTaskCount;
    }

    public Timer getTaskCreateAndRunTimer() {
        return this.taskCreateAndRunTimer;
    }
}
