/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.joda.time.Duration;

public class TaskQueue {
    /** Maximum time, in nanoseconds, the management loop waits on {@link #managementMayBeNecessary} per iteration. */
    private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60L);
    /** Tasks currently tracked by this queue. Mutated under {@link #giant} by the add/remove helpers. */
    private final List<Task> tasks = new ArrayList<Task>();
    /** Result futures, keyed by task id, for tasks that have been handed to (or discovered in) the task runner. */
    private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<String, ListenableFuture<TaskStatus>>();
    private final TaskQueueConfig config;
    private final TaskStorage taskStorage;
    private final TaskRunner taskRunner;
    private final TaskActionClientFactory taskActionClientFactory;
    private final TaskLockbox taskLockbox;
    private final ServiceEmitter emitter;
    /** Fair lock taken by the lifecycle, management, sync, add, shutdown and notify paths of this class. */
    private final ReentrantLock giant = new ReentrantLock(true);
    /** Signalled whenever the task list or a task status changes, to wake the management loop early. */
    private final Condition managementMayBeNecessary = this.giant.newCondition();
    /** Single non-daemon thread ("TaskQueue-Manager") that runs the management loop. */
    private final ExecutorService managerExec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-Manager").build());
    /** Single non-daemon thread ("TaskQueue-StorageSync") that periodically re-syncs the task list from storage. */
    private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-StorageSync").build());
    /** True between {@link #start()} and {@link #stop()}. */
    private volatile boolean active = false;
    private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
    /** Running totals of successful tasks, keyed by datasource. */
    private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
    /** Running totals of failed tasks, keyed by datasource. */
    private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
    /** Totals observed by the previous call to {@link #getSuccessfulTaskCount()}; used to compute deltas. */
    private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<String, Long>();
    /** Totals observed by the previous call to {@link #getFailedTaskCount()}; used to compute deltas. */
    private Map<String, Long> prevTotalFailedTaskCount = new HashMap<String, Long>();

    /**
     * Creates a task queue wired to its collaborators. The queue is inactive until {@link #start()} is called.
     *
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public TaskQueue(TaskQueueConfig config, TaskStorage taskStorage, TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, TaskLockbox taskLockbox, ServiceEmitter emitter) {
        this.config = Preconditions.checkNotNull(config, "config");
        this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");
        this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner");
        this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");
        this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox");
        this.emitter = Preconditions.checkNotNull(emitter, "emitter");
    }

    /**
     * Starts this queue: marks it active, performs an initial sync from task storage, launches the
     * management loop on {@code managerExec} and schedules periodic storage syncs on {@code storageSyncExec}.
     *
     * @throws IllegalStateException if the queue is already active
     */
    @LifecycleStart
    public void start() {
        this.giant.lock();
        try {
            Preconditions.checkState(!this.active, "queue must be stopped");
            this.active = true;
            this.syncFromStorage();
            this.managerExec.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            TaskQueue.this.manage();
                            // manage() only returns normally once the queue is no longer active,
                            // so there is nothing left to do on this thread.
                            break;
                        }
                        catch (InterruptedException e) {
                            // stop() interrupts this thread via shutdownNow(); exit instead of re-entering manage().
                            log.info("Interrupted, exiting!");
                            break;
                        }
                        catch (Exception e) {
                            // Unexpected failure: alert, wait for the configured restart delay, then retry.
                            final long restartDelay = TaskQueue.this.config.getRestartDelay().getMillis();
                            log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
                            try {
                                Thread.sleep(restartDelay);
                            }
                            catch (InterruptedException e2) {
                                log.info("Interrupted, exiting!");
                                break;
                            }
                        }
                    }
                }
            });
            ScheduledExecutors.scheduleAtFixedRate(this.storageSyncExec, this.config.getStorageSyncRate(), new Callable<ScheduledExecutors.Signal>() {
                @Override
                public ScheduledExecutors.Signal call() {
                    try {
                        TaskQueue.this.syncFromStorage();
                    }
                    catch (Exception e) {
                        // Failures after the queue has been stopped are not alerted on.
                        if (TaskQueue.this.active) {
                            log.makeAlert(e, "Failed to sync with storage").emit();
                        }
                    }
                    // Keep repeating only while the queue is active.
                    return TaskQueue.this.active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
                }
            });
            this.managementMayBeNecessary.signalAll();
        }
        finally {
            this.giant.unlock();
        }
    }

    /**
     * Stops this queue: forgets all tracked tasks and futures, marks the queue inactive, shuts down both
     * executors with {@code shutdownNow()} and wakes anything waiting on the management condition.
     */
    @LifecycleStop
    public void stop() {
        giant.lock();
        try {
            // Drop in-memory state first; storage is not touched here.
            tasks.clear();
            taskFutures.clear();

            active = false;

            managerExec.shutdownNow();
            storageSyncExec.shutdownNow();

            managementMayBeNecessary.signalAll();
        }
        finally {
            giant.unlock();
        }
    }

    /**
     * @return {@code true} if {@link #start()} has been called and {@link #stop()} has not been called since
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Main management loop. Sleeps for the configured start delay, asks the task runner to restore, and then,
     * while the queue is active, repeatedly (under {@code giant}):
     * <ol>
     *   <li>collects the futures of every task the runner knows about;</li>
     *   <li>for each queued task without a tracked future, either adopts the runner's existing future or,
     *       if the task reports ready, submits it to the runner;</li>
     *   <li>asks the runner to shut down any task it knows about that is not in this queue;</li>
     *   <li>waits on {@code managementMayBeNecessary} for up to {@code MANAGEMENT_WAIT_TIMEOUT_NANOS}.</li>
     * </ol>
     *
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     */
    private void manage() throws InterruptedException {
        log.info("Beginning management in %s.", config.getStartDelay());
        Thread.sleep(config.getStartDelay().getMillis());

        // The return value is ignored; known tasks are re-read from the runner on every iteration below.
        taskRunner.restore();

        while (active) {
            giant.lock();
            try {
                // Snapshot of everything the runner currently knows about.
                final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
                for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
                    runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
                }

                // Iterate over a copy: notifyStatus() may remove entries from the live list.
                for (final Task task : ImmutableList.copyOf(tasks)) {
                    if (taskFutures.containsKey(task.getId())) {
                        // Already tracked.
                        continue;
                    }

                    final ListenableFuture<TaskStatus> runnerTaskFuture;
                    if (runnerTaskFutures.containsKey(task.getId())) {
                        // The runner already has this task; adopt its future.
                        runnerTaskFuture = runnerTaskFutures.get(task.getId());
                    } else {
                        final boolean taskIsReady;
                        try {
                            taskIsReady = task.isReady(taskActionClientFactory.create(task));
                        }
                        catch (Exception e) {
                            log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
                            notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
                            continue;
                        }
                        if (!taskIsReady) {
                            // Not ready yet; it will be re-examined on a later iteration.
                            continue;
                        }
                        log.info("Asking taskRunner to run: %s", task.getId());
                        runnerTaskFuture = taskRunner.run(task);
                    }
                    taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
                }

                // Anything the runner knows about that is not in our queue should be cleaned up.
                final List<String> queuedIdView = Lists.transform(tasks, Task::getId);
                final Set<String> queuedTaskIds = ImmutableSet.copyOf(queuedIdView);
                final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), queuedTaskIds);
                if (!tasksToKill.isEmpty()) {
                    log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
                    for (final String taskId : tasksToKill) {
                        try {
                            taskRunner.shutdown(taskId, "task is not in runnerTaskFutures[%s]", runnerTaskFutures.keySet());
                        }
                        catch (Exception e) {
                            log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
                        }
                    }
                }

                // Timed wait: wakes on a signal, or after the timeout even when nothing signalled.
                managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
            }
            finally {
                giant.unlock();
            }
        }
    }

    /**
     * Adds a new task to storage and to this queue, then signals the management loop.
     *
     * @param task the task to add; must not be null
     * @return {@code true} when the task was inserted
     * @throws NullPointerException  if {@code task} is null
     * @throws EntryExistsException  if task storage already contains a task with the same id
     * @throws IllegalStateException if the queue is not active or already holds the configured maximum
     *                               number of tasks
     */
    public boolean add(Task task) throws EntryExistsException {
        // Validate before the first dereference so a null task fails with a descriptive message.
        Preconditions.checkNotNull(task, "task");
        if (this.taskStorage.getTask(task.getId()).isPresent()) {
            throw new EntryExistsException(StringUtils.format("Task %s is already exists", task.getId()));
        }
        this.giant.lock();
        try {
            Preconditions.checkState(this.active, "Queue is not active!");
            final int maxSize = this.config.getMaxSize();
            if (this.tasks.size() >= maxSize) {
                // Guava's Preconditions templates only substitute %s, so format the %,d message explicitly.
                throw new IllegalStateException(StringUtils.format("Too many tasks (max = %,d)", maxSize));
            }
            // If the insert throws, the task must not enter the in-memory queue, so the exception propagates.
            this.taskStorage.insert(task, TaskStatus.running(task.getId()));
            this.addTaskInternal(task);
            this.managementMayBeNecessary.signalAll();
            return true;
        }
        finally {
            this.giant.unlock();
        }
    }

    /**
     * Registers {@code task} with the in-memory task list and then with the task lockbox.
     * Does not touch task storage.
     */
    private void addTaskInternal(Task task) {
        tasks.add(task);
        taskLockbox.add(task);
    }

    /**
     * Removes {@code task} from the task lockbox and then from the in-memory task list.
     * Does not touch task storage.
     */
    private void removeTaskInternal(Task task) {
        taskLockbox.remove(task);
        tasks.remove(task);
    }

    /**
     * Shuts down the queued task with the given id, if there is one, by notifying a failure status for it.
     * Does nothing when no queued task has that id.
     *
     * @param taskId       id of the task to shut down; must not be null
     * @param reasonFormat format string describing why the task is being shut down
     * @param args         arguments for {@code reasonFormat}
     */
    public void shutdown(String taskId, String reasonFormat, Object ... args) {
        giant.lock();
        try {
            Preconditions.checkNotNull(taskId, "taskId");

            // Locate the first queued task with this id; only that one is notified.
            Task target = null;
            for (final Task candidate : tasks) {
                if (candidate.getId().equals(taskId)) {
                    target = candidate;
                    break;
                }
            }

            if (target != null) {
                notifyStatus(target, TaskStatus.failure(taskId), reasonFormat, args);
            }
        }
        finally {
            giant.unlock();
        }
    }

    /**
     * Records a status for a task: asks the task runner to shut the task down, removes the task from
     * this queue and drops its tracked future. If the task was in the queue and storage still reports it
     * as runnable, the new status is persisted and the management loop is signalled.
     *
     * @param task         the task whose status changed; must not be null
     * @param taskStatus   the new status; its id must match {@code task}'s id
     * @param reasonFormat format string passed to the runner's shutdown call
     * @param args         arguments for {@code reasonFormat}
     * @throws NullPointerException     if {@code task} or {@code taskStatus} is null
     * @throws IllegalStateException    if the queue is not active
     * @throws IllegalArgumentException if the task and status ids differ
     */
    private void notifyStatus(Task task, TaskStatus taskStatus, String reasonFormat, Object ... args) {
        giant.lock();
        try {
            Preconditions.checkNotNull(task, "task");
            Preconditions.checkNotNull(taskStatus, "status");
            Preconditions.checkState(active, "Queue is not active!");
            Preconditions.checkArgument(task.getId().equals(taskStatus.getId()), "Mismatching task ids[%s/%s]", task.getId(), taskStatus.getId());

            // Tell the runner it may shut the task down; a failure here is logged and otherwise ignored.
            try {
                taskRunner.shutdown(task.getId(), reasonFormat, args);
            }
            catch (Exception e) {
                log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
            }

            // Scan from the back of the list and remove the first match found.
            int removed = 0;
            int index = tasks.size() - 1;
            while (index >= 0) {
                if (tasks.get(index).getId().equals(task.getId())) {
                    ++removed;
                    removeTaskInternal(tasks.get(index));
                    break;
                }
                --index;
            }

            if (removed == 0) {
                log.warn("Unknown task completed: %s", task.getId());
            } else if (removed > 1) {
                log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit();
            }

            // Drop any tracked future regardless of whether the task was found.
            taskFutures.remove(task.getId());

            if (removed > 0) {
                // The task was in our queue, so persist its status unless storage says it already finished.
                try {
                    final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
                    if (previousStatus.isPresent() && previousStatus.get().isRunnable()) {
                        taskStorage.setStatus(taskStatus);
                        log.info("Task done: %s", task);
                        managementMayBeNecessary.signalAll();
                    } else {
                        log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
                    }
                }
                catch (Exception e) {
                    log.makeAlert(e, "Failed to persist status for task").addData("task", task.getId()).addData("statusCode", taskStatus.getStatusCode()).emit();
                }
            }
        }
        finally {
            giant.unlock();
        }
    }

    /**
     * Attaches a completion callback to a task's status future. When the future resolves (or fails, which
     * is treated as a failure status), the callback notifies this queue of the status and, for complete
     * statuses, emits a {@code task/run/time} metric and bumps the per-datasource success/failure counter.
     *
     * @param task         the task the future belongs to
     * @param statusFuture the future produced by the task runner
     * @return the same {@code statusFuture} that was passed in
     */
    private ListenableFuture<TaskStatus> attachCallbacks(final Task task, ListenableFuture<TaskStatus> statusFuture) {
        final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
        IndexTaskUtils.setTaskDimensions(metricBuilder, task);

        final FutureCallback<TaskStatus> callback = new FutureCallback<TaskStatus>() {
            @Override
            public void onSuccess(TaskStatus status) {
                log.info("Received %s status for task: %s", status.getStatusCode(), status.getId());
                handleStatus(status);
            }

            @Override
            public void onFailure(Throwable t) {
                log.makeAlert(t, "Failed to run task")
                   .addData("task", task.getId())
                   .addData("type", task.getType())
                   .addData("dataSource", task.getDataSource())
                   .emit();
                handleStatus(TaskStatus.failure(task.getId()));
            }

            private void handleStatus(TaskStatus status) {
                try {
                    // Once the queue has been stopped, status changes are no longer acted upon.
                    if (!TaskQueue.this.active) {
                        log.info("Abandoning task due to shutdown: %s", task.getId());
                        return;
                    }

                    TaskQueue.this.notifyStatus(task, status, "notified status change from task");

                    if (!status.isComplete()) {
                        return;
                    }

                    // The task is done: emit its run time and count it as a success or a failure.
                    IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
                    TaskQueue.this.emitter.emit(metricBuilder.build("task/run/time", status.getDuration()));
                    log.info("Task %s: %s (%d run duration)", status.getStatusCode(), task, status.getDuration());

                    final ConcurrentHashMap<String, AtomicLong> counters = status.isSuccess()
                        ? TaskQueue.this.totalSuccessfulTaskCount
                        : TaskQueue.this.totalFailedTaskCount;
                    Counters.incrementAndGetLong(counters, task.getDataSource());
                }
                catch (Exception e) {
                    log.makeAlert(e, "Failed to handle task status")
                       .addData("task", task.getId())
                       .addData("statusCode", status.getStatusCode())
                       .emit();
                }
            }
        };

        Futures.addCallback(statusFuture, callback);
        return statusFuture;
    }

    /**
     * Reconciles the in-memory task list with the active tasks in storage: tasks present only in memory
     * are removed, tasks present only in storage are added, and the management loop is signalled.
     * Does nothing (other than logging) when the queue is not active.
     *
     * @throws RuntimeException any exception raised during the sync, logged and re-thrown via
     *                          {@code Throwables.propagate}
     */
    private void syncFromStorage() {
        giant.lock();
        try {
            if (!active) {
                log.info("Not active. Skipping storage sync.");
                return;
            }

            final Map<String, Task> storedOnly = toTaskIDMap(taskStorage.getActiveTasks());
            final int tasksSynced = storedOnly.size();
            final Map<String, Task> queuedOnly = toTaskIDMap(tasks);

            // Ids present on both sides need no action; strip them from both maps.
            final Set<String> sharedIds = new HashSet<>(storedOnly.keySet());
            sharedIds.retainAll(queuedOnly.keySet());
            storedOnly.keySet().removeAll(sharedIds);
            queuedOnly.keySet().removeAll(sharedIds);

            final Collection<Task> addedTasks = storedOnly.values();
            final Collection<Task> removedTasks = queuedOnly.values();

            // Removals are applied before additions.
            for (final Task stale : removedTasks) {
                removeTaskInternal(stale);
            }
            for (final Task fresh : addedTasks) {
                addTaskInternal(fresh);
            }

            log.info("Synced %d tasks from storage (%d tasks added, %d tasks removed).", tasksSynced, addedTasks.size(), removedTasks.size());
            managementMayBeNecessary.signalAll();
        }
        catch (Exception e) {
            log.warn(e, "Failed to sync tasks from storage!");
            throw Throwables.propagate(e);
        }
        finally {
            giant.unlock();
        }
    }

    /**
     * Indexes a list of tasks by task id. If several tasks share an id, the last one in the list wins.
     *
     * @param taskList tasks to index
     * @return a new mutable map from task id to task
     */
    private static Map<String, Task> toTaskIDMap(List<Task> taskList) {
        final Map<String, Task> tasksById = new HashMap<>();
        taskList.forEach(task -> tasksById.put(task.getId(), task));
        return tasksById;
    }

    /**
     * Computes, for every key in {@code total}, the difference between its value in {@code total} and its
     * value in {@code prev}. Keys missing from {@code prev} are treated as 0.
     *
     * @param total current totals
     * @param prev  previously observed totals
     * @return a new map with the same keys as {@code total} holding {@code total - prev}
     */
    private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev) {
        final Map<String, Long> deltas = new HashMap<>();
        for (final Map.Entry<String, Long> entry : total.entrySet()) {
            final long before = prev.getOrDefault(entry.getKey(), 0L);
            deltas.put(entry.getKey(), entry.getValue() - before);
        }
        return deltas;
    }

    /**
     * Returns, per datasource, how many tasks have succeeded since the previous call to this method, and
     * remembers the current totals as the baseline for the next call.
     *
     * @return map from datasource to the number of newly successful tasks
     */
    public Map<String, Long> getSuccessfulTaskCount() {
        // Snapshot the live counters into plain values.
        final Map<String, Long> snapshot = new HashMap<>();
        for (final Map.Entry<String, AtomicLong> counter : totalSuccessfulTaskCount.entrySet()) {
            snapshot.put(counter.getKey(), counter.getValue().get());
        }

        final Map<String, Long> delta = getDeltaValues(snapshot, prevTotalSuccessfulTaskCount);
        prevTotalSuccessfulTaskCount = snapshot;
        return delta;
    }

    /**
     * Returns, per datasource, how many tasks have failed since the previous call to this method, and
     * remembers the current totals as the baseline for the next call.
     *
     * @return map from datasource to the number of newly failed tasks
     */
    public Map<String, Long> getFailedTaskCount() {
        // Snapshot the live counters into plain values.
        final Map<String, Long> snapshot = new HashMap<>();
        for (final Map.Entry<String, AtomicLong> counter : totalFailedTaskCount.entrySet()) {
            snapshot.put(counter.getKey(), counter.getValue().get());
        }

        final Map<String, Long> delta = getDeltaValues(snapshot, prevTotalFailedTaskCount);
        prevTotalFailedTaskCount = snapshot;
        return delta;
    }

    /**
     * Counts the task runner's running tasks per datasource. Tasks the runner reports that are not in
     * this queue are counted under the empty-string datasource.
     *
     * @return map from datasource to number of running tasks
     */
    public Map<String, Long> getRunningTaskCount() {
        // The task list is mutated under 'giant' elsewhere in this class, so snapshot it under the same
        // lock rather than streaming a list that another thread may be modifying.
        final Map<String, String> taskDatasources;
        this.giant.lock();
        try {
            taskDatasources = this.tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
        }
        finally {
            this.giant.unlock();
        }
        // The runner is queried outside the lock; only the snapshot above needs protection.
        return this.taskRunner.getRunningTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum));
    }

    /**
     * Counts the task runner's pending tasks per datasource. Tasks the runner reports that are not in
     * this queue are counted under the empty-string datasource.
     *
     * @return map from datasource to number of pending tasks
     */
    public Map<String, Long> getPendingTaskCount() {
        // The task list is mutated under 'giant' elsewhere in this class, so snapshot it under the same
        // lock rather than streaming a list that another thread may be modifying.
        final Map<String, String> taskDatasources;
        this.giant.lock();
        try {
            taskDatasources = this.tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
        }
        finally {
            this.giant.unlock();
        }
        // The runner is queried outside the lock; only the snapshot above needs protection.
        return this.taskRunner.getPendingTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum));
    }

    /**
     * Counts, per datasource, the queued tasks that the task runner does not know about.
     *
     * @return map from datasource to number of such tasks
     */
    public Map<String, Long> getWaitingTaskCount() {
        final Set<String> runnerKnownTaskIds = this.taskRunner.getKnownTasks().stream().map(TaskRunnerWorkItem::getTaskId).collect(Collectors.toSet());
        // The task list is mutated under 'giant' elsewhere in this class, so read it under the same lock
        // rather than streaming a list that another thread may be modifying.
        this.giant.lock();
        try {
            return this.tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())).collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
        }
        finally {
            this.giant.unlock();
        }
    }
}

