package org.apache.druid.indexing.overlord;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.zookeeper.ZooKeeper;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.class */
public class RemoteTaskRunnerRunPendingTasksConcurrencyTest {
    private RemoteTaskRunner remoteTaskRunner;
    private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();

    @Before
    public void setUp() throws Exception {
        this.rtrTestUtils.setUp();
    }

    @After
    public void tearDown() throws Exception {
        if (this.remoteTaskRunner != null) {
            this.remoteTaskRunner.stop();
        }
        this.rtrTestUtils.tearDown();
    }

    @Test(timeout = 60000)
    public void testConcurrency() throws Exception {
        this.rtrTestUtils.makeWorker("worker0", 3);
        this.rtrTestUtils.makeWorker("worker1", 3);
        this.remoteTaskRunner = this.rtrTestUtils.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT3600S")) { // from class: org.apache.druid.indexing.overlord.RemoteTaskRunnerRunPendingTasksConcurrencyTest.1
            public int getPendingTasksRunnerNumThreads() {
                return 2;
            }
        });
        ListenableFuture[] listenableFutureArr = new ListenableFuture[6];
        Task[] taskArr = new Task[6];
        for (int i = 0; i < 2; i++) {
            taskArr[i] = TestTasks.unending("task" + i);
            listenableFutureArr[i] = this.remoteTaskRunner.run(taskArr[i]);
        }
        waitForBothWorkersToHaveUnackedTasks();
        for (int i2 = 2; i2 < 5; i2++) {
            taskArr[i2] = TestTasks.unending("task" + i2);
            listenableFutureArr[i2] = this.remoteTaskRunner.run(taskArr[i2]);
        }
        mockWorkerRunningAndCompletionSuccessfulTasks(taskArr[0], taskArr[1]);
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[0].get()).getStatusCode());
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[1].get()).getStatusCode());
        waitForBothWorkersToHaveUnackedTasks();
        if (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[2].getId()) && this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[3].getId())) {
            this.remoteTaskRunner.shutdown("task4", IngestionTestBase.DATA_SOURCE);
            mockWorkerRunningAndCompletionSuccessfulTasks(taskArr[3], taskArr[2]);
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[3].get()).getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[2].get()).getStatusCode());
        } else if (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[3].getId()) && this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[4].getId())) {
            this.remoteTaskRunner.shutdown("task2", IngestionTestBase.DATA_SOURCE);
            mockWorkerRunningAndCompletionSuccessfulTasks(taskArr[4], taskArr[3]);
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[4].get()).getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[3].get()).getStatusCode());
        } else {
            if (!this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[4].getId()) || !this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(taskArr[2].getId())) {
                throw new ISE("two out of three tasks 2,3 and 4 must be waiting for ack.", new Object[0]);
            }
            this.remoteTaskRunner.shutdown("task3", IngestionTestBase.DATA_SOURCE);
            mockWorkerRunningAndCompletionSuccessfulTasks(taskArr[4], taskArr[2]);
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[4].get()).getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[2].get()).getStatusCode());
        }
        taskArr[5] = TestTasks.unending("task5");
        listenableFutureArr[5] = this.remoteTaskRunner.run(taskArr[5]);
        waitForOneWorkerToHaveUnackedTasks();
        if (this.rtrTestUtils.taskAnnounced("worker0", taskArr[5].getId())) {
            this.rtrTestUtils.mockWorkerRunningTask("worker0", taskArr[5]);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", taskArr[5]);
        } else {
            this.rtrTestUtils.mockWorkerRunningTask("worker1", taskArr[5]);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", taskArr[5]);
        }
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) listenableFutureArr[5].get()).getStatusCode());
    }

    private void mockWorkerRunningAndCompletionSuccessfulTasks(Task task, Task task2) throws Exception {
        if (this.rtrTestUtils.taskAnnounced("worker0", task.getId())) {
            this.rtrTestUtils.mockWorkerRunningTask("worker0", task);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", task);
            this.rtrTestUtils.mockWorkerRunningTask("worker1", task2);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", task2);
            return;
        }
        this.rtrTestUtils.mockWorkerRunningTask("worker1", task);
        this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", task);
        this.rtrTestUtils.mockWorkerRunningTask("worker0", task2);
        this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", task2);
    }

    private void waitForOneWorkerToHaveUnackedTasks() throws Exception {
        while (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) {
            Thread.sleep(5L);
        }
        ZooKeeper zooKeeper = this.rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
        while (true) {
            StringBuilder sb = new StringBuilder();
            RemoteTaskRunnerTestUtils remoteTaskRunnerTestUtils = this.rtrTestUtils;
            if (zooKeeper.getChildren(sb.append(RemoteTaskRunnerTestUtils.tasksPath).append("/worker0").toString(), false).size() >= 1) {
                return;
            }
            StringBuilder sb2 = new StringBuilder();
            RemoteTaskRunnerTestUtils remoteTaskRunnerTestUtils2 = this.rtrTestUtils;
            if (zooKeeper.getChildren(sb2.append(RemoteTaskRunnerTestUtils.tasksPath).append("/worker1").toString(), false).size() >= 1) {
                return;
            } else {
                Thread.sleep(5L);
            }
        }
    }

    private void waitForBothWorkersToHaveUnackedTasks() throws Exception {
        while (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
            Thread.sleep(5L);
        }
        ZooKeeper zooKeeper = this.rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
        while (true) {
            StringBuilder sb = new StringBuilder();
            RemoteTaskRunnerTestUtils remoteTaskRunnerTestUtils = this.rtrTestUtils;
            if (zooKeeper.getChildren(sb.append(RemoteTaskRunnerTestUtils.tasksPath).append("/worker0").toString(), false).size() >= 1) {
                StringBuilder sb2 = new StringBuilder();
                RemoteTaskRunnerTestUtils remoteTaskRunnerTestUtils2 = this.rtrTestUtils;
                if (zooKeeper.getChildren(sb2.append(RemoteTaskRunnerTestUtils.tasksPath).append("/worker1").toString(), false).size() >= 1) {
                    return;
                }
            }
            Thread.sleep(5L);
        }
    }
}
