package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.class */
public class TaskMonitor<T extends Task, SubTaskReportType extends SubTaskReport> {
    private static final Logger log = new Logger(TaskMonitor.class);

    /** Client used to submit sub tasks, poll their status and cancel them. */
    private final IndexingServiceClient indexingServiceClient;

    /** Upper bound compared against {@code MonitorEntry.numTries()} when deciding whether a failed spec is retried. */
    private final int maxRetry;

    /** Estimate passed in at construction time; only used for logging and progress reporting. */
    private final int estimatedNumSucceededTasks;

    @GuardedBy("taskCountLock")
    private int numRunningTasks;

    @GuardedBy("taskCountLock")
    private int numSucceededTasks;

    @GuardedBy("taskCountLock")
    private int numFailedTasks;

    /** Number of sub tasks for which a cancel request was sent; incremented in stop() while holding taskCountLock. */
    private int numCanceledTasks;

    /** Executor that runs the periodic status check scheduled by start(). */
    private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded("task-monitor-%d");

    /** Entries of specs that are still being processed, keyed by sub task spec id. */
    private final ConcurrentMap<String, TaskMonitor<T, SubTaskReportType>.MonitorEntry> runningTasks = new ConcurrentHashMap<>();

    /** Attempt histories of specs that reached a final state, keyed by sub task spec id. */
    private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<>();

    /** Reports received through collectReport(), keyed by the task id taken from the report. */
    private final ConcurrentHashMap<String, SubTaskReportType> reportsMap = new ConcurrentHashMap<>();

    /** Monitor guarding the task counters. */
    private final Object taskCountLock = new Object();

    /** Monitor guarding the running flag and serializing start/stop/submit/retry. */
    private final Object startStopLock = new Object();

    @GuardedBy("startStopLock")
    private boolean running = false;

    /* renamed from: org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor$1.class */
    /**
     * Synthetic lookup table that maps {@link TaskState} ordinals to small integer case labels
     * (SUCCESS = 1, FAILED = 2, RUNNING = 3). Slots of any other state keep the array default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        // Sized by the number of TaskState constants available when this class is initialized.
        static final /* synthetic */ int[] $SwitchMap$org$apache$druid$indexer$TaskState = new int[TaskState.values().length];

