/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingProgress;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

public class TaskMonitor<T extends Task> {
    private static final Logger log = new Logger(TaskMonitor.class);
    private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded((String)"task-monitor-%d");
    private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<String, MonitorEntry>();
    private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<String, TaskHistory<T>>();
    private final Object taskCountLock = new Object();
    private final Object startStopLock = new Object();
    private final IndexingServiceClient indexingServiceClient;
    private final int maxRetry;
    private final int expectedNumSucceededTasks;
    private int numRunningTasks;
    private int numSucceededTasks;
    private int numFailedTasks;
    private boolean running = false;

    TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) {
        this.indexingServiceClient = (IndexingServiceClient)Preconditions.checkNotNull((Object)indexingServiceClient, (Object)"indexingServiceClient");
        this.maxRetry = maxRetry;
        this.expectedNumSucceededTasks = expectedNumSucceededTasks;
        log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", new Object[]{expectedNumSucceededTasks});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(long taskStatusCheckingPeriod) {
        Object object = this.startStopLock;
        synchronized (object) {
            this.running = true;
            log.info("Starting taskMonitor", new Object[0]);
            this.taskStatusChecker.scheduleAtFixedRate(() -> {
                try {
                    Iterator iterator = this.runningTasks.entrySet().iterator();
                    block7: while (iterator.hasNext()) {
                        Map.Entry entry = iterator.next();
                        String specId = (String)entry.getKey();
                        MonitorEntry monitorEntry = (MonitorEntry)entry.getValue();
                        String taskId = monitorEntry.runningTask.getId();
                        TaskStatusResponse taskStatusResponse = this.indexingServiceClient.getTaskStatus(taskId);
                        TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
                        if (taskStatus == null) continue;
                        switch ((TaskState)Preconditions.checkNotNull((Object)taskStatus.getStatusCode(), (Object)"taskState")) {
                            case SUCCESS: {
                                this.incrementNumSucceededTasks();
                                monitorEntry.setLastStatus(taskStatus);
                                iterator.remove();
                                continue block7;
                            }
                            case FAILED: {
                                this.incrementNumFailedTasks();
                                log.warn("task[%s] failed!", new Object[]{taskId});
                                if (monitorEntry.numTries() < this.maxRetry) {
                                    log.info("We still have chances[%d/%d] to complete for spec[%s].", new Object[]{monitorEntry.numTries(), this.maxRetry, monitorEntry.spec.getId()});
                                    this.retry(specId, monitorEntry, taskStatus);
                                    continue block7;
                                }
                                log.error("spec[%s] failed after [%d] tries", new Object[]{monitorEntry.spec.getId(), monitorEntry.numTries()});
                                monitorEntry.setLastStatus(taskStatus);
                                iterator.remove();
                                continue block7;
                            }
                            case RUNNING: {
                                monitorEntry.updateStatus(taskStatus);
                                continue block7;
                            }
                        }
                        throw new ISE("Unknown taskStatus[%s] for task[%s[", new Object[]{taskStatus.getStatusCode(), taskId});
                    }
                }
                catch (Throwable t) {
                    log.error(t, "Error while monitoring", new Object[0]);
                    throw t;
                }
            }, taskStatusCheckingPeriod, taskStatusCheckingPeriod, TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        Object object = this.startStopLock;
        synchronized (object) {
            this.running = false;
            this.taskStatusChecker.shutdownNow();
            log.info("Stopped taskMonitor", new Object[0]);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ListenableFuture<SubTaskCompleteEvent<T>> submit(SubTaskSpec<T> spec) {
        Object object = this.startStopLock;
        synchronized (object) {
            if (!this.running) {
                return Futures.immediateFailedFuture((Throwable)new ISE("TaskMonitore is not running", new Object[0]));
            }
            T task = spec.newSubTask(0);
            log.info("Submitting a new task[%s] for spec[%s]", new Object[]{task.getId(), spec.getId()});
            this.indexingServiceClient.runTask(task);
            this.incrementNumRunningTasks();
            SettableFuture taskFuture = SettableFuture.create();
            this.runningTasks.put(spec.getId(), new MonitorEntry(this, spec, task, this.indexingServiceClient.getTaskStatus(task.getId()).getStatus(), taskFuture));
            return taskFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void retry(String subTaskSpecId, MonitorEntry monitorEntry, TaskStatusPlus lastFailedTaskStatus) {
        Object object = this.startStopLock;
        synchronized (object) {
            if (this.running) {
                SubTaskSpec spec = monitorEntry.spec;
                Object task = spec.newSubTask(monitorEntry.taskHistory.size() + 1);
                log.info("Submitting a new task[%s] for retrying spec[%s]", new Object[]{task.getId(), spec.getId()});
                this.indexingServiceClient.runTask(task);
                this.incrementNumRunningTasks();
                this.runningTasks.put(subTaskSpecId, monitorEntry.withNewRunningTask(task, this.indexingServiceClient.getTaskStatus(task.getId()).getStatus(), lastFailedTaskStatus));
            }
        }
    }

    void killAll() {
        this.runningTasks.values().forEach(entry -> {
            String taskId = ((MonitorEntry)entry).runningTask.getId();
            log.info("Request to kill subtask[%s]", new Object[]{taskId});
            this.indexingServiceClient.killTask(taskId);
        });
        this.runningTasks.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void incrementNumRunningTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            ++this.numRunningTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void incrementNumSucceededTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            --this.numRunningTasks;
            ++this.numSucceededTasks;
            log.info("[%d/%d] tasks succeeded", new Object[]{this.numSucceededTasks, this.expectedNumSucceededTasks});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void incrementNumFailedTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            --this.numRunningTasks;
            ++this.numFailedTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean isSucceeded() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return this.numSucceededTasks == this.expectedNumSucceededTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getNumRunningTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return this.numRunningTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    SinglePhaseParallelIndexingProgress getProgress() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return new SinglePhaseParallelIndexingProgress(this.numRunningTasks, this.numSucceededTasks, this.numFailedTasks, this.numSucceededTasks + this.numFailedTasks, this.numRunningTasks + this.numSucceededTasks + this.numFailedTasks, this.expectedNumSucceededTasks);
        }
    }

    Set<String> getRunningTaskIds() {
        return this.runningTasks.values().stream().map(entry -> ((MonitorEntry)entry).runningTask.getId()).collect(Collectors.toSet());
    }

    List<SubTaskSpec<T>> getRunningSubTaskSpecs() {
        return this.runningTasks.values().stream().map(monitorEntry -> ((MonitorEntry)monitorEntry).spec).collect(Collectors.toList());
    }

    @Nullable
    MonitorEntry getRunningTaskMonitorEntry(String subTaskSpecId) {
        return this.runningTasks.values().stream().filter(monitorEntry -> ((MonitorEntry)monitorEntry).spec.getId().equals(subTaskSpecId)).findFirst().orElse(null);
    }

    List<SubTaskSpec<T>> getCompleteSubTaskSpecs() {
        return this.taskHistories.values().stream().map(TaskHistory::getSpec).collect(Collectors.toList());
    }

    @Nullable
    TaskHistory<T> getCompleteSubTaskSpecHistory(String subTaskSpecId) {
        return (TaskHistory)this.taskHistories.get(subTaskSpecId);
    }

    static class SubTaskCompleteEvent<T extends Task> {
        private final SubTaskSpec<T> spec;
        @Nullable
        private final TaskStatusPlus lastStatus;
        @Nullable
        private final Throwable throwable;

        static <T extends Task> SubTaskCompleteEvent<T> success(SubTaskSpec<T> spec, TaskStatusPlus lastStatus) {
            return new SubTaskCompleteEvent<T>(spec, (TaskStatusPlus)Preconditions.checkNotNull((Object)lastStatus, (Object)"lastStatus"), null);
        }

        static <T extends Task> SubTaskCompleteEvent<T> fail(SubTaskSpec<T> spec, Throwable t) {
            return new SubTaskCompleteEvent<T>(spec, null, t);
        }

        private SubTaskCompleteEvent(SubTaskSpec<T> spec, @Nullable TaskStatusPlus lastStatus, @Nullable Throwable throwable) {
            this.spec = (SubTaskSpec)Preconditions.checkNotNull(spec, (Object)"spec");
            this.lastStatus = lastStatus;
            this.throwable = throwable;
        }

        SubTaskSpec<T> getSpec() {
            return this.spec;
        }

        TaskState getLastState() {
            return this.lastStatus == null ? TaskState.FAILED : this.lastStatus.getStatusCode();
        }

        @Nullable
        TaskStatusPlus getLastStatus() {
            return this.lastStatus;
        }

        @Nullable
        Throwable getThrowable() {
            return this.throwable;
        }
    }

    static class MonitorEntry {
        private final SubTaskSpec<T> spec;
        private final T runningTask;
        private final CopyOnWriteArrayList<TaskStatusPlus> taskHistory;
        private final SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture;
        @Nullable
        private volatile TaskStatusPlus runningStatus;
        final /* synthetic */ TaskMonitor this$0;

        MonitorEntry(SubTaskSpec<T> spec, @Nullable T runningTask, TaskStatusPlus runningStatus, SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture) {
            this(this$0, spec, (Task)runningTask, runningStatus, new CopyOnWriteArrayList(), completeEventFuture);
        }

        private MonitorEntry(SubTaskSpec<T> spec, @Nullable T runningTask, TaskStatusPlus runningStatus, CopyOnWriteArrayList<TaskStatusPlus> taskHistory, SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture) {
            this.this$0 = this$0;
            this.spec = spec;
            this.runningTask = runningTask;
            this.runningStatus = runningStatus;
            this.taskHistory = taskHistory;
            this.completeEventFuture = completeEventFuture;
        }

        MonitorEntry withNewRunningTask(T newTask, @Nullable TaskStatusPlus newStatus, TaskStatusPlus statusOfLastTask) {
            this.taskHistory.add(statusOfLastTask);
            return new MonitorEntry(this.this$0, this.spec, newTask, newStatus, this.taskHistory, this.completeEventFuture);
        }

        int numTries() {
            return this.taskHistory.size() + 1;
        }

        void updateStatus(TaskStatusPlus statusPlus) {
            if (!this.runningTask.getId().equals(statusPlus.getId())) {
                throw new ISE("Task id[%s] of lastStatus is different from the running task[%s]", new Object[]{statusPlus.getId(), this.runningTask.getId()});
            }
            this.runningStatus = statusPlus;
        }

        void setLastStatus(TaskStatusPlus lastStatus) {
            if (!this.runningTask.getId().equals(lastStatus.getId())) {
                throw new ISE("Task id[%s] of lastStatus is different from the running task[%s]", new Object[]{lastStatus.getId(), this.runningTask.getId()});
            }
            this.runningStatus = lastStatus;
            this.taskHistory.add(lastStatus);
            this.this$0.taskHistories.put(this.spec.getId(), new TaskHistory(this.spec, this.taskHistory));
            this.completeEventFuture.set(SubTaskCompleteEvent.success(this.spec, lastStatus));
        }

        SubTaskSpec<T> getSpec() {
            return this.spec;
        }

        @Nullable
        TaskStatusPlus getRunningStatus() {
            return this.runningStatus;
        }

        List<TaskStatusPlus> getTaskHistory() {
            return this.taskHistory;
        }
    }
}

