package org.apache.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueScaleTest.class */
public class TaskQueueScaleTest {
    private static final String DATASOURCE = "ds";
    private final int numTasks = 1000;

    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private TaskQueue taskQueue;
    private TaskStorage taskStorage;
    private TestTaskRunner taskRunner;
    private Closer closer;

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueScaleTest$TestTask.class */
    private static class TestTask extends NoopTask {
        private final int number;
        private final long runtime;

        public TestTask(int i, long j) {
            super((String) null, (String) null, TaskQueueScaleTest.DATASOURCE, 0L, 0L, (String) null, (FirehoseFactory) null, Collections.emptyMap());
            this.number = i;
            this.runtime = j;
        }

        public int getNumber() {
            return this.number;
        }

        public long getRuntimeMillis() {
            return this.runtime;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueScaleTest$TestTaskRunner.class */
    private static class TestTaskRunner implements TaskRunner {
        private static final Logger log = new Logger(TestTaskRunner.class);
        private static final Duration T_PENDING_TO_RUNNING = Duration.standardSeconds(2);
        private static final Duration T_SHUTDOWN_ACK = Duration.millis(8);
        private static final Duration T_SHUTDOWN_COMPLETE = Duration.standardSeconds(2);

        @GuardedBy("knownTasks")
        private final Map<String, TestTaskRunnerWorkItem> knownTasks;
        private final ScheduledExecutorService exec;

        private TestTaskRunner() {
            this.knownTasks = new HashMap();
            this.exec = ScheduledExecutors.fixed(8, "TaskQueueScaleTest-%s");
        }

        public void start() {
            throw new UnsupportedOperationException();
        }

        public ListenableFuture<TaskStatus> run(Task task) {
            ListenableFuture<TaskStatus> result;
            synchronized (this.knownTasks) {
                TestTaskRunnerWorkItem computeIfAbsent = this.knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new);
                this.exec.schedule(() -> {
                    try {
                        synchronized (this.knownTasks) {
                            TestTaskRunnerWorkItem testTaskRunnerWorkItem = this.knownTasks.get(task.getId());
                            if (testTaskRunnerWorkItem.getState() == RunnerTaskState.PENDING) {
                                this.knownTasks.put(task.getId(), testTaskRunnerWorkItem.withState(RunnerTaskState.RUNNING));
                            }
                        }
                        this.exec.schedule(() -> {
                            TestTaskRunnerWorkItem testTaskRunnerWorkItem2;
                            try {
                                synchronized (this.knownTasks) {
                                    testTaskRunnerWorkItem2 = this.knownTasks.get(task.getId());
                                    this.knownTasks.put(task.getId(), testTaskRunnerWorkItem2.withState(RunnerTaskState.NONE));
                                }
                                if (testTaskRunnerWorkItem2 != null) {
                                    testTaskRunnerWorkItem2.setResult(TaskStatus.success(task.getId()));
                                }
                            } catch (Throwable th) {
                                log.error(th, "Error in scheduled executor", new Object[0]);
                            }
                        }, ((TestTask) task).getRuntimeMillis(), TimeUnit.MILLISECONDS);
                    } catch (Throwable th) {
                        log.error(th, "Error in scheduled executor", new Object[0]);
                    }
                }, T_PENDING_TO_RUNNING.getMillis(), TimeUnit.MILLISECONDS);
                result = computeIfAbsent.getResult();
            }
            return result;
        }

        public void shutdown(String str, String str2) {
            TestTaskRunnerWorkItem testTaskRunnerWorkItem;
            synchronized (this.knownTasks) {
                if (this.knownTasks.containsKey(str)) {
                    threadSleep(T_SHUTDOWN_ACK);
                    synchronized (this.knownTasks) {
                        testTaskRunnerWorkItem = this.knownTasks.get(str);
                    }
                    if (testTaskRunnerWorkItem.getResult().isDone()) {
                        return;
                    }
                    this.exec.schedule(() -> {
                        testTaskRunnerWorkItem.setResult(TaskStatus.failure("taskId", "stopped"));
                        synchronized (this.knownTasks) {
                            this.knownTasks.remove(str);
                        }
                    }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS);
                }
            }
        }

        static void threadSleep(Duration duration) {
            try {
                Thread.sleep(duration.getMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public void registerListener(TaskRunnerListener taskRunnerListener, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public void unregisterListener(String str) {
            throw new UnsupportedOperationException();
        }

        public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
            return null;
        }

        public void stop() {
            this.exec.shutdownNow();
        }

        public Collection<? extends TaskRunnerWorkItem> getRunningTasks() {
            Collection<? extends TaskRunnerWorkItem> collection;
            synchronized (this.knownTasks) {
                collection = (Collection) this.knownTasks.values().stream().filter(testTaskRunnerWorkItem -> {
                    return testTaskRunnerWorkItem.getState() == RunnerTaskState.RUNNING;
                }).collect(Collectors.toList());
            }
            return collection;
        }

        public Collection<? extends TaskRunnerWorkItem> getPendingTasks() {
            Collection<? extends TaskRunnerWorkItem> collection;
            synchronized (this.knownTasks) {
                collection = (Collection) this.knownTasks.values().stream().filter(testTaskRunnerWorkItem -> {
                    return testTaskRunnerWorkItem.getState() == RunnerTaskState.PENDING;
                }).collect(Collectors.toList());
            }
            return collection;
        }

        public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
            ImmutableList copyOf;
            synchronized (this.knownTasks) {
                copyOf = ImmutableList.copyOf(this.knownTasks.values());
            }
            return copyOf;
        }

        public Optional<ScalingStats> getScalingStats() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getTotalTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getIdleTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getUsedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getLazyTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getBlacklistedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/overlord/TaskQueueScaleTest$TestTaskRunnerWorkItem.class */
    public static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem {
        private final RunnerTaskState state;

        public TestTaskRunnerWorkItem(String str) {
            this(str, SettableFuture.create(), RunnerTaskState.PENDING);
        }

        private TestTaskRunnerWorkItem(String str, ListenableFuture<TaskStatus> listenableFuture, RunnerTaskState runnerTaskState) {
            super(str, listenableFuture);
            this.state = runnerTaskState;
        }

        public RunnerTaskState getState() {
            return this.state;
        }

        public TaskLocation getLocation() {
            return TaskLocation.unknown();
        }

        @Nullable
        public String getTaskType() {
            throw new UnsupportedOperationException();
        }

        public String getDataSource() {
            throw new UnsupportedOperationException();
        }

        public void setResult(TaskStatus taskStatus) {
            getResult().set(taskStatus);
        }

        public TestTaskRunnerWorkItem withState(RunnerTaskState runnerTaskState) {
            return new TestTaskRunnerWorkItem(getTaskId(), getResult(), runnerTaskState);
        }
    }

    @Before
    public void setUp() {
        EmittingLogger.registerEmitter(new NoopServiceEmitter());
        this.closer = Closer.create();
        this.taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(Period.hours(1)));
        this.taskRunner = new TestTaskRunner();
        Closer closer = this.closer;
        TestTaskRunner testTaskRunner = this.taskRunner;
        testTaskRunner.getClass();
        closer.register(testTaskRunner::stop);
        IndexerSQLMetadataStorageCoordinator indexerSQLMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(TestHelper.makeJsonMapper(), (MetadataStorageTablesConfig) this.derbyConnectorRule.metadataTablesConfigSupplier().get(), this.derbyConnectorRule.getConnector());
        this.taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig((Integer) null, Period.millis(1), (Period) null, (Period) null), new DefaultTaskConfig(), this.taskStorage, this.taskRunner, task -> {
            return new TaskActionClient() { // from class: org.apache.druid.indexing.overlord.TaskQueueScaleTest.1
                public <RetType> RetType submit(TaskAction<RetType> taskAction) {
                    throw new UnsupportedOperationException();
                }
            };
        }, new TaskLockbox(this.taskStorage, indexerSQLMetadataStorageCoordinator), new NoopServiceEmitter());
        this.taskQueue.start();
        Closer closer2 = this.closer;
        TaskQueue taskQueue = this.taskQueue;
        taskQueue.getClass();
        closer2.register(taskQueue::stop);
    }

    @After
    public void tearDown() throws Exception {
        this.closer.close();
    }

    @Test(timeout = 60000)
    public void doMassLaunchAndExit() throws Exception {
        Assert.assertEquals("no tasks should be running", 0L, this.taskRunner.getKnownTasks().size());
        Assert.assertEquals("no tasks should be known", 0L, this.taskQueue.getTasks().size());
        Assert.assertEquals("no tasks should be running", 0L, this.taskQueue.getRunningTaskCount().size());
        for (int i = 0; i < 1000; i++) {
            this.taskQueue.add(new TestTask(i, 2000L));
        }
        Assert.assertEquals("all tasks should be known", 1000L, this.taskQueue.getTasks().size());
        Assert.assertEquals("all tasks should be known", 1000L, this.taskQueue.getRunningTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum() + this.taskQueue.getPendingTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum() + this.taskQueue.getWaitingTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum());
        TaskLookup.CompleteTaskLookup of = TaskLookup.CompleteTaskLookup.of(1000, Duration.standardHours(1L));
        while (this.taskStorage.getTaskInfos(of, DATASOURCE).size() < 1000) {
            Thread.sleep(100L);
        }
        Thread.sleep(100L);
        Assert.assertEquals("no tasks should be active", 0L, this.taskStorage.getActiveTasks().size());
        long sum = this.taskQueue.getRunningTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum();
        long sum2 = this.taskQueue.getPendingTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum();
        long sum3 = this.taskQueue.getWaitingTaskCount().values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).sum();
        Assert.assertEquals("no tasks should be running", 0L, sum);
        Assert.assertEquals("no tasks should be pending", 0L, sum2);
        Assert.assertEquals("no tasks should be waiting", 0L, sum3);
    }

    @Test(timeout = 60000)
    public void doMassLaunchAndShutdown() throws Exception {
        Assert.assertEquals("no tasks should be running", 0L, this.taskRunner.getKnownTasks().size());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            TestTask testTask = new TestTask(i, Duration.standardHours(1L).getMillis());
            this.taskQueue.add(testTask);
            arrayList.add(testTask.getId());
        }
        while (this.taskStorage.getActiveTasks().size() < 1000) {
            Thread.sleep(100L);
        }
        Assert.assertEquals("all tasks should be running", 1000L, this.taskStorage.getActiveTasks().size());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.taskQueue.shutdown((String) it.next(), "test shutdown", new Object[0]);
        }
        while (!this.taskStorage.getActiveTasks().isEmpty()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals("no tasks should be running", 0L, this.taskStorage.getActiveTasks().size());
        Assert.assertEquals("all tasks should have completed", 1000L, this.taskStorage.getTaskInfos(TaskLookup.CompleteTaskLookup.of(1000, Duration.standardHours(1L)), DATASOURCE).size());
    }
}
