package org.apache.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataStorageTablesConfig;

@Deprecated
/* loaded from: input_file:org/apache/druid/indexing/worker/WorkerTaskMonitor.class */
public class WorkerTaskMonitor extends WorkerTaskManager {
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
    private final ObjectMapper jsonMapper;
    private final PathChildrenCache pathChildrenCache;
    private final CuratorFramework cf;
    private final WorkerCuratorCoordinator workerCuratorCoordinator;
    private final Object lifecycleLock;
    private volatile boolean started;

    @Inject
    public WorkerTaskMonitor(ObjectMapper objectMapper, TaskRunner taskRunner, TaskConfig taskConfig, CuratorFramework curatorFramework, WorkerCuratorCoordinator workerCuratorCoordinator, @IndexingService DruidLeaderClient druidLeaderClient) {
        super(objectMapper, taskRunner, taskConfig, druidLeaderClient);
        this.lifecycleLock = new Object();
        this.started = false;
        this.jsonMapper = objectMapper;
        this.pathChildrenCache = new PathChildrenCache(curatorFramework, workerCuratorCoordinator.getTaskPathForWorker(), false, true, Execs.makeThreadFactory("TaskMonitorCache-%s"));
        this.cf = curatorFramework;
        this.workerCuratorCoordinator = workerCuratorCoordinator;
    }

    @Override // org.apache.druid.indexing.worker.WorkerTaskManager
    @LifecycleStart
    public void start() throws Exception {
        super.start();
        synchronized (this.lifecycleLock) {
            Preconditions.checkState(!this.started, "already started");
            this.started = true;
            try {
                cleanupStaleAnnouncements();
                registerRunListener();
                this.pathChildrenCache.start();
                log.debug("Started WorkerTaskMonitor.", new Object[0]);
                this.started = true;
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                log.makeAlert(e2, "Exception starting WorkerTaskMonitor", new Object[0]).emit();
                throw e2;
            }
        }
    }

    private void cleanupStaleAnnouncements() throws Exception {
        synchronized (this.lock) {
            for (TaskAnnouncement taskAnnouncement : this.workerCuratorCoordinator.getAnnouncements()) {
                if (taskAnnouncement.getTaskStatus().isRunnable()) {
                    TaskStatus taskStatus = null;
                    TaskAnnouncement taskAnnouncement2 = this.completedTasks.get(taskAnnouncement.getTaskId());
                    if (taskAnnouncement2 != null) {
                        taskStatus = taskAnnouncement2.getTaskStatus();
                    } else if (!this.runningTasks.containsKey(taskAnnouncement.getTaskStatus().getId())) {
                        taskStatus = TaskStatus.failure(taskAnnouncement.getTaskStatus().getId(), "Canceled as unknown task. See middleManager or indexer logs for more details.");
                    }
                    if (taskStatus != null) {
                        log.info("Cleaning up stale announcement for task [%s]. New status is [%s].", taskAnnouncement.getTaskStatus().getId(), taskStatus.getStatusCode());
                        this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement.getTaskType(), taskAnnouncement.getTaskResource(), taskStatus, TaskLocation.unknown(), taskAnnouncement.getTaskDataSource()));
                    }
                }
            }
        }
    }

    private void registerRunListener() {
        this.pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: org.apache.druid.indexing.worker.WorkerTaskMonitor.1
            @Override // org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (CuratorUtils.isChildAdded(pathChildrenCacheEvent)) {
                    WorkerTaskMonitor.this.assignTask((Task) WorkerTaskMonitor.this.jsonMapper.readValue(WorkerTaskMonitor.this.cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), Task.class));
                }
            }
        });
    }

    @Override // org.apache.druid.indexing.worker.WorkerTaskManager
    @LifecycleStop
    public void stop() throws Exception {
        super.stop();
        synchronized (this.lifecycleLock) {
            Preconditions.checkState(this.started, "not started");
            try {
                this.started = false;
                this.pathChildrenCache.close();
                log.debug("Stopped WorkerTaskMonitor.", new Object[0]);
            } catch (Exception e) {
                log.makeAlert(e, "Exception stopping WorkerTaskMonitor", new Object[0]).emit();
            }
        }
    }

    @Override // org.apache.druid.indexing.worker.WorkerTaskManager
    protected void taskStarted(String str) {
        try {
            this.workerCuratorCoordinator.removeTaskRunZnode(str);
        } catch (Exception e) {
            log.error(e, "Unknown exception while deleting task[%s] znode.", str);
        }
    }

    @Override // org.apache.druid.indexing.worker.WorkerTaskManager
    protected void taskAnnouncementChanged(TaskAnnouncement taskAnnouncement) {
        try {
            this.workerCuratorCoordinator.updateTaskStatusAnnouncement(taskAnnouncement);
        } catch (Exception e) {
            log.makeAlert(e, "Failed to update task announcement", new Object[0]).addData(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, taskAnnouncement.getTaskId()).emit();
        }
    }
}
