package org.apache.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerTest;
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueTest.class */
public class TaskQueueTest extends IngestionTestBase {
    private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueTest$SimpleTaskRunner.class */
    private static class SimpleTaskRunner implements TaskRunner {
        private final TaskActionClientFactory actionClientFactory;

        private SimpleTaskRunner(TaskActionClientFactory taskActionClientFactory) {
            this.actionClientFactory = taskActionClientFactory;
        }

        public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
            return null;
        }

        public void start() {
        }

        public void registerListener(TaskRunnerListener taskRunnerListener, Executor executor) {
        }

        public void unregisterListener(String str) {
        }

        public ListenableFuture<TaskStatus> run(Task task) {
            try {
                TaskToolbox taskToolbox = (TaskToolbox) Mockito.mock(TaskToolbox.class);
                Mockito.when(taskToolbox.getTaskActionClient()).thenReturn(this.actionClientFactory.create(task));
                return Futures.immediateFuture(task.run(taskToolbox));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void shutdown(String str, String str2) {
        }

        public void stop() {
        }

        public Collection<? extends TaskRunnerWorkItem> getRunningTasks() {
            return null;
        }

        public Collection<? extends TaskRunnerWorkItem> getPendingTasks() {
            return null;
        }

        public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
            return Collections.emptyList();
        }

        public Optional<ScalingStats> getScalingStats() {
            return null;
        }

        public Map<String, Long> getTotalTaskSlotCount() {
            return ImmutableMap.of("_default_worker_category", 0L);
        }

        public Map<String, Long> getIdleTaskSlotCount() {
            return ImmutableMap.of("_default_worker_category", 0L);
        }

        public Map<String, Long> getUsedTaskSlotCount() {
            return ImmutableMap.of("_default_worker_category", 0L);
        }

        public Map<String, Long> getLazyTaskSlotCount() {
            return ImmutableMap.of("_default_worker_category", 0L);
        }

        public Map<String, Long> getBlacklistedTaskSlotCount() {
            return ImmutableMap.of("_default_worker_category", 0L);
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueTest$TestTask.class */
    private static class TestTask extends AbstractBatchIndexTask {
        private final Interval interval;
        private boolean done;

        private TestTask(String str, Interval interval) {
            this(str, interval, (Map<String, Object>) null);
        }

        private TestTask(String str, Interval interval, Map<String, Object> map) {
            super(str, "datasource", map, AbstractTask.IngestionMode.NONE);
            this.interval = interval;
        }

        public boolean isReady(TaskActionClient taskActionClient) throws Exception {
            return tryTimeChunkLock(taskActionClient, ImmutableList.of(this.interval));
        }

        public String setup(TaskToolbox taskToolbox) {
            return null;
        }

        public void cleanUp(TaskToolbox taskToolbox, TaskStatus taskStatus) {
        }

        public TaskStatus runTask(TaskToolbox taskToolbox) {
            this.done = true;
            return TaskStatus.success(getId());
        }

        public boolean requireLockExistingSegments() {
            return false;
        }

        public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> list) {
            return null;
        }

        public boolean isPerfectRollup() {
            return false;
        }

        @Nullable
        public Granularity getSegmentGranularity() {
            return TaskQueueTest.SEGMENT_GRANULARITY;
        }

        public String getType() {
            return "test";
        }

        public boolean isDone() {
            return this.done;
        }
    }

    @Test
    public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        TestTask testTask = new TestTask("t1", Intervals.of("2021-01/P1M"));
        prepareTaskForLocking(testTask);
        Assert.assertTrue(testTask.isReady(createActionClientFactory.create(testTask)));
        TestTask testTask2 = new TestTask("t2", Intervals.of("2021-01-31/P1M"));
        taskQueue.add(testTask2);
        taskQueue.manageInternal();
        Assert.assertFalse(testTask2.isDone());
        Assert.assertTrue(getLockbox().findLocksForTask(testTask2).isEmpty());
        TestTask testTask3 = new TestTask("t3", Intervals.of("2021-02-01/P1M"));
        taskQueue.add(testTask3);
        taskQueue.manageInternal();
        Assert.assertFalse(testTask2.isDone());
        Assert.assertTrue(testTask3.isDone());
        Assert.assertTrue(getLockbox().findLocksForTask(testTask2).isEmpty());
        shutdownTask(testTask);
        taskQueue.shutdown(testTask3.getId(), "Emulating shutdown of task3", new Object[0]);
        taskQueue.manageInternal();
        Assert.assertTrue(testTask2.isDone());
    }