        static {
            // Each constant is registered in its own try block so that one missing constant
            // (NoSuchFieldError) does not prevent the remaining ones from being registered.
            try {
                $SwitchMap$org$apache$druid$indexer$TaskState[TaskState.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$druid$indexer$TaskState[TaskState.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$druid$indexer$TaskState[TaskState.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot simply stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor$MonitorEntry.class */
    /**
     * Bookkeeping for one sub task spec that is being processed: the task of the current attempt,
     * its most recently observed status, the statuses of earlier attempts, and the future that is
     * completed once the spec reaches its final status.
     */
    public class MonitorEntry {
        private final SubTaskSpec<T> spec;
        private final T runningTask;
        // The same list instance is handed to every entry created through withNewRunningTask().
        private final CopyOnWriteArrayList<TaskStatusPlus> taskHistory;
        private final SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture;

        @Nullable
        private volatile TaskStatusPlus runningStatus;

        /** Creates an entry for a first attempt, i.e. with an empty attempt history. */
        private MonitorEntry(TaskMonitor taskMonitor, SubTaskSpec<T> subTaskSpec, @Nullable T t, TaskStatusPlus taskStatusPlus, SettableFuture<SubTaskCompleteEvent<T>> settableFuture) {
            this(subTaskSpec, t, taskStatusPlus, new CopyOnWriteArrayList<TaskStatusPlus>(), settableFuture);
        }

        /** Canonical constructor; stores all arguments as given. */
        private MonitorEntry(SubTaskSpec<T> subTaskSpec, T t, @Nullable TaskStatusPlus taskStatusPlus, CopyOnWriteArrayList<TaskStatusPlus> copyOnWriteArrayList, SettableFuture<SubTaskCompleteEvent<T>> settableFuture) {
            spec = subTaskSpec;
            runningTask = t;
            runningStatus = taskStatusPlus;
            taskHistory = copyOnWriteArrayList;
            completeEventFuture = settableFuture;
        }

        /**
         * Records {@code taskStatusPlus2} (the status of the attempt being replaced) in the history
         * and returns a new entry for task {@code t} with current status {@code taskStatusPlus}.
         * The returned entry shares this entry's spec, history list and completion future.
         */
        TaskMonitor<T, SubTaskReportType>.MonitorEntry withNewRunningTask(T t, @Nullable TaskStatusPlus taskStatusPlus, TaskStatusPlus taskStatusPlus2) {
            taskHistory.add(taskStatusPlus2);
            return new MonitorEntry(spec, t, taskStatusPlus, taskHistory, completeEventFuture);
        }

        /** Returns the number of recorded attempts plus one for the current attempt. */
        int numTries() {
            return 1 + taskHistory.size();
        }

        /**
         * Replaces the current status of the running task.
         *
         * @throws ISE if the status belongs to a task other than the running one
         */
        void updateStatus(TaskStatusPlus taskStatusPlus) {
            requireStatusOfRunningTask(taskStatusPlus);
            runningStatus = taskStatusPlus;
        }

        /**
         * Records the final status of the spec: stores it as the current status, appends it to the
         * history, publishes the history to the enclosing monitor's {@code taskHistories} and
         * completes the future with an event carrying that status.
         *
         * @throws ISE if the status belongs to a task other than the running one
         */
        void setLastStatus(TaskStatusPlus taskStatusPlus) {
            requireStatusOfRunningTask(taskStatusPlus);
            runningStatus = taskStatusPlus;
            taskHistory.add(taskStatusPlus);
            TaskMonitor.this.taskHistories.put(spec.getId(), new TaskHistory<>(spec, taskHistory));
            completeEventFuture.set(SubTaskCompleteEvent.success(spec, taskStatusPlus));
        }

        /** Throws an ISE unless the given status carries the id of the running task. */
        private void requireStatusOfRunningTask(TaskStatusPlus status) {
            if (runningTask.getId().equals(status.getId())) {
                return;
            }
            throw new ISE("Task id[%s] of lastStatus is different from the running task[%s]", status.getId(), runningTask.getId());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SubTaskSpec<T> getSpec() {
            return spec;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public TaskStatusPlus getRunningStatus() {
            return runningStatus;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the live history list (not a copy). */
        public List<TaskStatusPlus> getTaskHistory() {
            return taskHistory;
        }

        // Synthetic accessor constructor; the trailing marker argument is unused.
        /* synthetic */ MonitorEntry(TaskMonitor taskMonitor, SubTaskSpec subTaskSpec, Task task, TaskStatusPlus taskStatusPlus, SettableFuture settableFuture, AnonymousClass1 anonymousClass1) {
            this(taskMonitor, subTaskSpec, task, taskStatusPlus, settableFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor$SubTaskCompleteEvent.class */
    /**
     * Outcome of processing one sub task spec. An event carries either the last observed task
     * status or the throwable that ended processing; the spec is always present.
     */
    public static class SubTaskCompleteEvent<T extends Task> {
        private final SubTaskSpec<T> spec;

        @Nullable
        private final TaskStatusPlus lastStatus;

        @Nullable
        private final Throwable throwable;

        private SubTaskCompleteEvent(SubTaskSpec<T> subTaskSpec, @Nullable TaskStatusPlus taskStatusPlus, @Nullable Throwable th) {
            spec = Preconditions.checkNotNull(subTaskSpec, "spec");
            lastStatus = taskStatusPlus;
            throwable = th;
        }

        /**
         * Creates an event that carries a (non-null) last status and no throwable. The state of the
         * event is whatever status code that status reports.
         */
        static <T extends Task> SubTaskCompleteEvent<T> success(SubTaskSpec<T> subTaskSpec, TaskStatusPlus taskStatusPlus) {
            final TaskStatusPlus checkedStatus = Preconditions.checkNotNull(taskStatusPlus, "lastStatus");
            return new SubTaskCompleteEvent<>(subTaskSpec, checkedStatus, null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Creates an event without a last status that carries the given throwable. */
        public static <T extends Task> SubTaskCompleteEvent<T> fail(SubTaskSpec<T> subTaskSpec, Throwable th) {
            return new SubTaskCompleteEvent<>(subTaskSpec, null, th);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public SubTaskSpec<T> getSpec() {
            return spec;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the status code of the last status, or FAILED when no status was recorded. */
        public TaskState getLastState() {
            if (lastStatus == null) {
                return TaskState.FAILED;
            }
            return lastStatus.getStatusCode();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public TaskStatusPlus getLastStatus() {
            return lastStatus;
        }

        @Nullable
        Throwable getThrowable() {
            return throwable;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a monitor that is not yet running.
     *
     * @param indexingServiceClient client used to talk to the indexing service; must not be null
     * @param i                     stored as {@code maxRetry}
     * @param i2                    stored as {@code estimatedNumSucceededTasks}
     */
    public TaskMonitor(IndexingServiceClient indexingServiceClient, int i, int i2) {
        this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
        maxRetry = i;
        estimatedNumSucceededTasks = i2;
        log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", i2);
    }

    /**
     * Marks the monitor as running and schedules the periodic status check of all running sub
     * tasks on {@code taskStatusChecker}.
     *
     * @param j initial delay and period of the status check, in milliseconds
     */
    public void start(long j) {
        synchronized (this.startStopLock) {
            this.running = true;
            log.info("Starting taskMonitor");
            this.taskStatusChecker.scheduleAtFixedRate(this::checkRunningTasks, j, j, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * One pass of the status check: polls the status of every entry in {@code runningTasks} and
     * finishes, retries or updates the entry depending on the reported state. Entries whose status
     * is not available yet are skipped.
     */
    private void checkRunningTasks() {
        try {
            final Iterator<Map.Entry<String, TaskMonitor<T, SubTaskReportType>.MonitorEntry>> it = this.runningTasks.entrySet().iterator();
            while (it.hasNext()) {
                final Map.Entry<String, TaskMonitor<T, SubTaskReportType>.MonitorEntry> mapEntry = it.next();
                final String specId = mapEntry.getKey();
                final TaskMonitor<T, SubTaskReportType>.MonitorEntry monitorEntry = mapEntry.getValue();
                final String taskId = monitorEntry.runningTask.getId();
                final TaskStatusPlus status = this.indexingServiceClient.getTaskStatus(taskId).getStatus();
                if (status == null) {
                    continue;
                }
                final TaskState state = Preconditions.checkNotNull(status.getStatusCode(), "taskState");
                switch (state) {
                    case SUCCESS:
                        // A task is only accepted as succeeded once its report has been collected.
                        if (!this.reportsMap.containsKey(taskId)) {
                            throw new ISE("Missing reports from task[%s]!", taskId);
                        }
                        incrementNumSucceededTasks();
                        // Publish the history before removing the entry so the spec is always
                        // visible in either runningTasks or taskHistories.
                        monitorEntry.setLastStatus(status);
                        it.remove();
                        break;
                    case FAILED:
                        // Any report of a failed attempt is dropped.
                        this.reportsMap.remove(taskId);
                        incrementNumFailedTasks();
                        log.warn("task[%s] failed!", taskId);
                        if (monitorEntry.numTries() < this.maxRetry) {
                            log.info("We still have more chances[%d/%d] to process the spec[%s].", monitorEntry.numTries(), this.maxRetry, monitorEntry.spec.getId());
                            retry(specId, monitorEntry, status);
                        } else {
                            log.error("spec[%s] failed after [%d] tries", monitorEntry.spec.getId(), monitorEntry.numTries());
                            monitorEntry.setLastStatus(status);
                            it.remove();
                        }
                        break;
                    case RUNNING:
                        monitorEntry.updateStatus(status);
                        break;
                    default:
                        throw new ISE("Unknown taskStatus[%s] for task[%s]", status.getStatusCode(), taskId);
                }
            }
        } catch (Throwable th) {
            // Only log: an exception escaping a scheduleAtFixedRate task suppresses every later
            // execution, which would silently stop the monitoring.
            log.error(th, "Error while monitoring");
        }
    }

    /**
     * Stops the monitor if it is running: shuts the status checker down, then removes every entry
     * from {@code runningTasks} and sends a cancel request for its task, adjusting the running and
     * canceled counters. Does nothing when the monitor is not running.
     */
    public void stop() {
        synchronized (startStopLock) {
            if (!running) {
                return;
            }
            running = false;
            taskStatusChecker.shutdownNow();

            synchronized (taskCountLock) {
                if (numRunningTasks > 0) {
                    for (Iterator<TaskMonitor<T, SubTaskReportType>.MonitorEntry> entries = runningTasks.values().iterator(); entries.hasNext(); ) {
                        final TaskMonitor<T, SubTaskReportType>.MonitorEntry entry = entries.next();
                        // The entry is dropped from the map before the cancel request is sent.
                        entries.remove();
                        final String taskId = entry.runningTask.getId();
                        log.info("Request to cancel subtask[%s]", taskId);
                        indexingServiceClient.cancelTask(taskId);
                        numRunningTasks--;
                        numCanceledTasks++;
                    }
                    if (numRunningTasks > 0) {
                        log.warn("Inconsistent state: numRunningTasks[%d] is still not zero after trying to cancel all running tasks.", numRunningTasks);
                    }
                }
            }
            log.info("Stopped taskMonitor");
        }
    }

    /**
     * Submits a first-attempt task for the given spec and starts tracking it.
     *
     * @param subTaskSpec spec to create the task from
     * @return a future completed by the monitor when the spec reaches its final status, or an
     *         immediately failed future (ISE) when the monitor is not running
     */
    public ListenableFuture<SubTaskCompleteEvent<T>> submit(SubTaskSpec<T> subTaskSpec) {
        synchronized (startStopLock) {
            if (running) {
                final T task = submitTask(subTaskSpec, 0);
                log.info("Submitted a new task[%s] for spec[%s]", task.getId(), subTaskSpec.getId());
                incrementNumRunningTasks();

                final SettableFuture<SubTaskCompleteEvent<T>> completion = SettableFuture.create();
                // The entry is keyed by spec id and seeded with the status reported right after submission.
                runningTasks.put(
                    subTaskSpec.getId(),
                    new MonitorEntry(this, subTaskSpec, task, indexingServiceClient.getTaskStatus(task.getId()).getStatus(), completion, (AnonymousClass1) null)
                );
                return completion;
            }
            return Futures.immediateFailedFuture(new ISE("TaskMonitor is not running"));
        }
    }

    /**
     * Stores the report under the task id it carries. A second report for the same task id is
     * logged and must be equal to the one already stored.
     *
     * @param subtaskreporttype report sent by a sub task
     * @throws IllegalStateException if a different report was already stored for the same task id
     */
    public void collectReport(SubTaskReportType subtaskreporttype) {
        reportsMap.compute(subtaskreporttype.getTaskId(), (taskId, previousReport) -> {
            if (previousReport != null) {
                log.warn("Received duplicate report for task [%s]", taskId);
                Preconditions.checkState(
                    previousReport.equals(subtaskreporttype),
                    "task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]",
                    taskId,
                    previousReport,
                    subtaskreporttype
                );
            }
            return subtaskreporttype;
        });
    }

    /**
     * Returns the collected reports keyed by task id. The returned map is the monitor's own live
     * map, not a copy.
     */
    public Map<String, SubTaskReportType> getReports() {
        return reportsMap;
    }

    /**
     * Submits another attempt for the spec of {@code failedEntry} and replaces the entry stored
     * under {@code specId}. Does nothing when the monitor is no longer running.
     *
     * @param specId       key of the entry in {@code runningTasks}
     * @param failedEntry  entry of the attempt that just failed
     * @param failedStatus final status of the failed attempt; recorded in the attempt history
     */
    private void retry(String specId, TaskMonitor<T, SubTaskReportType>.MonitorEntry failedEntry, TaskStatusPlus failedStatus) {
        synchronized (startStopLock) {
            if (!running) {
                return;
            }
            final SubTaskSpec<T> spec = failedEntry.spec;
            // Attempt number passed to the spec: number of recorded attempts plus one.
            final T retryTask = submitTask(spec, failedEntry.taskHistory.size() + 1);
            log.info("Submitted a new task[%s] for retrying spec[%s]", retryTask.getId(), spec.getId());
            incrementNumRunningTasks();
            runningTasks.put(
                specId,
                failedEntry.withNewRunningTask(retryTask, indexingServiceClient.getTaskStatus(retryTask.getId()).getStatus(), failedStatus)
            );
        }
    }

    /**
     * Creates a task from the spec and hands it to the indexing service. When submission fails
     * with an "unknown type id" error, a task with the backward compatible type is created and
     * submitted instead; any other failure is rethrown unchanged.
     *
     * @param spec    spec to create the task from
     * @param attempt attempt number passed through to the spec
     * @return the task that was submitted
     */
    private T submitTask(SubTaskSpec<T> spec, int attempt) {
        final T task = spec.newSubTask(attempt);
        try {
            indexingServiceClient.runTask(task.getId(), task);
            return task;
        } catch (Exception e) {
            if (!isUnknownTypeIdException(e)) {
                throw e;
            }
            log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type.");
        }
        final T compatibleTask = spec.newSubTaskWithBackwardCompatibleType(attempt);
        indexingServiceClient.runTask(compatibleTask.getId(), compatibleTask);
        return compatibleTask;
    }

    /**
     * Returns true when the throwable, or any throwable in its cause chain, is an
     * IllegalStateException whose message contains "Could not resolve type id".
     */
    private boolean isUnknownTypeIdException(Throwable failure) {
        final boolean isTypeIdError = failure instanceof IllegalStateException
            && failure.getMessage() != null
            && failure.getMessage().contains("Could not resolve type id");
        if (isTypeIdError) {
            return true;
        }
        // Otherwise keep looking further down the cause chain, if there is one.
        return failure.getCause() != null && isUnknownTypeIdException(failure.getCause());
    }

    /** Adds one to the running-task counter while holding {@code taskCountLock}. */
    private void incrementNumRunningTasks() {
        synchronized (taskCountLock) {
            numRunningTasks += 1;
        }
    }

    /**
     * Moves one task from "running" to "succeeded" and logs the progress against the estimate,
     * all while holding {@code taskCountLock}.
     */
    private void incrementNumSucceededTasks() {
        synchronized (taskCountLock) {
            numRunningTasks -= 1;
            numSucceededTasks += 1;
            log.info("[%d/%d] tasks succeeded", numSucceededTasks, estimatedNumSucceededTasks);
        }
    }

    /** Moves one task from "running" to "failed" while holding {@code taskCountLock}. */
    private void incrementNumFailedTasks() {
        synchronized (taskCountLock) {
            numRunningTasks -= 1;
            numFailedTasks += 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the succeeded-task counter, read under {@code taskCountLock}. */
    public int getNumSucceededTasks() {
        synchronized (taskCountLock) {
            return numSucceededTasks;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the running-task counter, read under {@code taskCountLock}. */
    public int getNumRunningTasks() {
        synchronized (taskCountLock) {
            return numRunningTasks;
        }
    }

    /**
     * Returns the number of sub tasks for which a cancel request has been sent.
     *
     * <p>The counter is only modified while holding {@code taskCountLock}, so it is read under
     * the same lock to guarantee that the latest value is visible to the calling thread.
     */
    @VisibleForTesting
    int getNumCanceledTasks() {
        synchronized (this.taskCountLock) {
            return this.numCanceledTasks;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a progress snapshot from the counters, read together under {@code taskCountLock}:
     * running, succeeded, failed, succeeded + failed, running + succeeded + failed, and the
     * estimated number of succeeded tasks.
     */
    public ParallelIndexingPhaseProgress getProgress() {
        synchronized (taskCountLock) {
            final int finished = numSucceededTasks + numFailedTasks;
            final int all = numRunningTasks + finished;
            return new ParallelIndexingPhaseProgress(
                numRunningTasks,
                numSucceededTasks,
                numFailedTasks,
                finished,
                all,
                estimatedNumSucceededTasks
            );
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the ids of the tasks of all entries currently in {@code runningTasks}. */
    public Set<String> getRunningTaskIds() {
        return runningTasks.values()
            .stream()
            .map(entry -> entry.runningTask.getId())
            .collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the specs of all entries currently in {@code runningTasks}. */
    public List<SubTaskSpec<T>> getRunningSubTaskSpecs() {
        return runningTasks.values()
            .stream()
            .map(entry -> entry.spec)
            .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks through the running entries for the one whose spec has the given id.
     *
     * @param str sub task spec id to look for
     * @return the first matching entry, or null when no running entry has that spec id
     */
    @Nullable
    public TaskMonitor<T, SubTaskReportType>.MonitorEntry getRunningTaskMonitorEntry(String str) {
        for (TaskMonitor<T, SubTaskReportType>.MonitorEntry candidate : runningTasks.values()) {
            if (candidate.spec.getId().equals(str)) {
                return candidate;
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the specs of all histories recorded in {@code taskHistories}. */
    public List<SubTaskSpec<T>> getCompleteSubTaskSpecs() {
        return taskHistories.values()
            .stream()
            .map(TaskHistory::getSpec)
            .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the attempt history recorded for the given sub task spec id.
     *
     * @param str sub task spec id
     * @return the recorded history, or null when none is stored under that id
     */
    @Nullable
    public TaskHistory<T> getCompleteSubTaskSpecHistory(String str) {
        return taskHistories.get(str);
    }
}
