package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.class */
public class TaskMonitorTest {
    private static final int SPLIT_NUM = 10;
    private final ExecutorService taskRunner = Execs.multiThreaded(5, "task-monitor-test-%d");
    private final ConcurrentMap<String, TaskState> tasks = new ConcurrentHashMap();
    private final TaskMonitor<TestTask, SimpleSubTaskReport> monitor = new TaskMonitor<>(new TestOverlordClient(), 3, SPLIT_NUM);

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest$IntegerInputSplit.class */
    private static class IntegerInputSplit extends InputSplit<Integer> {
        IntegerInputSplit(int i) {
            super(Integer.valueOf(i));
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest$SimpleSubTaskReport.class */
    private static class SimpleSubTaskReport implements SubTaskReport {
        private final String taskId;

        private SimpleSubTaskReport(String str) {
            this.taskId = str;
        }

        public String getTaskId() {
            return this.taskId;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest$TestOverlordClient.class */
    private class TestOverlordClient extends NoopOverlordClient {
        private TestOverlordClient() {
        }

        public ListenableFuture<Void> runTask(String str, Object obj) {
            TestTask testTask = (TestTask) obj;
            TaskMonitorTest.this.tasks.put(testTask.getId(), TaskState.RUNNING);
            if (testTask.throwUnknownTypeIdError) {
                throw new RuntimeException((Throwable) new ISE("Could not resolve type id 'test_task_id'", new Object[0]));
            }
            TaskMonitorTest.this.taskRunner.submit(() -> {
                return (TaskState) TaskMonitorTest.this.tasks.put(testTask.getId(), testTask.run(null).getStatusCode());
            });
            return Futures.immediateFuture((Object) null);
        }

        public ListenableFuture<TaskStatusResponse> taskStatus(String str) {
            return Futures.immediateFuture(new TaskStatusResponse(str, new TaskStatusPlus(str, "groupId", "testTask", DateTimes.EPOCH, DateTimes.EPOCH, (TaskState) TaskMonitorTest.this.tasks.get(str), RunnerTaskState.RUNNING, -1L, TaskLocation.unknown(), "testDataSource", (String) null)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest$TestTask.class */
    public class TestTask extends NoopTask {
        private final boolean shouldFail;
        private final boolean throwUnknownTypeIdError;

        TestTask(String str, long j, boolean z, boolean z2) {
            super(str, (String) null, "testDataSource", j, 0L, (String) null, (FirehoseFactory) null, (Map) null);
            this.shouldFail = z;
            this.throwUnknownTypeIdError = z2;
        }

        @Nullable
        public String setup(TaskToolbox taskToolbox) {
            return null;
        }

        public void cleanUp(TaskToolbox taskToolbox, TaskStatus taskStatus) {
        }

        public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
            TaskMonitorTest.this.monitor.collectReport(new SimpleSubTaskReport(getId()));
            if (!this.shouldFail) {
                return super.runTask(taskToolbox);
            }
            Thread.sleep(getRunTime());
            return TaskStatus.failure(getId(), "Dummy task status failure for testing");
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest$TestTaskSpec.class */
    private class TestTaskSpec extends SubTaskSpec<TestTask> {
        private final long runTime;
        private final int numMaxFails;
        private final boolean throwUnknownTypeIdError;
        private int numFails;

        TestTaskSpec(String str, String str2, String str3, Map<String, Object> map, InputSplit inputSplit, long j, int i, boolean z) {
            super(str, str2, str3, map, inputSplit);
            this.runTime = j;
            this.numMaxFails = i;
            this.throwUnknownTypeIdError = z;
        }

        /* renamed from: newSubTask, reason: merged with bridge method [inline-methods] */
        public TestTask m71newSubTask(int i) {
            TaskMonitorTest taskMonitorTest = TaskMonitorTest.this;
            String id = getId();
            long j = this.runTime;
            int i2 = this.numFails;
            this.numFails = i2 + 1;
            return new TestTask(id, j, i2 < this.numMaxFails, this.throwUnknownTypeIdError);
        }

        /* renamed from: newSubTaskWithBackwardCompatibleType, reason: merged with bridge method [inline-methods] */
        public TestTask m70newSubTaskWithBackwardCompatibleType(int i) {
            TaskMonitorTest taskMonitorTest = TaskMonitorTest.this;
            String id = getId();
            long j = this.runTime;
            int i2 = this.numFails;
            this.numFails = i2 + 1;
            return new TestTask(id, j, i2 < this.numMaxFails, false);
        }
    }

    @Before
    public void setup() {
        this.tasks.clear();
        this.monitor.start(100L);
    }

    @After
    public void teardown() {
        this.monitor.stop();
        this.taskRunner.shutdownNow();
    }

    @Test
    public void testBasic() throws InterruptedException, ExecutionException, TimeoutException {
        List list = (List) IntStream.range(0, SPLIT_NUM).mapToObj(i -> {
            return this.monitor.submit(new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0, false));
        }).collect(Collectors.toList());
        for (int i2 = 0; i2 < list.size(); i2++) {
            TaskMonitor.SubTaskCompleteEvent subTaskCompleteEvent = (TaskMonitor.SubTaskCompleteEvent) ((ListenableFuture) list.get(i2)).get(1L, TimeUnit.SECONDS);
            Assert.assertEquals("supervisorId", subTaskCompleteEvent.getSpec().getSupervisorTaskId());
            Assert.assertEquals("specId" + i2, subTaskCompleteEvent.getSpec().getId());
            Assert.assertNotNull(subTaskCompleteEvent.getLastStatus());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastStatus().getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastState());
        }
    }

    @Test
    public void testRetry() throws InterruptedException, ExecutionException, TimeoutException {
        List list = (List) IntStream.range(0, SPLIT_NUM).mapToObj(i -> {
            return new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 2, false);
        }).collect(Collectors.toList());
        Stream stream = list.stream();
        TaskMonitor<TestTask, SimpleSubTaskReport> taskMonitor = this.monitor;
        taskMonitor.getClass();
        List list2 = (List) stream.map((v1) -> {
            return r1.submit(v1);
        }).collect(Collectors.toList());
        for (int i2 = 0; i2 < list2.size(); i2++) {
            TaskMonitor.SubTaskCompleteEvent subTaskCompleteEvent = (TaskMonitor.SubTaskCompleteEvent) ((ListenableFuture) list2.get(i2)).get(2L, TimeUnit.SECONDS);
            Assert.assertEquals("supervisorId", subTaskCompleteEvent.getSpec().getSupervisorTaskId());
            Assert.assertEquals("specId" + i2, subTaskCompleteEvent.getSpec().getId());
            Assert.assertNotNull(subTaskCompleteEvent.getLastStatus());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastStatus().getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastState());
            TaskHistory completeSubTaskSpecHistory = this.monitor.getCompleteSubTaskSpecHistory(((TestTaskSpec) list.get(i2)).getId());
            Assert.assertNotNull(completeSubTaskSpecHistory);
            List attemptHistory = completeSubTaskSpecHistory.getAttemptHistory();
            Assert.assertNotNull(attemptHistory);
            Assert.assertEquals(3L, attemptHistory.size());
            Assert.assertEquals(TaskState.FAILED, ((TaskStatusPlus) attemptHistory.get(0)).getStatusCode());
            Assert.assertEquals(TaskState.FAILED, ((TaskStatusPlus) attemptHistory.get(1)).getStatusCode());
        }
    }

    @Test
    public void testResubmitWithOldType() throws InterruptedException, ExecutionException, TimeoutException {
        List list = (List) IntStream.range(0, SPLIT_NUM).mapToObj(i -> {
            return new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0, true);
        }).collect(Collectors.toList());
        Stream stream = list.stream();
        TaskMonitor<TestTask, SimpleSubTaskReport> taskMonitor = this.monitor;
        taskMonitor.getClass();
        List list2 = (List) stream.map((v1) -> {
            return r1.submit(v1);
        }).collect(Collectors.toList());
        for (int i2 = 0; i2 < list2.size(); i2++) {
            TaskMonitor.SubTaskCompleteEvent subTaskCompleteEvent = (TaskMonitor.SubTaskCompleteEvent) ((ListenableFuture) list2.get(i2)).get(2L, TimeUnit.SECONDS);
            Assert.assertEquals("supervisorId", subTaskCompleteEvent.getSpec().getSupervisorTaskId());
            Assert.assertEquals("specId" + i2, subTaskCompleteEvent.getSpec().getId());
            Assert.assertNotNull(subTaskCompleteEvent.getLastStatus());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastStatus().getStatusCode());
            Assert.assertEquals(TaskState.SUCCESS, subTaskCompleteEvent.getLastState());
            TaskHistory completeSubTaskSpecHistory = this.monitor.getCompleteSubTaskSpecHistory(((TestTaskSpec) list.get(i2)).getId());
            Assert.assertNotNull(completeSubTaskSpecHistory);
            List attemptHistory = completeSubTaskSpecHistory.getAttemptHistory();
            Assert.assertNotNull(attemptHistory);
            Assert.assertEquals(1L, attemptHistory.size());
            Assert.assertEquals(TaskState.SUCCESS, ((TaskStatusPlus) attemptHistory.get(0)).getStatusCode());
        }
    }
}
