package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.TestRealtimeTask;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerTestUtils;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.class */
public class RemoteTaskRunnerTest {
    /** Logger used by the {@link #watcher} rule to mark where each test starts and finishes. */
    private static final Logger LOG = new Logger(RemoteTaskRunnerTest.class);

    /** Joiner shared with {@link RemoteTaskRunnerTestUtils}; used here to build the paths below. */
    private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER;

    /** Host name of the single worker most tests in this class create. */
    private static final String WORKER_HOST = "worker";

    /** Announcement path of {@link #WORKER_HOST}. */
    private static final String ANNOUCEMENTS_PATH =
        JOINER.join(RemoteTaskRunnerTestUtils.ANNOUNCEMENTS_PATH, WORKER_HOST);

    /** Status path of {@link #WORKER_HOST}; per-task status nodes are joined beneath it. */
    private static final String STATUS_PATH =
        JOINER.join(RemoteTaskRunnerTestUtils.STATUS_PATH, WORKER_HOST);

    /** Period handed to {@code TestRemoteTaskRunnerConfig} by most tests (30000 ms). */
    private static final Period TIMEOUT_PERIOD = Period.millis(30_000);

    // Per-test state; populated by setUp() and the make*() helpers.
    private RemoteTaskRunner remoteTaskRunner;
    private HttpClient httpClient;
    private ObjectMapper jsonMapper;
    private CuratorFramework cf;
    private Task task;
    private Worker worker;
    private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();

    /** Logs the name of every test method when it starts and when it finishes. */
    @Rule
    public TestRule watcher = new TestWatcher() {
        @Override
        protected void starting(Description description) {
            LOG.info("Starting test: " + description.getMethodName());
        }

        @Override
        protected void finished(Description description) {
            LOG.info("Finishing test: " + description.getMethodName());
        }
    };

