/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.coordinator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import jakarta.ws.rs.NotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="integration")
@Timeout(value=240L)
public class CoordinatorTest {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);

    @Test
    public void testCoordinatorStatus() throws Exception {
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").build();){
            CoordinatorStatusResponse status = cluster.coordinatorClient().status();
            Assertions.assertEquals((Object)cluster.coordinator().status(), (Object)status);
        }
    }

    @Test
    public void testCoordinatorUptime() throws Exception {
        MockTime time = new MockTime(0L, 200L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").scheduler((Scheduler)scheduler).build();){
            UptimeResponse uptime = cluster.coordinatorClient().uptime();
            Assertions.assertEquals((Object)cluster.coordinator().uptime(), (Object)uptime);
            time.setCurrentTimeMs(250L);
            Assertions.assertNotEquals((Object)cluster.coordinator().uptime(), (Object)uptime);
        }
    }

    @Test
    public void testCreateTask() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            new ExpectedTasks().waitFor(cluster.coordinatorClient());
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1L, 2L);
            cluster.coordinatorClient().createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).waitFor(cluster.coordinatorClient());
            cluster.coordinatorClient().createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            NoOpTaskSpec barSpec = new NoOpTaskSpec(1000L, 2000L);
            Assertions.assertThrows(RequestConflictException.class, () -> cluster.coordinatorClient().createTask(new CreateTaskRequest("foo", (TaskSpec)barSpec)), (String)"Recreating task with different task spec is not allowed");
            time.sleep(2L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).waitFor(cluster.coordinatorClient()).waitFor(cluster.agentClient("node02"));
            time.sleep(3L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 2L, 5L, "", false, (JsonNode)new TextNode("done"))).build()).waitFor(cluster.coordinatorClient());
        }
    }

    @Test
    public void testTaskDistribution() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            AgentClient agentClient1 = cluster.agentClient("node01");
            AgentClient agentClient2 = cluster.agentClient("node02");
            new ExpectedTasks().waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5L, 7L);
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            time.sleep(11L);
            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
            status1.set("node01", (JsonNode)new TextNode("active"));
            status1.set("node02", (JsonNode)new TextNode("active"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 11L, (JsonNode)status1)).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 11L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            time.sleep(7L);
            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
            status2.set("node01", (JsonNode)new TextNode("done"));
            status2.set("node02", (JsonNode)new TextNode("done"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 11L, 18L, "", false, (JsonNode)status2)).workerState((WorkerState)new WorkerDone("foo", (TaskSpec)fooSpec, 11L, 18L, (JsonNode)new TextNode("done"), "")).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
        }
    }

    @Test
    public void testTaskCancellation() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            AgentClient agentClient1 = cluster.agentClient("node01");
            AgentClient agentClient2 = cluster.agentClient("node02");
            new ExpectedTasks().waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5L, 7L);
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            time.sleep(11L);
            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
            status1.set("node01", (JsonNode)new TextNode("active"));
            status1.set("node02", (JsonNode)new TextNode("active"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 11L, (JsonNode)status1)).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 11L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
            status2.set("node01", (JsonNode)new TextNode("done"));
            status2.set("node02", (JsonNode)new TextNode("done"));
            time.sleep(7L);
            coordinatorClient.stopTask(new StopTaskRequest("foo"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 11L, 18L, "", true, (JsonNode)status2)).workerState((WorkerState)new WorkerDone("foo", (TaskSpec)fooSpec, 11L, 18L, (JsonNode)new TextNode("done"), "")).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
            new ExpectedTasks().waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
        }
    }

    @Test
    public void testTaskDestruction() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            AgentClient agentClient1 = cluster.agentClient("node01");
            AgentClient agentClient2 = cluster.agentClient("node02");
            new ExpectedTasks().waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(2L, 12L);
            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            NoOpTaskSpec barSpec = new NoOpTaskSpec(20L, 20L);
            coordinatorClient.createTask(new CreateTaskRequest("bar", (TaskSpec)barSpec));
            coordinatorClient.destroyTask(new DestroyTaskRequest("bar"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            time.sleep(10L);
            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
            status1.set("node01", (JsonNode)new TextNode("active"));
            status1.set("node02", (JsonNode)new TextNode("active"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 10L, (JsonNode)status1)).build()).waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
            new ExpectedTasks().waitFor(coordinatorClient).waitFor(agentClient1).waitFor(agentClient2);
        }
    }

    private static List<List<String>> createPartitionLists(String[][] array) {
        ArrayList<List<String>> list = new ArrayList<List<String>>();
        for (String[] a : array) {
            list.add(Arrays.asList(a));
        }
        return list;
    }

    @Test
    public void testNetworkPartitionFault() throws Exception {
        CapturingCommandRunner runner = new CapturingCommandRunner();
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node01").addAgent("node02").addAgent("node03").commandRunner(runner).scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            NetworkPartitionFaultSpec spec = new NetworkPartitionFaultSpec(0L, Long.MAX_VALUE, CoordinatorTest.createPartitionLists(new String[][]{{"node01", "node02"}, {"node03"}}));
            coordinatorClient.createTask(new CreateTaskRequest("netpart", (TaskSpec)spec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("netpart").taskSpec((TaskSpec)spec).build()).waitFor(coordinatorClient);
            this.checkLines("-A", runner);
        }
        this.checkLines("-D", runner);
    }

    private void checkLines(String prefix, CapturingCommandRunner runner) throws InterruptedException {
        new ExpectedLines().addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP -m comment --comment node03").waitFor("node01", runner);
        new ExpectedLines().addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP -m comment --comment node03").waitFor("node02", runner);
        new ExpectedLines().addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP -m comment --comment node01").addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP -m comment --comment node02").waitFor("node03", runner);
    }

    @Test
    public void testTasksRequestMatches() {
        TasksRequest req1 = new TasksRequest(null, 0L, 0L, 0L, 0L, Optional.empty());
        Assertions.assertTrue((boolean)req1.matches("foo1", -1L, -1L, TaskStateType.PENDING));
        Assertions.assertTrue((boolean)req1.matches("bar1", 100L, 200L, TaskStateType.DONE));
        Assertions.assertTrue((boolean)req1.matches("baz1", 100L, -1L, TaskStateType.RUNNING));
        TasksRequest req2 = new TasksRequest(null, 100L, 0L, 0L, 0L, Optional.empty());
        Assertions.assertFalse((boolean)req2.matches("foo1", -1L, -1L, TaskStateType.PENDING));
        Assertions.assertTrue((boolean)req2.matches("bar1", 100L, 200L, TaskStateType.DONE));
        Assertions.assertFalse((boolean)req2.matches("bar1", 99L, 200L, TaskStateType.DONE));
        Assertions.assertFalse((boolean)req2.matches("baz1", 99L, -1L, TaskStateType.RUNNING));
        TasksRequest req3 = new TasksRequest(null, 200L, 900L, 200L, 900L, Optional.empty());
        Assertions.assertFalse((boolean)req3.matches("foo1", -1L, -1L, TaskStateType.PENDING));
        Assertions.assertFalse((boolean)req3.matches("bar1", 100L, 200L, TaskStateType.DONE));
        Assertions.assertFalse((boolean)req3.matches("bar1", 200L, 1000L, TaskStateType.DONE));
        Assertions.assertTrue((boolean)req3.matches("bar1", 200L, 700L, TaskStateType.DONE));
        Assertions.assertFalse((boolean)req3.matches("baz1", 101L, -1L, TaskStateType.RUNNING));
        ArrayList<String> taskIds = new ArrayList<String>();
        taskIds.add("foo1");
        taskIds.add("bar1");
        taskIds.add("baz1");
        TasksRequest req4 = new TasksRequest(taskIds, 1000L, -1L, -1L, -1L, Optional.empty());
        Assertions.assertFalse((boolean)req4.matches("foo1", -1L, -1L, TaskStateType.PENDING));
        Assertions.assertTrue((boolean)req4.matches("foo1", 1000L, -1L, TaskStateType.RUNNING));
        Assertions.assertFalse((boolean)req4.matches("foo1", 900L, -1L, TaskStateType.RUNNING));
        Assertions.assertFalse((boolean)req4.matches("baz2", 2000L, -1L, TaskStateType.RUNNING));
        Assertions.assertFalse((boolean)req4.matches("baz2", -1L, -1L, TaskStateType.PENDING));
        TasksRequest req5 = new TasksRequest(null, 0L, 0L, 0L, 0L, Optional.of(TaskStateType.RUNNING));
        Assertions.assertTrue((boolean)req5.matches("foo1", -1L, -1L, TaskStateType.RUNNING));
        Assertions.assertFalse((boolean)req5.matches("bar1", -1L, -1L, TaskStateType.DONE));
        Assertions.assertFalse((boolean)req5.matches("baz1", -1L, -1L, TaskStateType.STOPPING));
        Assertions.assertFalse((boolean)req5.matches("baz1", -1L, -1L, TaskStateType.PENDING));
    }

    @Test
    public void testTasksRequest() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            new ExpectedTasks().waitFor(coordinatorClient);
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1L, 10L);
            NoOpTaskSpec barSpec = new NoOpTaskSpec(3L, 1L);
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            coordinatorClient.createTask(new CreateTaskRequest("bar", (TaskSpec)barSpec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).addTask(new ExpectedTasks.ExpectedTaskBuilder("bar").taskState((TaskState)new TaskPending((TaskSpec)barSpec)).build()).waitFor(coordinatorClient);
            Assertions.assertEquals((int)0, (int)coordinatorClient.tasks(new TasksRequest(null, 10L, 0L, 10L, 0L, Optional.empty())).tasks().size());
            TasksResponse resp1 = coordinatorClient.tasks(new TasksRequest(Arrays.asList("foo", "baz"), 0L, 0L, 0L, 0L, Optional.empty()));
            Assertions.assertTrue((boolean)resp1.tasks().containsKey("foo"));
            Assertions.assertFalse((boolean)resp1.tasks().containsKey("bar"));
            Assertions.assertEquals((int)1, (int)resp1.tasks().size());
            time.sleep(2L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).addTask(new ExpectedTasks.ExpectedTaskBuilder("bar").taskState((TaskState)new TaskPending((TaskSpec)barSpec)).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02"));
            TasksResponse resp2 = coordinatorClient.tasks(new TasksRequest(null, 1L, 0L, 0L, 0L, Optional.empty()));
            Assertions.assertTrue((boolean)resp2.tasks().containsKey("foo"));
            Assertions.assertFalse((boolean)resp2.tasks().containsKey("bar"));
            Assertions.assertEquals((int)1, (int)resp2.tasks().size());
            Assertions.assertEquals((int)0, (int)coordinatorClient.tasks(new TasksRequest(null, 3L, 0L, 0L, 0L, Optional.empty())).tasks().size());
        }
    }

    @Test
    public void testAgentFailureAndTaskExpiry() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1L, 500L);
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            TaskState expectedState = new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build().taskState();
            TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
            Assertions.assertEquals((Object)expectedState, (Object)resp);
            time.sleep(2L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02"));
            cluster.restartAgent("node02");
            time.sleep(550L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 2L, 552L, "worker expired", false, null)).workerState((WorkerState)new WorkerDone("foo", (TaskSpec)fooSpec, 552L, 552L, null, "worker expired")).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02"));
            cluster.restartAgent("node02");
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 2L, 552L, "worker expired", false, null)).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02"));
        }
    }

    @Test
    public void testTaskRequestWithOldStartMsGetsUpdated() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1L, 500L);
            time.sleep(552L);
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            NoOpTaskSpec updatedSpec = new NoOpTaskSpec(552L, 500L);
            coordinatorClient.createTask(new CreateTaskRequest("fooSpec", (TaskSpec)fooSpec));
            TaskState expectedState = new ExpectedTasks.ExpectedTaskBuilder("fooSpec").taskState((TaskState)new TaskRunning((TaskSpec)updatedSpec, 552L, (JsonNode)new TextNode("receiving"))).build().taskState();
            TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec"));
            Assertions.assertEquals((Object)expectedState, (Object)resp);
        }
    }

    @Test
    public void testTaskRequestWithFutureStartMsDoesNotGetRun() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000L, 500L);
            time.sleep(999L);
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            coordinatorClient.createTask(new CreateTaskRequest("fooSpec", (TaskSpec)fooSpec));
            TaskState expectedState = new ExpectedTasks.ExpectedTaskBuilder("fooSpec").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build().taskState();
            TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec"));
            Assertions.assertEquals((Object)expectedState, (Object)resp);
        }
    }

    @Test
    public void testTaskRequest() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1L, 10L);
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            TaskState expectedState = new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build().taskState();
            TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
            Assertions.assertEquals((Object)expectedState, (Object)resp);
            time.sleep(2L);
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02"));
            Assertions.assertThrows(NotFoundException.class, () -> coordinatorClient.task(new TaskRequest("non-existent-foo")));
        }
    }

    @Test
    public void testWorkersExitingAtDifferentTimes() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        MockScheduler scheduler = new MockScheduler(time);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().addCoordinator("node01").addAgent("node02").addAgent("node03").scheduler((Scheduler)scheduler).build();){
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            new ExpectedTasks().waitFor(coordinatorClient);
            HashMap<String, Long> nodeToExitMs = new HashMap<String, Long>();
            nodeToExitMs.put("node02", 10L);
            nodeToExitMs.put("node03", 20L);
            SampleTaskSpec fooSpec = new SampleTaskSpec(2L, 100L, nodeToExitMs, "");
            coordinatorClient.createTask(new CreateTaskRequest("foo", (TaskSpec)fooSpec));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskPending((TaskSpec)fooSpec)).build()).waitFor(coordinatorClient);
            time.sleep(2L);
            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
            status1.set("node02", (JsonNode)new TextNode("active"));
            status1.set("node03", (JsonNode)new TextNode("active"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)status1)).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node02")).waitFor(cluster.agentClient("node03"));
            time.sleep(10L);
            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
            status2.set("node02", (JsonNode)new TextNode("halted"));
            status2.set("node03", (JsonNode)new TextNode("active"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)status2)).workerState((WorkerState)new WorkerRunning("foo", (TaskSpec)fooSpec, 2L, (JsonNode)new TextNode("active"))).build()).waitFor(coordinatorClient).waitFor(cluster.agentClient("node03"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskRunning((TaskSpec)fooSpec, 2L, (JsonNode)status2)).workerState((WorkerState)new WorkerDone("foo", (TaskSpec)fooSpec, 2L, 12L, (JsonNode)new TextNode("halted"), "")).build()).waitFor(cluster.agentClient("node02"));
            time.sleep(10L);
            ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance);
            status3.set("node02", (JsonNode)new TextNode("halted"));
            status3.set("node03", (JsonNode)new TextNode("halted"));
            new ExpectedTasks().addTask(new ExpectedTasks.ExpectedTaskBuilder("foo").taskState((TaskState)new TaskDone((TaskSpec)fooSpec, 2L, 22L, "", false, (JsonNode)status3)).build()).waitFor(coordinatorClient);
        }
    }

    public static class ExpectedLines {
        List<String> expectedLines = new ArrayList<String>();

        public ExpectedLines addLine(String line) {
            this.expectedLines.add(line);
            return this;
        }

        public ExpectedLines waitFor(String nodeName, CapturingCommandRunner runner) throws InterruptedException {
            TestUtils.waitForCondition(() -> this.linesMatch(nodeName, runner.lines(nodeName)), (String)("failed to find the expected lines " + String.valueOf(this)));
            return this;
        }

        private boolean linesMatch(String nodeName, List<String> actualLines) {
            int matchIdx = 0;
            int i = 0;
            while (true) {
                if (matchIdx == this.expectedLines.size()) {
                    log.debug("Got expected lines for {}", (Object)nodeName);
                    return true;
                }
                if (i == actualLines.size()) {
                    log.info("Failed to find the expected lines for {}.  First missing line on index {}: {}", new Object[]{nodeName, matchIdx, this.expectedLines.get(matchIdx)});
                    return false;
                }
                String actualLine = actualLines.get(i++);
                String expectedLine = this.expectedLines.get(matchIdx);
                if (expectedLine.equals(actualLine)) {
                    ++matchIdx;
                    continue;
                }
                log.trace("Expected:\n'{}', Got:\n'{}'", (Object)expectedLine, (Object)actualLine);
                matchIdx = 0;
            }
        }

        public String toString() {
            return String.join((CharSequence)", ", this.expectedLines);
        }
    }
}

