package org.apache.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.worker.WorkerHistoryItem;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.apache.druid.utils.CollectionUtils;

/* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager.class */
public class WorkerTaskManager implements IndexerTaskCountStatsProvider {
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);
    private final ObjectMapper jsonMapper;
    private final TaskRunner taskRunner;
    private final OverlordClient overlordClient;
    private final File storageDir;
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    private final ConcurrentMap<String, Task> assignedTasks = new ConcurrentHashMap();
    protected final ConcurrentMap<String, TaskDetails> runningTasks = new ConcurrentHashMap();
    protected final ConcurrentMap<String, TaskAnnouncement> completedTasks = new ConcurrentHashMap();
    private final ChangeRequestHistory<WorkerHistoryItem> changeHistory = new ChangeRequestHistory<>();
    protected final Object lock = new Object();
    private final AtomicBoolean disabled = new AtomicBoolean(false);
    private final ExecutorService exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
    private final ScheduledExecutorService completedTasksCleanupExecutor = Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");

    /* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager$LocationNotice.class */
    private class LocationNotice implements Notice {
        private final String taskId;
        private final TaskLocation location;

        public LocationNotice(String str, TaskLocation taskLocation) {
            this.taskId = str;
            this.location = taskLocation;
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public String getTaskId() {
            return this.taskId;
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public void handle() {
            synchronized (WorkerTaskManager.this.lock) {
                TaskDetails taskDetails = WorkerTaskManager.this.runningTasks.get(this.taskId);
                if (taskDetails == null) {
                    WorkerTaskManager.log.warn("Got location notice for task [%s] that isn't running...", new Object[]{this.taskId});
                    return;
                }
                if (!Objects.equals(taskDetails.location, this.location)) {
                    taskDetails.location = this.location;
                    TaskAnnouncement create = TaskAnnouncement.create(taskDetails.task, taskDetails.status, taskDetails.location);
                    WorkerTaskManager.this.changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(create));
                    WorkerTaskManager.this.taskAnnouncementChanged(create);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager$Notice.class */
    public interface Notice {
        String getTaskId();

        void handle();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager$RunNotice.class */
    public class RunNotice implements Notice {
        private final Task task;

        public RunNotice(Task task) {
            this.task = task;
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public String getTaskId() {
            return this.task.getId();
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public void handle() {
            synchronized (WorkerTaskManager.this.lock) {
                if (WorkerTaskManager.this.runningTasks.containsKey(this.task.getId()) || WorkerTaskManager.this.completedTasks.containsKey(this.task.getId())) {
                    WorkerTaskManager.log.warn("Got run notice for task [%s] that I am already running or completed...", new Object[]{this.task.getId()});
                    WorkerTaskManager.this.taskStarted(this.task.getId());
                    return;
                }
                WorkerTaskManager.this.addRunningTask(this.task, WorkerTaskManager.this.taskRunner.run(this.task));
                TaskAnnouncement create = TaskAnnouncement.create(this.task, TaskStatus.running(this.task.getId()), TaskLocation.unknown());
                WorkerTaskManager.this.changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(create));
                WorkerTaskManager.this.cleanupAssignedTask(this.task);
                WorkerTaskManager.log.info("Task[%s] started.", new Object[]{this.task.getId()});
                WorkerTaskManager.this.taskAnnouncementChanged(create);
                WorkerTaskManager.this.taskStarted(this.task.getId());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager$StatusNotice.class */
    public class StatusNotice implements Notice {
        private final Task task;
        private final TaskStatus status;

        public StatusNotice(Task task, TaskStatus taskStatus) {
            this.task = task;
            this.status = taskStatus;
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public String getTaskId() {
            return this.task.getId();
        }

        @Override // org.apache.druid.indexing.worker.WorkerTaskManager.Notice
        public void handle() {
            synchronized (WorkerTaskManager.this.lock) {
                TaskDetails taskDetails = WorkerTaskManager.this.runningTasks.get(this.task.getId());
                if (taskDetails == null) {
                    WorkerTaskManager.log.warn("Got status notice for task [%s] that isn't running...", new Object[]{this.task.getId()});
                    return;
                }
                if (!this.status.isComplete()) {
                    WorkerTaskManager.log.warn("Got status notice for task [%s] that isn't complete (status = [%s])...", new Object[]{this.task.getId(), this.status.getStatusCode()});
                    return;
                }
                taskDetails.status = this.status.withDuration(System.currentTimeMillis() - taskDetails.startTime);
                TaskAnnouncement create = TaskAnnouncement.create(taskDetails.task, taskDetails.status, taskDetails.location);
                WorkerTaskManager.this.moveFromRunningToCompleted(this.task.getId(), create);
                WorkerTaskManager.this.changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(create));
                WorkerTaskManager.this.taskAnnouncementChanged(create);
                WorkerTaskManager.log.info("Task [%s] completed with status [%s].", new Object[]{this.task.getId(), this.status.getStatusCode()});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskManager$TaskDetails.class */
    public static class TaskDetails {
        private final Task task;
        private TaskStatus status;
        private final long startTime = System.currentTimeMillis();
        private TaskLocation location = TaskLocation.unknown();

        public TaskDetails(Task task) {
            this.task = task;
            this.status = TaskStatus.running(task.getId());
        }
    }

    @Inject
    public WorkerTaskManager(ObjectMapper objectMapper, TaskRunner taskRunner, TaskConfig taskConfig, OverlordClient overlordClient) {
        this.jsonMapper = objectMapper;
        this.taskRunner = taskRunner;
        this.overlordClient = overlordClient;
        this.storageDir = taskConfig.getBaseTaskDir();
    }

    @LifecycleStart
    public void start() throws Exception {
        if (!this.lifecycleLock.canStart()) {
            throw new ISE("can't start.", new Object[0]);
        }
        synchronized (this.lock) {
            try {
                try {
                    log.debug("Starting...", new Object[0]);
                    cleanupAndMakeTmpTaskDir();
                    registerLocationListener();
                    restoreRestorableTasks();
                    initAssignedTasks();
                    initCompletedTasks();
                    scheduleCompletedTasksCleanup();
                    this.lifecycleLock.started();
                    log.debug("Started.", new Object[0]);
                    this.lifecycleLock.exitStart();
                } catch (Exception e) {
                    log.makeAlert(e, "Exception starting WorkerTaskManager.", new Object[0]).emit();
                    throw e;
                }
            } catch (Throwable th) {
                this.lifecycleLock.exitStart();
                throw th;
            }
        }
    }

    @LifecycleStop
    public void stop() throws Exception {
        if (!this.lifecycleLock.canStop()) {
            throw new ISE("can't stop.", new Object[0]);
        }
        synchronized (this.lock) {
            try {
                this.taskRunner.unregisterListener("WorkerTaskManager");
                this.exec.shutdownNow();
                this.taskRunner.stop();
                log.debug("Stopped WorkerTaskManager.", new Object[0]);
            } catch (Exception e) {
                log.makeAlert(e, "Exception stopping WorkerTaskManager", new Object[0]).emit();
            }
        }
    }

    public Map<String, TaskAnnouncement> getCompletedTasks() {
        return this.completedTasks;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitNoticeToExec(Notice notice) {
        this.exec.execute(() -> {
            try {
                notice.handle();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.makeAlert(e, "Failed to handle notice", new Object[0]).addData("noticeClass", notice.getClass().getSimpleName()).addData("noticeTaskId", notice.getTaskId()).emit();
            }
        });
    }

    private void restoreRestorableTasks() {
        for (Pair<Task, ListenableFuture<TaskStatus>> pair : this.taskRunner.restore()) {
            addRunningTask((Task) pair.lhs, (ListenableFuture) pair.rhs);
        }
    }

    private void registerLocationListener() {
        this.taskRunner.registerListener(new TaskRunnerListener() { // from class: org.apache.druid.indexing.worker.WorkerTaskManager.1
            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public String getListenerId() {
                return "WorkerTaskManager";
            }

            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public void locationChanged(String str, TaskLocation taskLocation) {
                WorkerTaskManager.this.submitNoticeToExec(new LocationNotice(str, taskLocation));
            }

            @Override // org.apache.druid.indexing.overlord.TaskRunnerListener
            public void statusChanged(String str, TaskStatus taskStatus) {
            }
        }, Execs.directExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addRunningTask(final Task task, ListenableFuture<TaskStatus> listenableFuture) {
        this.runningTasks.put(task.getId(), new TaskDetails(task));
        Futures.addCallback(listenableFuture, new FutureCallback<TaskStatus>() { // from class: org.apache.druid.indexing.worker.WorkerTaskManager.2
            public void onSuccess(TaskStatus taskStatus) {
                WorkerTaskManager.this.submitNoticeToExec(new StatusNotice(task, taskStatus));
            }

            public void onFailure(Throwable th) {
                WorkerTaskManager.this.submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId(), "Failed to run task with an exception. See middleManager or indexer logs for more details.")));
            }
        }, MoreExecutors.directExecutor());
    }

    public void assignTask(Task task) {
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");
        synchronized (this.lock) {
            if (this.assignedTasks.containsKey(task.getId()) || this.runningTasks.containsKey(task.getId()) || this.completedTasks.containsKey(task.getId())) {
                log.warn("Request to assign task[%s] ignored because it exists already.", new Object[]{task.getId()});
                return;
            }
            try {
                FileUtils.writeAtomically(new File(getAssignedTaskDir(), task.getId()), getTmpTaskDir(), outputStream -> {
                    this.jsonMapper.writeValue(outputStream, task);
                    return null;
                });
                this.assignedTasks.put(task.getId(), task);
                this.changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(TaskAnnouncement.create(task, TaskStatus.running(task.getId()), TaskLocation.unknown())));
                submitNoticeToExec(new RunNotice(task));
            } catch (IOException e) {
                log.error(e, "Error while trying to persist assigned task[%s]", new Object[]{task.getId()});
                throw new ISE("Assign Task[%s] Request failed because [%s].", new Object[]{task.getId(), e.getMessage()});
            }
        }
    }

    private File getTmpTaskDir() {
        return new File(this.storageDir, "workerTaskManagerTmp");
    }

    private void cleanupAndMakeTmpTaskDir() throws IOException {
        File tmpTaskDir = getTmpTaskDir();
        FileUtils.mkdirp(tmpTaskDir);
        if (!tmpTaskDir.isDirectory()) {
            throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", new Object[]{tmpTaskDir});
        }
        try {
            org.apache.commons.io.FileUtils.cleanDirectory(tmpTaskDir);
        } catch (IOException e) {
            log.warn("Failed to cleanup tmp dir [%s].", new Object[]{tmpTaskDir.getAbsolutePath()});
        }
    }

    public File getAssignedTaskDir() {
        return new File(this.storageDir, "assignedTasks");
    }

    private void initAssignedTasks() throws IOException {
        String name;
        Task task;
        File assignedTaskDir = getAssignedTaskDir();
        log.debug("Looking for any previously assigned tasks on disk[%s].", new Object[]{assignedTaskDir});
        FileUtils.mkdirp(assignedTaskDir);
        for (File file : assignedTaskDir.listFiles()) {
            try {
                name = file.getName();
                task = (Task) this.jsonMapper.readValue(file, Task.class);
            } catch (IOException e) {
                log.noStackTrace().error(e, "Failed to read assigned task from disk at [%s]. Ignored.", new Object[]{file.getAbsoluteFile()});
            }
            if (!name.equals(task.getId())) {
                throw new ISE("Corrupted assigned task on disk[%s].", new Object[]{file.getAbsoluteFile()});
                break;
            }
            this.assignedTasks.put(name, task);
        }
        if (!this.assignedTasks.isEmpty()) {
            log.info("Found %,d running tasks from previous run: %s", new Object[]{Integer.valueOf(this.assignedTasks.size()), this.assignedTasks.values().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.joining(", "))});
        }
        Iterator<Task> it = this.assignedTasks.values().iterator();
        while (it.hasNext()) {
            submitNoticeToExec(new RunNotice(it.next()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupAssignedTask(Task task) {
        this.assignedTasks.remove(task.getId());
        File file = new File(getAssignedTaskDir(), task.getId());
        try {
            Files.delete(file.toPath());
        } catch (IOException e) {
            log.error(e, "Failed to delete assigned task from disk at [%s].", new Object[]{file});
        }
    }

    public ListenableFuture<ChangeRequestsSnapshot<WorkerHistoryItem>> getChangesSince(ChangeRequestHistory.Counter counter) {
        SettableFuture create;
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");
        if (counter.getCounter() >= 0) {
            return this.changeHistory.getRequestsSince(counter);
        }
        synchronized (this.lock) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new WorkerHistoryItem.Metadata(this.disabled.get()));
            for (Task task : this.assignedTasks.values()) {
                arrayList.add(new WorkerHistoryItem.TaskUpdate(TaskAnnouncement.create(task, TaskStatus.running(task.getId()), TaskLocation.unknown())));
            }
            for (TaskDetails taskDetails : this.runningTasks.values()) {
                arrayList.add(new WorkerHistoryItem.TaskUpdate(TaskAnnouncement.create(taskDetails.task, taskDetails.status, taskDetails.location)));
            }
            Iterator<TaskAnnouncement> it = this.completedTasks.values().iterator();
            while (it.hasNext()) {
                arrayList.add(new WorkerHistoryItem.TaskUpdate(it.next()));
            }
            create = SettableFuture.create();
            create.set(ChangeRequestsSnapshot.success(this.changeHistory.getLastCounter(), Lists.newArrayList(arrayList)));
        }
        return create;
    }

    public File getCompletedTaskDir() {
        return new File(this.storageDir, "completedTasks");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void moveFromRunningToCompleted(String str, TaskAnnouncement taskAnnouncement) {
        synchronized (this.lock) {
            this.runningTasks.remove(str);
            addCompletedTask(str, taskAnnouncement);
            try {
                FileUtils.writeAtomically(new File(getCompletedTaskDir(), str), getTmpTaskDir(), outputStream -> {
                    this.jsonMapper.writeValue(outputStream, taskAnnouncement);
                    return null;
                });
            } catch (IOException e) {
                log.error(e, "Error while trying to persist completed task[%s] announcement.", new Object[]{str});
                throw new ISE("Persisting completed task[%s] announcement failed because [%s].", new Object[]{str, e.getMessage()});
            }
        }
    }

    private void initCompletedTasks() throws IOException {
        String name;
        TaskAnnouncement taskAnnouncement;
        File completedTaskDir = getCompletedTaskDir();
        log.debug("Looking for any previously completed tasks on disk[%s].", new Object[]{completedTaskDir});
        FileUtils.mkdirp(completedTaskDir);
        for (File file : completedTaskDir.listFiles()) {
            try {
                name = file.getName();
                taskAnnouncement = (TaskAnnouncement) this.jsonMapper.readValue(file, TaskAnnouncement.class);
            } catch (IOException e) {
                log.error(e, "Failed to read completed task from disk at [%s]. Ignored.", new Object[]{file.getAbsoluteFile()});
            }
            if (!name.equals(taskAnnouncement.getTaskId())) {
                throw new ISE("Corrupted completed task on disk[%s].", new Object[]{file.getAbsoluteFile()});
                break;
            }
            addCompletedTask(name, taskAnnouncement);
        }
        if (this.completedTasks.isEmpty()) {
            return;
        }
        log.info("Found %,d complete tasks from previous run: %s", new Object[]{Integer.valueOf(this.completedTasks.size()), this.completedTasks.values().stream().map(taskAnnouncement2 -> {
            return StringUtils.format("%s (%s)", new Object[]{taskAnnouncement2.getTaskId(), taskAnnouncement2.getStatus()});
        }).collect(Collectors.joining(", "))});
    }

    private void scheduleCompletedTasksCleanup() {
        this.completedTasksCleanupExecutor.scheduleAtFixedRate(() -> {
            try {
                doCompletedTasksCleanup();
            } catch (Throwable th) {
                log.error(th, "Got unknown exception while running the scheduled cleanup.", new Object[0]);
            }
        }, 1L, 5L, TimeUnit.MINUTES);
    }

    public void workerEnabled() {
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");
        if (this.disabled.compareAndSet(true, false)) {
            this.changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(false));
        }
    }

    public void workerDisabled() {
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");
        if (this.disabled.compareAndSet(false, true)) {
            this.changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(true));
        }
    }

    public boolean isWorkerEnabled() {
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");
        return !this.disabled.get();
    }

    void doCompletedTasksCleanup() throws InterruptedException {
        Either error;
        if (this.completedTasks.isEmpty()) {
            log.debug("Skipping completed tasks cleanup, because there are no completed tasks.", new Object[0]);
            return;
        }
        ImmutableSet copyOf = ImmutableSet.copyOf(this.completedTasks.keySet());
        try {
            error = Either.value((Map) FutureUtils.get(this.overlordClient.taskStatuses(copyOf), true));
            log.debug("Received completed task status response [%s].", new Object[]{error});
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof HttpResponseException)) {
                error = Either.error(e.getCause());
            } else if (e.getCause().getResponse().getStatus().getCode() == 404) {
                log.debug("Deleting all completed tasks. Overlord appears to be running on older version.", new Object[0]);
                error = Either.value(ImmutableMap.of());
            } else {
                error = Either.error(e.getCause());
            }
        }
        if (error.isError()) {
            log.warn((Throwable) error.error(), "Exception while getting active tasks from Overlord. Will retry on next scheduled run.", new Object[0]);
            return;
        }
        UnmodifiableIterator it = copyOf.iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            TaskStatus taskStatus = (TaskStatus) ((Map) error.valueOrThrow()).get(str);
            if (taskStatus == null || taskStatus.isComplete()) {
                EmittingLogger emittingLogger = log;
                Object[] objArr = new Object[2];
                objArr[0] = str;
                objArr[1] = taskStatus == null ? "unknown" : taskStatus.getStatusCode();
                emittingLogger.debug("Deleting completed task[%s] information, Overlord task status[%s].", objArr);
                this.completedTasks.remove(str);
                File file = new File(getCompletedTaskDir(), str);
                try {
                    Files.deleteIfExists(file.toPath());
                    this.changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(str));
                } catch (IOException e2) {
                    log.error(e2, "Failed to delete completed task from disk [%s].", new Object[]{file});
                }
            }
        }
    }

    void addCompletedTask(String str, TaskAnnouncement taskAnnouncement) {
        this.completedTasks.put(str, taskAnnouncement);
    }

    private <T> Map<String, Long> getNumTasksPerDatasource(Collection<T> collection, Function<T, String> function) {
        HashMap hashMap = new HashMap();
        Iterator<T> it = collection.iterator();
        while (it.hasNext()) {
            hashMap.merge(function.apply(it.next()), 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
        }
        return hashMap;
    }

    public Map<String, Long> getWorkerRunningTasks() {
        return getNumTasksPerDatasource(CollectionUtils.mapValues(this.runningTasks, taskDetails -> {
            return taskDetails.task;
        }).values(), (v0) -> {
            return v0.getDataSource();
        });
    }

    public Map<String, Long> getWorkerAssignedTasks() {
        return getNumTasksPerDatasource(this.assignedTasks.values(), (v0) -> {
            return v0.getDataSource();
        });
    }

    public Map<String, Long> getWorkerCompletedTasks() {
        return getNumTasksPerDatasource(getCompletedTasks().values(), (v0) -> {
            return v0.getTaskDataSource();
        });
    }

    protected void taskStarted(String str) {
    }

    protected void taskAnnouncementChanged(TaskAnnouncement taskAnnouncement) {
    }
}