    /** Applies a 60-second {@link DeadlockDetectingTimeout} rule to every test in this class. */
    @Rule
    public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);

    /**
     * Initialises the shared test utilities, caches the object mapper and Curator client they
     * expose, creates the default unending task and registers a no-op emitter.
     *
     * @throws Exception if the test utilities fail to set up
     */
    @Before
    public void setUp() throws Exception {
        rtrTestUtils.setUp();
        jsonMapper = rtrTestUtils.getObjectMapper();
        cf = rtrTestUtils.getCuratorFramework();
        task = TestTasks.unending("task id with spaces");
        EmittingLogger.registerEmitter(new NoopServiceEmitter());
    }

    /**
     * Stops the runner created by the test (if any) and tears down the shared test utilities.
     *
     * @throws Exception if stopping the runner or tearing down the utilities fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (remoteTaskRunner != null) {
                remoteTaskRunner.stop();
            }
        } finally {
            // Always tear down the utilities, even when stop() throws, so that a failure in one
            // test does not leave their resources behind for the following tests.
            rtrTestUtils.tearDown();
        }
    }

    /**
     * Drives the default task through announce, running and success, and checks the
     * total/idle/used slot counts of the default worker category before and after.
     */
    @Test
    public void testRun() throws Exception {
        final String category = "_default_worker_category";
        doSetup();

        // Before anything runs: 3 slots, all idle.
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getTotalTaskSlotCount().get(category)).longValue());
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getIdleTaskSlotCount().get(category)).longValue());
        Assert.assertEquals(0L, ((Long) remoteTaskRunner.getUsedTaskSlotCount().get(category)).longValue());

        final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));
        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(result));
        Assert.assertEquals(task.getId(), result.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());

        // Remove the task's status node, then re-check the slot counts.
        final String taskStatusPath = JOINER.join(STATUS_PATH, task.getId());
        ((ChildrenDeletable) cf.delete().guaranteed()).forPath(taskStatusPath);

        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getTotalTaskSlotCount().get(category)).longValue());
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getIdleTaskSlotCount().get(category)).longValue());
        Assert.assertEquals(0L, ((Long) remoteTaskRunner.getUsedTaskSlotCount().get(category)).longValue());
    }

    /**
     * Adds the task as pending and runs pending tasks, then calls {@code run} for the same task
     * and follows it through to a successful completion.
     */
    @Test
    public void testRunTaskThatAlreadyPending() throws Exception {
        doSetup();
        remoteTaskRunner.addPendingTask(task);
        remoteTaskRunner.runPendingTasks();
        Assert.assertFalse(workerRunningTask(task.getId()));

        final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));
        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(result));

        Assert.assertEquals(task.getId(), result.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
    }

    /** Creates a runner without creating any worker first; passes if that does not throw. */
    @Test
    public void testStartWithNoWorker() {
        final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        makeRemoteTaskRunner(config);
    }

    /**
     * Calls {@code run} a second time for a task that is announced but not yet running, and
     * checks that the second future is pending and later completes with SUCCESS.
     */
    @Test
    public void testRunExistingTaskThatHasntStartedRunning() throws Exception {
        doSetup();
        remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));

        final ListenableFuture<TaskStatus> secondResult = remoteTaskRunner.run(task);
        Assert.assertFalse(secondResult.isDone());

        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));
        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(secondResult));

        Assert.assertEquals(task.getId(), secondResult.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, secondResult.get().getStatusCode());
    }

    /**
     * Calls {@code run} a second time for a task the worker is already running, and checks that
     * the second future is pending and later completes with SUCCESS.
     */
    @Test
    public void testRunExistingTaskThatHasStartedRunning() throws Exception {
        doSetup();
        remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));

        final ListenableFuture<TaskStatus> secondResult = remoteTaskRunner.run(task);
        Assert.assertFalse(secondResult.isDone());

        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(secondResult));

        Assert.assertEquals(task.getId(), secondResult.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, secondResult.get().getStatusCode());
    }

    /**
     * Runs a task whose id is a 5000-character string while a mock emitter with no recorded
     * expectations is registered, then verifies the mock.
     */
    @Test
    public void testRunTooMuchZKData() throws Exception {
        final ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
        EmittingLogger.registerEmitter(emitter);
        EasyMock.replay(emitter);

        doSetup();
        final String oversizedId = new String(new char[5000]);
        remoteTaskRunner.run(TestTasks.unending(oversizedId));

        EasyMock.verify(emitter);
    }

    /**
     * Runs three realtime tasks where "rt1" and "rt2" share the availability group "rt1".
     * Expects two tasks to end up running and exactly one, "rt2", to stay pending.
     */
    @Test
    public void testRunSameAvailabilityGroup() throws Exception {
        doSetup();
        TestRealtimeTask rt1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
        remoteTaskRunner.run(rt1);
        Assert.assertTrue(taskAnnounced(rt1.getId()));
        mockWorkerRunningTask(rt1);

        // rt2 uses the same availability group as rt1; rt3 uses a different one.
        remoteTaskRunner.run(new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper));
        remoteTaskRunner.run(new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper));

        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));

        // assertEquals reports the actual id on failure instead of a bare "expected true".
        Assert.assertEquals("rt2", ((RemoteTaskRunnerWorkItem) remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId());
    }

    /**
     * Runs "rt1" (required capacity 1), "rt2" (3) and "rt3" (2) against the 3-slot worker.
     * Expects "rt1" and "rt3" to run and "rt2" to be the only pending task.
     */
    @Test
    public void testRunWithCapacity() throws Exception {
        doSetup();
        TestRealtimeTask rt1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
        remoteTaskRunner.run(rt1);
        Assert.assertTrue(taskAnnounced(rt1.getId()));
        mockWorkerRunningTask(rt1);

        remoteTaskRunner.run(new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper));

        TestRealtimeTask rt3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper);
        remoteTaskRunner.run(rt3);
        Assert.assertTrue(taskAnnounced(rt3.getId()));
        mockWorkerRunningTask(rt3);

        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));

        // assertEquals reports the actual id on failure instead of a bare "expected true".
        Assert.assertEquals("rt2", ((RemoteTaskRunnerWorkItem) remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId());
    }

    /**
     * Deletes the status node of a running task and expects its future to complete as FAILED
     * with an error message saying the assigned worker disappeared.
     */
    @Test
    public void testStatusRemoved() throws Exception {
        doSetup();
        ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));
        Assert.assertEquals(task.getId(), ((RemoteTaskRunnerWorkItem) remoteTaskRunner.getRunningTasks().iterator().next()).getTaskId());

        cf.delete().forPath(JOINER.join(STATUS_PATH, task.getId()));

        TaskStatus status = future.get();
        // Expected value first, so a failure message labels the two values correctly.
        Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
        Assert.assertNotNull(status.getErrorMsg());
        Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
    }

    /**
     * With a 100% blacklist cap, runs task "first" and two tasks that both carry the id "second"
     * (with different resources), then expects the running task ids to be exactly
     * {"first", "second"}.
     */
    @Test
    public void testBootstrap() throws Exception {
        makeWorker();
        TestRemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        config.setMaxPercentageBlacklistWorkers(100);
        makeRemoteTaskRunner(config);

        TestRealtimeTask first = new TestRealtimeTask("first", new TaskResource("first", 1), "foo", TaskStatus.running("first"), jsonMapper);
        remoteTaskRunner.run(first);
        Assert.assertTrue(taskAnnounced(first.getId()));
        mockWorkerRunningTask(first);

        remoteTaskRunner.run(new TestRealtimeTask("second", new TaskResource("task", 2), "foo", TaskStatus.running("task"), jsonMapper));

        TestRealtimeTask second = new TestRealtimeTask("second", new TaskResource("second", 2), "foo", TaskStatus.running("second"), jsonMapper);
        remoteTaskRunner.run(second);
        Assert.assertTrue(taskAnnounced(second.getId()));
        mockWorkerRunningTask(second);

        Function<RemoteTaskRunnerWorkItem, String> toTaskId = RemoteTaskRunnerWorkItem::getTaskId;
        Assert.assertEquals(
            "runningTasks",
            ImmutableSet.of("first", "second"),
            Sets.newHashSet(Iterables.transform(remoteTaskRunner.getRunningTasks(), toTaskId))
        );
    }

    /**
     * Completes a task on the worker and then calls {@code run} for it again, expecting the
     * returned future to carry SUCCESS.
     */
    @Test
    public void testRunWithTaskComplete() throws Exception {
        doSetup();
        TestRealtimeTask completedTask = new TestRealtimeTask("testTask", new TaskResource("testTask", 2), "foo", TaskStatus.success("testTask"), jsonMapper);
        remoteTaskRunner.run(completedTask);
        Assert.assertTrue(taskAnnounced(completedTask.getId()));
        mockWorkerRunningTask(completedTask);
        mockWorkerCompleteSuccessfulTask(completedTask);

        TaskStatus rerunStatus = remoteTaskRunner.run(completedTask).get();
        Assert.assertEquals(TaskState.SUCCESS, rerunStatus.getStatusCode());
    }

    /**
     * Deletes the worker's announcement node while a task is running. Expects the task to fail
     * with "Canceled for worker cleanup", the pending worker cleanups to drain, the worker's
     * status path to be gone and the default category to vanish from the slot counts.
     */
    @Test
    public void testWorkerRemoved() throws Exception {
        final String category = "_default_worker_category";
        doSetup();
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getTotalTaskSlotCount().get(category)).longValue());
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getIdleTaskSlotCount().get(category)).longValue());

        ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));

        cf.delete().forPath(ANNOUCEMENTS_PATH);

        TaskStatus status = future.get();
        Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
        Assert.assertNotNull(status.getErrorMsg());
        Assert.assertTrue(status.getErrorMsg().contains("Canceled for worker cleanup"));

        // Wait for up to twice the configured task cleanup timeout.
        IndexingServiceCondition cleanupsDrained = new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
            }
        };
        long waitMillis = remoteTaskRunner.getRemoteTaskRunnerConfig().getTaskCleanupTimeout().toStandardDuration().getMillis() * 2;
        Assert.assertTrue(TestUtils.conditionValid(cleanupsDrained, waitMillis));

        Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
        Assert.assertFalse(remoteTaskRunner.getTotalTaskSlotCount().containsKey(category));
        Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(category));
    }

    /**
     * Disables the worker while it is running a task. Expects the task to still finish with
     * SUCCESS and the only known worker to report an empty version string.
     */
    @Test
    public void testWorkerDisabled() throws Exception {
        doSetup();
        final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));

        disableWorker();

        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(result));
        Assert.assertEquals(task.getId(), result.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());

        ImmutableWorkerInfo onlyWorker = (ImmutableWorkerInfo) Iterables.getOnlyElement(remoteTaskRunner.getWorkers());
        Assert.assertEquals("", onlyWorker.getWorker().getVersion());
    }

    /**
     * Stops the runner while a task is running on the worker, creates a new runner, and expects
     * the new runner to know the task and to see it complete with SUCCESS.
     */
    @Test
    public void testRestartRemoteTaskRunner() throws Exception {
        doSetup();
        remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));

        remoteTaskRunner.stop();
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));

        // Fail with a descriptive error, not a NullPointerException, if the new runner does not
        // know the task.
        RemoteTaskRunnerWorkItem knownItem = (RemoteTaskRunnerWorkItem) remoteTaskRunner
            .getKnownTasks()
            .stream()
            .filter(item -> item.getTaskId().equals(task.getId()))
            .findFirst()
            .orElseThrow(() -> new AssertionError("Restarted runner does not know task [" + task.getId() + "]"));
        ListenableFuture<TaskStatus> result = knownItem.getResult();

        mockWorkerCompleteSuccessfulTask(task);
        Assert.assertTrue(workerCompletedTask(result));
        Assert.assertEquals(task.getId(), result.get().getId());
        Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
    }

    /**
     * Passes {@code runPendingTask} a mocked work item that reports the real task id on the
     * first {@code getTaskId()} call and "wrongId" afterwards. Expects the real pending item to
     * be failed immediately with the "Failed to assign this task" message.
     */
    @Test
    public void testRunPendingTaskFailToAssignTask() throws Exception {
        doSetup();
        RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);

        RemoteTaskRunnerWorkItem mockedItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
        Mockito.when(mockedItem.getTaskId())
               .thenReturn(pendingItem.getTaskId())
               .thenReturn("wrongId");
        remoteTaskRunner.runPendingTask(mockedItem);

        // Zero timeout: the result must already be available.
        TaskStatus status = (TaskStatus) pendingItem.getResult().get(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
        Assert.assertEquals("Failed to assign this task. See overlord logs for more details.", status.getErrorMsg());
    }

    /**
     * Runs a pending task without ever mocking the worker as running it. Expects the item to be
     * FAILED, with an error message that starts with the did-not-start-in-timeout text, by the
     * time {@code runPendingTask} returns.
     */
    @Test
    public void testRunPendingTaskTimeoutToAssign() throws Exception {
        makeWorker();
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));

        RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);
        remoteTaskRunner.runPendingTask(pendingItem);

        // Zero timeout: the result must already be available.
        TaskStatus status = (TaskStatus) pendingItem.getResult().get(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
        Assert.assertNotNull(status.getErrorMsg());
        Assert.assertTrue(status.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout"));
    }

    /** Default fixture: one worker plus a runner configured with {@link #TIMEOUT_PERIOD}. */
    private void doSetup() throws Exception {
        makeWorker();
        final RemoteTaskRunnerConfig defaultConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        makeRemoteTaskRunner(defaultConfig);
    }

    /**
     * Replaces {@link #httpClient} with a fresh EasyMock mock and {@link #remoteTaskRunner} with
     * a runner built by the test utilities from the given config and that client.
     */
    private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) {
        httpClient = EasyMock.createMock(HttpClient.class);
        remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(config, httpClient);
    }

    /** Creates the worker for {@link #WORKER_HOST} with a capacity argument of 3 and stores it. */
    private void makeWorker() throws Exception {
        final int capacity = 3;
        worker = rtrTestUtils.makeWorker(WORKER_HOST, capacity);
    }

    /** Asks the test utilities to disable the worker created by {@link #makeWorker()}. */
    private void disableWorker() throws Exception {
        rtrTestUtils.disableWorker(worker);
    }

    /** Delegates to the test utilities' {@code taskAnnounced} check for {@link #WORKER_HOST}. */
    private boolean taskAnnounced(String taskId) {
        return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
    }

    /** Delegates to the test utilities' {@code workerRunningTask} check for {@link #WORKER_HOST}. */
    private boolean workerRunningTask(String taskId) {
        return rtrTestUtils.workerRunningTask(WORKER_HOST, taskId);
    }

    /**
     * Evaluates, via {@code TestUtils.conditionValid}, the condition that the given future is
     * done.
     *
     * @param future result future of a task submitted to the runner
     * @return whatever {@code TestUtils.conditionValid} reports for that condition
     */
    private boolean workerCompletedTask(final ListenableFuture<TaskStatus> future) {
        final IndexingServiceCondition futureDone = new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return future.isDone();
            }
        };
        return TestUtils.conditionValid(futureDone);
    }

    /** Has the test utilities mock {@link #WORKER_HOST} running the given task. */
    private void mockWorkerRunningTask(Task runningTask) throws Exception {
        rtrTestUtils.mockWorkerRunningTask(WORKER_HOST, runningTask);
    }

    /** Has the test utilities mock {@link #WORKER_HOST} completing the given task successfully. */
    private void mockWorkerCompleteSuccessfulTask(Task finishedTask) throws Exception {
        rtrTestUtils.mockWorkerCompleteSuccessfulTask(WORKER_HOST, finishedTask);
    }

    /** Has the test utilities mock {@link #WORKER_HOST} completing the given task as failed. */
    private void mockWorkerCompleteFailedTask(Task failedTask) throws Exception {
        rtrTestUtils.mockWorkerCompleteFailedTask(WORKER_HOST, failedTask);
    }

    /**
     * While the worker is running a task, tries to mark workers lazy with an accept-all
     * predicate and a limit of 1. Expects no worker to be marked and one worker to stay known.
     */
    @Test
    public void testFindLazyWorkerTaskRunning() throws Exception {
        doSetup();
        remoteTaskRunner.start();
        remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);

        final Predicate<ImmutableWorkerInfo> acceptAll = workerInfo -> true;
        Assert.assertTrue(remoteTaskRunner.markWorkersLazy(acceptAll, 1).isEmpty());
        Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals(1L, remoteTaskRunner.getWorkers().size());
    }

    /**
     * After a task has been announced to the worker, but without mocking it as running, tries to
     * mark workers lazy. Expects no worker to be marked and one worker to stay known.
     */
    @Test
    public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception {
        doSetup();
        remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));

        final Predicate<ImmutableWorkerInfo> acceptAll = workerInfo -> true;
        Assert.assertTrue(remoteTaskRunner.markWorkersLazy(acceptAll, 1).isEmpty());
        Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals(1L, remoteTaskRunner.getWorkers().size());
    }

    /**
     * With no task submitted, marks workers lazy with an accept-all predicate and a limit of 1.
     * Expects one lazy worker, and for the default category: 3 total slots, 3 lazy slots and no
     * idle-slot entry.
     */
    @Test
    public void testFindLazyWorkerNotRunningAnyTask() throws Exception {
        final String category = "_default_worker_category";
        doSetup();

        final Predicate<ImmutableWorkerInfo> acceptAll = workerInfo -> true;
        Assert.assertEquals(1L, remoteTaskRunner.markWorkersLazy(acceptAll, 1).size());
        Assert.assertEquals(1L, remoteTaskRunner.getLazyWorkers().size());

        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getTotalTaskSlotCount().get(category)).longValue());
        Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(category));
        Assert.assertEquals(3L, ((Long) remoteTaskRunner.getLazyTaskSlotCount().get(category)).longValue());
    }

    /**
     * With no task submitted, marks workers lazy with an accept-all predicate but a limit of 0.
     * Expects no worker to be marked lazy.
     */
    @Test
    public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception {
        doSetup();

        final Predicate<ImmutableWorkerInfo> acceptAll = workerInfo -> true;
        Assert.assertEquals(0L, remoteTaskRunner.markWorkersLazy(acceptAll, 0).size());
        Assert.assertEquals(0L, remoteTaskRunner.getLazyWorkers().size());
    }

    /**
     * Deletes and then re-creates the worker's announcement node while a task is running, using
     * a config built with a five-minute period. Expects a cleanup to be registered for the worker
     * after the delete, that cleanup to disappear after the re-create, and the task to finish
     * with SUCCESS.
     */
    @Test
    public void testWorkerZKReconnect() throws Exception {
        makeWorker();
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));
        ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
        Assert.assertTrue(taskAnnounced(task.getId()));
        mockWorkerRunningTask(task);
        Assert.assertTrue(workerRunningTask(task.getId()));

        // Keep the announcement payload so the same node can be re-created below.
        byte[] announcement = cf.getData().forPath(ANNOUCEMENTS_PATH);
        cf.delete().forPath(ANNOUCEMENTS_PATH);
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
            }
        }));

        cf.create().forPath(ANNOUCEMENTS_PATH, announcement);
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() {
            @Override
            public boolean isValid() {
                return !remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
            }
        }));

        mockWorkerCompleteSuccessfulTask(task);
        TaskStatus status = future.get();
        // Asserted once, with the expected value first.
        Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
    }

    /**
     * Checks that {@code RemoteTaskRunner.sortByInsertionTime} orders work items by ascending
     * queue insertion time: the items are added latest-first and expected earliest-first.
     */
    @Test
    public void testSortByInsertionTime() {
        RemoteTaskRunnerWorkItem latest = new RemoteTaskRunnerWorkItem("b", "t", (Worker) null, (TaskLocation) null, "ds_test")
            .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z"));
        RemoteTaskRunnerWorkItem middle = new RemoteTaskRunnerWorkItem("a", "t", (Worker) null, (TaskLocation) null, "ds_test")
            .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z"));
        RemoteTaskRunnerWorkItem earliest = new RemoteTaskRunnerWorkItem("c", "t", (Worker) null, (TaskLocation) null, "ds_test")
            .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z"));

        ArrayList<RemoteTaskRunnerWorkItem> items = Lists.newArrayList(latest, middle, earliest);
        RemoteTaskRunner.sortByInsertionTime(items);

        Assert.assertEquals(earliest, items.get(0));
        Assert.assertEquals(middle, items.get(1));
        Assert.assertEquals(latest, items.get(2));
    }

    /**
     * With a 100% blacklist cap and a single worker: the first failed task leaves the blacklist
     * empty, the second puts one worker on it, {@code checkBlackListedNodes()} at the current
     * time leaves it there, and the same check with the clock advanced by twice
     * {@link #TIMEOUT_PERIOD} empties the blacklist and resets the failure count. A subsequent
     * successful task keeps both at zero.
     */
    @Test
    public void testBlacklistZKWorkers() throws Exception {
        makeWorker();
        TestRemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        config.setMaxPercentageBlacklistWorkers(100);
        makeRemoteTaskRunner(config);

        // First failure: counted, but the blacklist stays empty.
        TestRealtimeTask firstTask = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), jsonMapper);
        ListenableFuture<TaskStatus> firstResult = remoteTaskRunner.run(firstTask);
        Assert.assertTrue(taskAnnounced(firstTask.getId()));
        mockWorkerRunningTask(firstTask);
        mockWorkerCompleteFailedTask(firstTask);
        Assert.assertTrue(firstResult.get().isFailure());
        Assert.assertEquals(0L, remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(1L, remoteTaskRunner.findWorkerRunningTask(firstTask.getId()).getContinuouslyFailedTasksCount());

        // Second consecutive failure: one blacklisted worker.
        TestRealtimeTask secondTask = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.running("realtime2"), jsonMapper);
        ListenableFuture<TaskStatus> secondResult = remoteTaskRunner.run(secondTask);
        Assert.assertTrue(taskAnnounced(secondTask.getId()));
        mockWorkerRunningTask(secondTask);
        mockWorkerCompleteFailedTask(secondTask);
        Assert.assertTrue(secondResult.get().isFailure());
        Assert.assertEquals(1L, remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(2L, remoteTaskRunner.findWorkerRunningTask(secondTask.getId()).getContinuouslyFailedTasksCount());

        // Checking at the current time keeps the worker blacklisted.
        RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner testableRunner =
            (RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner;
        testableRunner.setCurrentTimeMillis(System.currentTimeMillis());
        remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals(1L, remoteTaskRunner.getBlackListedWorkers().size());

        // Checking with the clock advanced by twice the timeout period clears it.
        testableRunner.setCurrentTimeMillis(System.currentTimeMillis() + (2 * TIMEOUT_PERIOD.toStandardDuration().getMillis()));
        remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals(0L, remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(0L, remoteTaskRunner.findWorkerRunningTask(secondTask.getId()).getContinuouslyFailedTasksCount());

        // A successful task afterwards leaves both counters at zero.
        TestRealtimeTask thirdTask = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.running("realtime3"), jsonMapper);
        ListenableFuture<TaskStatus> thirdResult = remoteTaskRunner.run(thirdTask);
        Assert.assertTrue(taskAnnounced(thirdTask.getId()));
        mockWorkerRunningTask(thirdTask);
        mockWorkerCompleteSuccessfulTask(thirdTask);
        Assert.assertTrue(thirdResult.get().isSuccess());
        Assert.assertEquals(0L, remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(0L, remoteTaskRunner.findWorkerRunningTask(thirdTask.getId()).getContinuouslyFailedTasksCount());
    }

    /**
     * With two 10-slot workers and a 25% blacklist cap, fails twelve tasks that are expected to
     * alternate between the workers. After every failure the blacklist must still be empty and
     * the failure count of the worker that ran the task must equal {@code (i + 1) / 2}.
     */
    @Test
    public void testBlacklistZKWorkers25Percent() throws Exception {
        rtrTestUtils.makeWorker(WORKER_HOST, 10);
        rtrTestUtils.makeWorker("worker2", 10);
        TestRemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        config.setMaxPercentageBlacklistWorkers(25);
        makeRemoteTaskRunner(config);

        String oddTaskWorker = null;
        String evenTaskWorker = null;
        for (int i = 1; i <= 12; i++) {
            String taskId = StringUtils.format("rt-%d", i);
            Task failingTask = new TestRealtimeTask(taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), jsonMapper);
            ListenableFuture<TaskStatus> result = remoteTaskRunner.run(failingTask);

            if (i == 1) {
                // Whichever worker received the first task is expected to receive every
                // odd-numbered task; the other one receives the even-numbered tasks.
                boolean firstWentToWorker2 = rtrTestUtils.taskAnnounced("worker2", failingTask.getId());
                oddTaskWorker = firstWentToWorker2 ? "worker2" : WORKER_HOST;
                evenTaskWorker = firstWentToWorker2 ? WORKER_HOST : "worker2";
            }

            String expectedWorker = (i % 2 == 0) ? evenTaskWorker : oddTaskWorker;
            Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, failingTask.getId()));
            rtrTestUtils.mockWorkerRunningTask(expectedWorker, failingTask);
            rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, failingTask);

            Assert.assertTrue(result.get().isFailure());
            Assert.assertEquals(0L, remoteTaskRunner.getBlackListedWorkers().size());
            Assert.assertEquals((i + 1) / 2, remoteTaskRunner.findWorkerRunningTask(failingTask.getId()).getContinuouslyFailedTasksCount());
        }
    }

    /**
     * Runs twelve failing tasks against two workers with the maximum blacklist
     * percentage set to 50 and checks that exactly one worker ends up blacklisted.
     *
     * <p>Expectations encoded in the loop:
     * <ul>
     *   <li>tasks 1-4 alternate between the two workers, tasks 5-12 are all
     *       expected on the second worker;</li>
     *   <li>the blacklist is empty after tasks 1 and 2 and holds one worker from
     *       task 3 onwards;</li>
     *   <li>the continuous failure count of the worker that ran task {@code i} is
     *       {@code (i + 1) / 2} up to task 4 and {@code i - 2} afterwards.</li>
     * </ul>
     *
     * <p>NOTE(review): presumably the first worker is the one blacklisted and the
     * 50% cap prevents the second one from joining it - confirm against the
     * runner's blacklist logic.
     */
    @Test
    public void testBlacklistZKWorkers50Percent() throws Exception {
        this.rtrTestUtils.makeWorker(WORKER_HOST, 10);
        this.rtrTestUtils.makeWorker("worker2", 10);

        TestRemoteTaskRunnerConfig runnerConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        runnerConfig.setMaxPercentageBlacklistWorkers(50);
        makeRemoteTaskRunner(runnerConfig);

        String firstWorker = null;
        String secondWorker = null;
        for (int attempt = 1; attempt <= 12; attempt++) {
            String taskId = StringUtils.format("rt-%d", attempt);
            Task task = new TestRealtimeTask(taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), this.jsonMapper);
            ListenableFuture<TaskStatus> statusFuture = this.remoteTaskRunner.run(task);

            // The first assignment is not deterministic, so find out which worker got it.
            if (attempt == 1) {
                boolean startedOnWorker2 = this.rtrTestUtils.taskAnnounced("worker2", task.getId());
                firstWorker = startedOnWorker2 ? "worker2" : WORKER_HOST;
                secondWorker = startedOnWorker2 ? WORKER_HOST : "worker2";
            }

            // Only the odd-numbered tasks among the first four are expected on the first worker.
            String expectedWorker = (attempt <= 4 && attempt % 2 != 0) ? firstWorker : secondWorker;

            Assert.assertTrue(this.rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
            this.rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
            this.rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
            Assert.assertTrue(statusFuture.get().isFailure());

            long expectedBlacklisted = attempt > 2 ? 1L : 0L;
            Assert.assertEquals(expectedBlacklisted, this.remoteTaskRunner.getBlackListedWorkers().size());

            int expectedFailures = attempt > 4 ? attempt - 2 : (attempt + 1) / 2;
            Assert.assertEquals(expectedFailures, this.remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount());
        }
    }

    /**
     * Checks that a task finishing successfully on a blacklisted worker removes
     * that worker from the blacklist.
     *
     * <p>Sequence: the first task fails (nothing blacklisted yet), the second task
     * is left running, the third task fails and the worker becomes blacklisted with
     * three blacklisted task slots reported for the default worker category. The
     * second task then completes successfully, after which the blacklist and the
     * blacklisted slot count are expected to be empty again.
     */
    @Test
    public void testSuccessfulTaskOnBlacklistedWorker() throws Exception {
        final String defaultCategory = "_default_worker_category";

        makeWorker();
        TestRemoteTaskRunnerConfig runnerConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        runnerConfig.setMaxPercentageBlacklistWorkers(100);
        makeRemoteTaskRunner(runnerConfig);

        TestRealtimeTask firstTask = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), this.jsonMapper);
        TestRealtimeTask secondTask = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), this.jsonMapper);
        TestRealtimeTask thirdTask = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), this.jsonMapper);

        // First failure: not enough to blacklist the worker.
        ListenableFuture<TaskStatus> firstFuture = this.remoteTaskRunner.run(firstTask);
        Assert.assertTrue(taskAnnounced(firstTask.getId()));
        mockWorkerRunningTask(firstTask);
        mockWorkerCompleteFailedTask(firstTask);
        Assert.assertTrue(firstFuture.get().isFailure());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertFalse(this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(defaultCategory));

        // Second task starts running and is deliberately left unfinished for now.
        ListenableFuture<TaskStatus> secondFuture = this.remoteTaskRunner.run(secondTask);
        Assert.assertTrue(taskAnnounced(secondTask.getId()));
        mockWorkerRunningTask(secondTask);
        Assert.assertFalse(this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(defaultCategory));

        // Second failure (third task): the worker is now blacklisted.
        ListenableFuture<TaskStatus> thirdFuture = this.remoteTaskRunner.run(thirdTask);
        Assert.assertTrue(taskAnnounced(thirdTask.getId()));
        mockWorkerRunningTask(thirdTask);
        mockWorkerCompleteFailedTask(thirdTask);
        Assert.assertTrue(thirdFuture.get().isFailure());
        Assert.assertEquals(1L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(3L, ((Long) this.remoteTaskRunner.getBlacklistedTaskSlotCount().get(defaultCategory)).longValue());

        // The still-running second task succeeds: the worker leaves the blacklist.
        mockWorkerCompleteSuccessfulTask(secondTask);
        Assert.assertTrue(secondFuture.get().isSuccess());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertFalse(this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(defaultCategory));
    }

    /**
     * Attaches the runner's status listener to a {@link PathChildrenCache} on
     * {@code /test} and checks that the listener emits an alert instead of failing.
     *
     * <p>A mocked {@link ServiceEmitter} is registered with {@link EmittingLogger}
     * to capture the alert builder. After the cache has started, the test verifies
     * that both mocks were used and that the captured alert's data map contains the
     * key {@code "znode"} mapped to {@code null}.
     *
     * <p>NOTE(review): presumably the listener receives an event whose data is
     * null once the cache is initialized - confirm against the listener code.
     */
    @Test
    public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception {
        Worker worker = EasyMock.createMock(Worker.class);
        EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
        EasyMock.replay(worker);

        // Record the emit(...) call so the alert builder can be inspected afterwards.
        ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
        Capture<EmittingLogger.EmittingAlertBuilder> alertCapture = Capture.newInstance();
        emitter.emit((ServiceEventBuilder) EasyMock.capture(alertCapture));
        EasyMock.expectLastCall().atLeastOnce();
        EmittingLogger.registerEmitter(emitter);
        EasyMock.replay(emitter);

        PathChildrenCache statusCache = new PathChildrenCache(this.cf, "/test", true);
        testStartWithNoWorker();
        ZkWorker zkWorker = new ZkWorker(worker, statusCache, this.jsonMapper);
        statusCache.getListenable().addListener(this.remoteTaskRunner.getStatusListener(worker, zkWorker, (SettableFuture) null));
        statusCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        Assert.assertTrue(TestUtils.conditionValid(() -> statusCache.getCurrentData().size() == 1));

        EasyMock.verify(worker);
        EasyMock.verify(emitter);

        Map<?, ?> alertData = alertCapture.getValue().build((ImmutableMap) null).getDataMap();
        Assert.assertTrue(alertData.containsKey("znode"));
        Assert.assertNull(alertData.get("znode"));
    }

    /**
     * Checks that asking for the reports of a task id the runner has never seen
     * ({@code "foo"}) yields an absent {@link Optional}.
     */
    @Test
    public void testStreamTaskReportsUnknownTask() throws Exception {
        doSetup();
        Optional<?> reports = this.remoteTaskRunner.streamTaskReports("foo");
        Assert.assertEquals(Optional.absent(), reports);
    }

    /**
     * Checks that live reports can be streamed for a running task and are no
     * longer available once the task has completed.
     *
     * <p>The HTTP client is mocked to answer any request with the bytes of
     * {@code "my report!"}. The test waits until the single running task has a
     * known location, streams its reports and compares the payload, completes the
     * task, and finally asserts that reports are absent and that the captured
     * request targeted the worker's {@code liveReports} endpoint.
     */
    @Test
    public void testStreamTaskReportsKnownTask() throws Exception {
        doSetup();

        Capture<Request> requestCapture = Capture.newInstance();
        EasyMock.expect(this.httpClient.go(EasyMock.capture(requestCapture), (HttpResponseHandler) EasyMock.anyObject()))
                .andReturn(Futures.immediateFuture(new ByteArrayInputStream(StringUtils.toUtf8("my report!"))));
        EasyMock.replay(this.httpClient);

        ListenableFuture<TaskStatus> statusFuture = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);

        // Wait until the runner reports exactly one running task whose location is known.
        Assert.assertTrue(TestUtils.conditionValid(() -> {
            if (this.remoteTaskRunner.getRunningTasks().isEmpty()) {
                return false;
            }
            RemoteTaskRunnerWorkItem workItem = (RemoteTaskRunnerWorkItem) Iterables.getOnlyElement(this.remoteTaskRunner.getRunningTasks());
            return !workItem.getLocation().equals(TaskLocation.unknown());
        }));

        InputStream reportStream = (InputStream) this.remoteTaskRunner.streamTaskReports(this.task.getId()).get();
        byte[] reportBytes = ByteStreams.toByteArray(reportStream);
        Assert.assertEquals("my report!", StringUtils.fromUtf8(reportBytes));

        mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue(workerCompletedTask(statusFuture));
        Assert.assertEquals(Optional.absent(), this.remoteTaskRunner.streamTaskReports(this.task.getId()));

        EasyMock.verify(this.httpClient);
        // NOTE(review): the expected URL implies this.task's id contains spaces that get percent-encoded - confirm in doSetup().
        Assert.assertEquals("http://dummy:9000/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports", requestCapture.getValue().getUrl().toString());
    }
}