    @Test
    public void testShutdownReleasesTaskLock() throws Exception {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        TestTask testTask = new TestTask("t1", Intervals.of("2021-01/P1M"));
        taskQueue.add(testTask);
        getLockbox().lock(testTask, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, testTask, testTask.interval, (String) null));
        List findLocksForTask = getLockbox().findLocksForTask(testTask);
        Assert.assertEquals(1L, findLocksForTask.size());
        Assert.assertEquals(testTask.interval, ((TaskLock) findLocksForTask.get(0)).getInterval());
        taskQueue.shutdown(testTask.getId(), "Shutdown Task test", new Object[0]);
        Assert.assertTrue(getLockbox().findLocksForTask(testTask).isEmpty());
        Optional status = getTaskStorage().getStatus(testTask.getId());
        Assert.assertTrue(status.isPresent());
        Assert.assertEquals(TaskState.FAILED, ((TaskStatus) status.get()).getStatusCode());
        Assert.assertNotNull(((TaskStatus) status.get()).getErrorMsg());
        Assert.assertEquals("Shutdown Task test", ((TaskStatus) status.get()).getErrorMsg());
    }

    @Test
    public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        taskQueue.add(new TestTask("t1", Intervals.of("2021-01-01/P1D")));
        List tasks = taskQueue.getTasks();
        Assert.assertEquals(1L, tasks.size());
        Assert.assertTrue(((Boolean) ((Task) tasks.get(0)).getContextValue("useLineageBasedSegmentAllocation")).booleanValue());
    }

    @Test
    public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig() { // from class: org.apache.druid.indexing.overlord.TaskQueueTest.1
            public Map<String, Object> getContext() {
                return ImmutableMap.of("useLineageBasedSegmentAllocation", false);
            }
        }, getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        taskQueue.add(new TestTask("t1", Intervals.of("2021-01-01/P1D")));
        List tasks = taskQueue.getTasks();
        Assert.assertEquals(1L, tasks.size());
        Assert.assertFalse(((Boolean) ((Task) tasks.get(0)).getContextValue("useLineageBasedSegmentAllocation")).booleanValue());
    }

    @Test
    public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        taskQueue.add(new TestTask("t1", Intervals.of("2021-01-01/P1D"), ImmutableMap.of("useLineageBasedSegmentAllocation", false)));
        List tasks = taskQueue.getTasks();
        Assert.assertEquals(1L, tasks.size());
        Assert.assertFalse(((Boolean) ((Task) tasks.get(0)).getContextValue("useLineageBasedSegmentAllocation")).booleanValue());
    }

    @Test
    public void testLockConfigTakePrecedenceThanDefaultTaskContext() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig() { // from class: org.apache.druid.indexing.overlord.TaskQueueTest.2
            public Map<String, Object> getContext() {
                return ImmutableMap.of("forceTimeChunkLock", false);
            }
        }, getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        taskQueue.add(new TestTask("t1", Intervals.of("2021-01-01/P1D")));
        List tasks = taskQueue.getTasks();
        Assert.assertEquals(1L, tasks.size());
        Assert.assertTrue(((Boolean) ((Task) tasks.get(0)).getContextValue("forceTimeChunkLock")).booleanValue());
    }

    @Test
    public void testUserProvidedContextOverrideLockConfig() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        taskQueue.add(new TestTask("t1", Intervals.of("2021-01-01/P1D"), ImmutableMap.of("forceTimeChunkLock", false)));
        List tasks = taskQueue.getTasks();
        Assert.assertEquals(1L, tasks.size());
        Assert.assertFalse(((Boolean) ((Task) tasks.get(0)).getContextValue("forceTimeChunkLock")).booleanValue());
    }

    @Test
    public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), new SimpleTaskRunner(createActionClientFactory), createActionClientFactory, getLockbox(), new NoopServiceEmitter());
        taskQueue.setActive(true);
        TestTask testTask = new TestTask("t1", Intervals.of("2021-01-01/P1D")) { // from class: org.apache.druid.indexing.overlord.TaskQueueTest.3
            @Override // org.apache.druid.indexing.overlord.TaskQueueTest.TestTask
            public boolean isReady(TaskActionClient taskActionClient) {
                throw new RuntimeException("isReady failure test");
            }
        };
        taskQueue.add(testTask);
        taskQueue.manageInternal();
        Optional status = getTaskStorage().getStatus(testTask.getId());
        Assert.assertTrue(status.isPresent());
        Assert.assertEquals(TaskState.FAILED, ((TaskStatus) status.get()).getStatusCode());
        Assert.assertNotNull(((TaskStatus) status.get()).getErrorMsg());
        Assert.assertTrue(StringUtils.format("Actual message is: %s", new Object[]{((TaskStatus) status.get()).getErrorMsg()}), ((TaskStatus) status.get()).getErrorMsg().startsWith("Failed while waiting for the task to be ready to run"));
    }

    @Test
    public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsException, InterruptedException {
        IngestionTestBase.TestLocalTaskActionClientFactory createActionClientFactory = createActionClientFactory();
        HttpRemoteTaskRunner createHttpRemoteTaskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1"));
        StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("druid/overlord", "testHost");
        WorkerHolder workerHolder = (WorkerHolder) EasyMock.createMock(WorkerHolder.class);
        EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", "_default_worker_category")).anyTimes();
        workerHolder.incrementContinuouslyFailedTasksCount();
        EasyMock.expectLastCall();
        workerHolder.setLastCompletedTaskTime((DateTime) EasyMock.anyObject());
        EasyMock.expect(Integer.valueOf(workerHolder.getContinuouslyFailedTasksCount())).andReturn(1);
        EasyMock.replay(new Object[]{workerHolder});
        TaskQueue taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, (Period) null, (Period) null, (Period) null), new DefaultTaskConfig(), getTaskStorage(), createHttpRemoteTaskRunner, createActionClientFactory, getLockbox(), stubServiceEmitter);
        taskQueue.setActive(true);
        TestTask testTask = new TestTask("t1", Intervals.of("2021-01-01/P1D"), ImmutableMap.of("forceTimeChunkLock", false));
        taskQueue.add(testTask);
        taskQueue.manageInternal();
        createHttpRemoteTaskRunner.taskAddedOrUpdated(TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId()), TaskLocation.create("worker", 1, 2)), workerHolder);
        while (!((List) createHttpRemoteTaskRunner.getRunningTasks().stream().map((v0) -> {
            return v0.getTaskId();
        }).collect(Collectors.toList())).contains(testTask.getId())) {
            Thread.sleep(100L);
        }
        taskQueue.shutdown(testTask.getId(), "shutdown", new Object[0]);
        createHttpRemoteTaskRunner.taskAddedOrUpdated(TaskAnnouncement.create(testTask, TaskStatus.failure(testTask.getId(), "shutdown"), TaskLocation.create("worker", 1, 2)), workerHolder);
        taskQueue.manageInternal();
        stubServiceEmitter.getEvents();
        stubServiceEmitter.verifyEmitted("task/run/time", 1);
    }

    private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> list) {
        HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery testDruidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
        DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = (DruidNodeDiscoveryProvider) EasyMock.createMock(DruidNodeDiscoveryProvider.class);
        EasyMock.expect(druidNodeDiscoveryProvider.getForService("workerNodeService")).andReturn(testDruidNodeDiscovery);
        EasyMock.replay(new Object[]{druidNodeDiscoveryProvider});
        TaskStorage taskStorage = (TaskStorage) EasyMock.createStrictMock(TaskStorage.class);
        for (String str : list) {
            EasyMock.expect(taskStorage.getStatus(str)).andReturn(Optional.of(TaskStatus.running(str)));
        }
        EasyMock.replay(new Object[]{taskStorage});
        HttpRemoteTaskRunner httpRemoteTaskRunner = new HttpRemoteTaskRunner(TestHelper.makeJsonMapper(), new HttpRemoteTaskRunnerConfig() { // from class: org.apache.druid.indexing.overlord.TaskQueueTest.4
            public int getPendingTasksRunnerNumThreads() {
                return 3;
            }
        }, (HttpClient) EasyMock.createNiceMock(HttpClient.class), DSuppliers.of(new AtomicReference(DefaultWorkerBehaviorConfig.defaultConfig())), new NoopProvisioningStrategy(), druidNodeDiscoveryProvider, (TaskStorage) EasyMock.createNiceMock(TaskStorage.class), (CuratorFramework) EasyMock.createNiceMock(CuratorFramework.class), new IndexerZkConfig(new ZkPathsConfig(), (String) null, (String) null, (String) null, (String) null), new StubServiceEmitter("druid/overlord", "testHost"));
        httpRemoteTaskRunner.start();
        httpRemoteTaskRunner.registerListener(new TaskRunnerListener() { // from class: org.apache.druid.indexing.overlord.TaskQueueTest.5
            public String getListenerId() {
                return "test-listener";
            }

            public void locationChanged(String str2, TaskLocation taskLocation) {
            }

            public void statusChanged(String str2, TaskStatus taskStatus) {
            }
        }, Execs.directExecutor());
        return httpRemoteTaskRunner;
    }
}
